handler_udp.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. package tun
  2. import (
  3. "context"
  4. "github.com/v2fly/v2ray-core/v5/common/buf"
  5. "github.com/v2fly/v2ray-core/v5/common/net"
  6. "github.com/v2fly/v2ray-core/v5/common/net/packetaddr"
  7. udp_proto "github.com/v2fly/v2ray-core/v5/common/protocol/udp"
  8. "github.com/v2fly/v2ray-core/v5/common/session"
  9. "github.com/v2fly/v2ray-core/v5/features/policy"
  10. "github.com/v2fly/v2ray-core/v5/features/routing"
  11. "github.com/v2fly/v2ray-core/v5/transport/internet/udp"
  12. "gvisor.dev/gvisor/pkg/tcpip/adapters/gonet"
  13. "gvisor.dev/gvisor/pkg/tcpip/stack"
  14. gvisor_udp "gvisor.dev/gvisor/pkg/tcpip/transport/udp"
  15. "gvisor.dev/gvisor/pkg/waiter"
  16. )
  17. type UDPHandler struct {
  18. ctx context.Context
  19. dispatcher routing.Dispatcher
  20. policyManager policy.Manager
  21. config *Config
  22. stack *stack.Stack
  23. }
  24. type udpConn struct {
  25. *gonet.UDPConn
  26. id stack.TransportEndpointID
  27. }
  28. func (c *udpConn) ID() *stack.TransportEndpointID {
  29. return &c.id
  30. }
  31. func HandleUDP(handle func(UDPConn)) StackOption {
  32. return func(s *stack.Stack) error {
  33. udpForwarder := gvisor_udp.NewForwarder(s, func(r *gvisor_udp.ForwarderRequest) {
  34. wg := new(waiter.Queue)
  35. linkedEndpoint, err := r.CreateEndpoint(wg)
  36. if err != nil {
  37. // TODO: log
  38. return
  39. }
  40. udpConn := &udpConn{
  41. UDPConn: gonet.NewUDPConn(s, wg, linkedEndpoint),
  42. id: r.ID(),
  43. }
  44. handle(udpConn)
  45. })
  46. s.SetTransportProtocolHandler(gvisor_udp.ProtocolNumber, udpForwarder.HandlePacket)
  47. return nil
  48. }
  49. }
  50. func (h *UDPHandler) HandleQueue(ch chan UDPConn) {
  51. for {
  52. select {
  53. case <-h.ctx.Done():
  54. return
  55. case conn := <-ch:
  56. if err := h.Handle(conn); err != nil {
  57. newError(err).AtError().WriteToLog(session.ExportIDToError(h.ctx))
  58. }
  59. }
  60. }
  61. }
  62. func (h *UDPHandler) Handle(conn UDPConn) error {
  63. ctx := session.ContextWithInbound(h.ctx, &session.Inbound{Tag: h.config.Tag})
  64. packetConn := conn.(net.PacketConn)
  65. udpDispatcherConstructor := udp.NewSplitDispatcher
  66. switch h.config.PacketEncoding {
  67. case packetaddr.PacketAddrType_None:
  68. break
  69. case packetaddr.PacketAddrType_Packet:
  70. packetAddrDispatcherFactory := udp.NewPacketAddrDispatcherCreator(ctx)
  71. udpDispatcherConstructor = packetAddrDispatcherFactory.NewPacketAddrDispatcher
  72. }
  73. udpServer := udpDispatcherConstructor(h.dispatcher, func(ctx context.Context, packet *udp_proto.Packet) {
  74. if _, err := packetConn.WriteTo(packet.Payload.Bytes(), &net.UDPAddr{
  75. IP: packet.Source.Address.IP(),
  76. Port: int(packet.Source.Port),
  77. }); err != nil {
  78. newError("failed to write UDP packet").Base(err).WriteToLog()
  79. }
  80. })
  81. for {
  82. select {
  83. case <-ctx.Done():
  84. return nil
  85. default:
  86. var buffer [2048]byte
  87. n, addr, err := packetConn.ReadFrom(buffer[:])
  88. if err != nil {
  89. return newError("failed to read UDP packet").Base(err)
  90. }
  91. currentPacketCtx := ctx
  92. udpServer.Dispatch(currentPacketCtx, net.DestinationFromAddr(addr), buf.FromBytes(buffer[:n]))
  93. }
  94. }
  95. }