handler_udp.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. package tun
  2. import (
  3. "context"
  4. tun_net "github.com/v2fly/v2ray-core/v5/app/tun/net"
  5. "github.com/v2fly/v2ray-core/v5/common/buf"
  6. "github.com/v2fly/v2ray-core/v5/common/net"
  7. "github.com/v2fly/v2ray-core/v5/common/net/packetaddr"
  8. udp_proto "github.com/v2fly/v2ray-core/v5/common/protocol/udp"
  9. "github.com/v2fly/v2ray-core/v5/common/session"
  10. "github.com/v2fly/v2ray-core/v5/features/policy"
  11. "github.com/v2fly/v2ray-core/v5/features/routing"
  12. "github.com/v2fly/v2ray-core/v5/transport/internet/udp"
  13. "gvisor.dev/gvisor/pkg/tcpip/adapters/gonet"
  14. "gvisor.dev/gvisor/pkg/tcpip/stack"
  15. gvisor_udp "gvisor.dev/gvisor/pkg/tcpip/transport/udp"
  16. "gvisor.dev/gvisor/pkg/waiter"
  17. )
  18. type UDPHandler struct {
  19. ctx context.Context
  20. dispatcher routing.Dispatcher
  21. policyManager policy.Manager
  22. config *Config
  23. stack *stack.Stack
  24. }
  25. type udpConn struct {
  26. *gonet.UDPConn
  27. id stack.TransportEndpointID
  28. }
  29. func (c *udpConn) ID() *stack.TransportEndpointID {
  30. return &c.id
  31. }
  32. func HandleUDP(handle func(tun_net.UDPConn)) StackOption {
  33. return func(s *stack.Stack) error {
  34. udpForwarder := gvisor_udp.NewForwarder(s, func(r *gvisor_udp.ForwarderRequest) {
  35. wg := new(waiter.Queue)
  36. linkedEndpoint, err := r.CreateEndpoint(wg)
  37. if err != nil {
  38. // TODO: log
  39. return
  40. }
  41. udpConn := &udpConn{
  42. UDPConn: gonet.NewUDPConn(s, wg, linkedEndpoint),
  43. id: r.ID(),
  44. }
  45. handle(udpConn)
  46. })
  47. s.SetTransportProtocolHandler(gvisor_udp.ProtocolNumber, udpForwarder.HandlePacket)
  48. return nil
  49. }
  50. }
  51. func (h *UDPHandler) HandleQueue(ch chan tun_net.UDPConn) {
  52. for {
  53. select {
  54. case <-h.ctx.Done():
  55. return
  56. case conn := <-ch:
  57. if err := h.Handle(conn); err != nil {
  58. newError(err).AtError().WriteToLog(session.ExportIDToError(h.ctx))
  59. }
  60. }
  61. }
  62. }
  63. func (h *UDPHandler) Handle(conn tun_net.UDPConn) error {
  64. defer conn.Close()
  65. id := conn.ID()
  66. ctx := session.ContextWithInbound(h.ctx, &session.Inbound{Tag: h.config.Tag})
  67. packetConn := conn.(net.PacketConn)
  68. udpDispatcherConstructor := udp.NewSplitDispatcher
  69. switch h.config.PacketEncoding {
  70. case packetaddr.PacketAddrType_None:
  71. break
  72. case packetaddr.PacketAddrType_Packet:
  73. packetAddrDispatcherFactory := udp.NewPacketAddrDispatcherCreator(ctx)
  74. udpDispatcherConstructor = packetAddrDispatcherFactory.NewPacketAddrDispatcher
  75. }
  76. dest := net.UDPDestination(tun_net.AddressFromTCPIPAddr(id.LocalAddress), net.Port(id.LocalPort))
  77. src := net.UDPDestination(tun_net.AddressFromTCPIPAddr(id.RemoteAddress), net.Port(id.RemotePort))
  78. udpServer := udpDispatcherConstructor(h.dispatcher, func(ctx context.Context, packet *udp_proto.Packet) {
  79. if _, err := packetConn.WriteTo(packet.Payload.Bytes(), &net.UDPAddr{
  80. IP: src.Address.IP(),
  81. Port: int(src.Port),
  82. }); err != nil {
  83. newError("failed to write UDP packet").Base(err).WriteToLog()
  84. }
  85. })
  86. for {
  87. select {
  88. case <-ctx.Done():
  89. return nil
  90. default:
  91. var buffer [2048]byte
  92. n, _, err := packetConn.ReadFrom(buffer[:])
  93. if err != nil {
  94. return newError("failed to read UDP packet").Base(err)
  95. }
  96. currentPacketCtx := ctx
  97. udpServer.Dispatch(currentPacketCtx, dest, buf.FromBytes(buffer[:n]))
  98. }
  99. }
  100. }