outbound.go 6.8 KB


  1. package outbound
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/mustafaturan/bus"
  6. "github.com/v2fly/v2ray-core/v5/common"
  7. "github.com/v2fly/v2ray-core/v5/common/environment"
  8. "github.com/v2fly/v2ray-core/v5/common/environment/envctx"
  9. "github.com/v2fly/v2ray-core/v5/common/net"
  10. "github.com/v2fly/v2ray-core/v5/common/net/packetaddr"
  11. "github.com/v2fly/v2ray-core/v5/common/session"
  12. "github.com/v2fly/v2ray-core/v5/common/signal"
  13. "github.com/v2fly/v2ray-core/v5/common/task"
  14. "github.com/v2fly/v2ray-core/v5/transport"
  15. "github.com/v2fly/v2ray-core/v5/transport/internet"
  16. "github.com/v2fly/v2ray-core/v5/transport/internet/udp"
  17. "github.com/xiaokangwang/VLite/ass/udpconn2tun"
  18. "github.com/xiaokangwang/VLite/interfaces"
  19. "github.com/xiaokangwang/VLite/interfaces/ibus"
  20. vltransport "github.com/xiaokangwang/VLite/transport"
  21. udpsctpserver "github.com/xiaokangwang/VLite/transport/packetsctp/sctprelay"
  22. "github.com/xiaokangwang/VLite/transport/packetuni/puniClient"
  23. "github.com/xiaokangwang/VLite/transport/udp/udpClient"
  24. "github.com/xiaokangwang/VLite/transport/udp/udpuni/udpunic"
  25. "github.com/xiaokangwang/VLite/transport/uni/uniclient"
  26. "sync"
  27. "time"
  28. client2 "github.com/xiaokangwang/VLite/workers/client"
  29. )
  30. //go:generate go run github.com/v2fly/v2ray-core/v5/common/errors/errorgen
  31. func NewUDPOutboundHandler(ctx context.Context, config *UDPProtocolConfig) (*Handler, error) {
  32. proxyEnvironment := envctx.EnvironmentFromContext(ctx).(environment.ProxyEnvironment)
  33. statusInstance, err := createStatusFromConfig(config)
  34. if err != nil {
  35. return nil, newError("unable to initialize vlite").Base(err)
  36. }
  37. proxyEnvironment.TransientStorage().Put(ctx, "status", statusInstance)
  38. return &Handler{ctx: ctx}, nil
  39. }
  40. type Handler struct {
  41. ctx context.Context
  42. }
  43. // Process implements proxy.Outbound.Process().
  44. func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer internet.Dialer) error {
  45. proxyEnvironment := envctx.EnvironmentFromContext(ctx).(environment.ProxyEnvironment)
  46. statusInstanceIfce, err := proxyEnvironment.TransientStorage().Get(ctx, "status")
  47. if err != nil {
  48. return newError("uninitialized handler").Base(err)
  49. }
  50. statusInstance := statusInstanceIfce.(*status)
  51. err = h.ensureStarted(statusInstance)
  52. if err != nil {
  53. return newError("unable to initialize").Base(err)
  54. }
  55. connid := session.IDFromContext(ctx)
  56. outbound := session.OutboundFromContext(ctx)
  57. if outbound == nil || !outbound.Target.IsValid() {
  58. return newError("target not specified")
  59. }
  60. destination := outbound.Target
  61. packetConnOut := statusInstance.connAdp.DialUDP(net.UDPAddr{Port: int(connid)})
  62. ctx, cancel := context.WithCancel(ctx)
  63. timer := signal.CancelAfterInactivity(ctx, cancel, time.Second*600)
  64. if packetConn, err := packetaddr.ToPacketAddrConn(link, destination); err == nil {
  65. requestDone := func() error {
  66. return udp.CopyPacketConn(packetConnOut, packetConn, udp.UpdateActivity(timer))
  67. }
  68. responseDone := func() error {
  69. return udp.CopyPacketConn(packetConn, packetConnOut, udp.UpdateActivity(timer))
  70. }
  71. responseDoneAndCloseWriter := task.OnSuccess(responseDone, task.Close(link.Writer))
  72. if err := task.Run(ctx, requestDone, responseDoneAndCloseWriter); err != nil {
  73. return newError("connection ends").Base(err)
  74. }
  75. }
  76. return newError("unrecognized connection")
  77. }
  78. func (h *Handler) ensureStarted(s *status) error {
  79. s.access.Lock()
  80. defer s.access.Unlock()
  81. if s.TunnelRxFromTun == nil {
  82. err := enableInterface(s)
  83. if err != nil {
  84. return err
  85. }
  86. }
  87. return nil
  88. }
  89. type status struct {
  90. ctx context.Context
  91. password []byte
  92. msgbus *bus.Bus
  93. udpdialer vltransport.UnderlayTransportDialer
  94. puni *puniClient.PacketUniClient
  95. udprelay *udpsctpserver.PacketSCTPRelay
  96. udpserver *client2.UDPClientContext
  97. TunnelTxToTun chan interfaces.UDPPacket
  98. TunnelRxFromTun chan interfaces.UDPPacket
  99. connAdp *udpconn2tun.UDPConn2Tun
  100. config UDPProtocolConfig
  101. access sync.Mutex
  102. }
  103. func createStatusFromConfig(config *UDPProtocolConfig) (*status, error) {
  104. s := &status{}
  105. ctx := context.Background()
  106. s.msgbus = ibus.NewMessageBus()
  107. ctx = context.WithValue(ctx, interfaces.ExtraOptionsMessageBus, s.msgbus)
  108. if config.EnableFec {
  109. ctx = context.WithValue(ctx, interfaces.ExtraOptionsUDPFECEnabled, true)
  110. }
  111. ctx = context.WithValue(ctx, interfaces.ExtraOptionsUDPMask, string(s.password))
  112. destinationString := fmt.Sprintf("%v:%v", config.Address.String(), config.Port)
  113. s.udpdialer = udpClient.NewUdpClient(destinationString, ctx)
  114. if config.EnableStabilization {
  115. s.udpdialer = udpunic.NewUdpUniClient(string(s.password), ctx, s.udpdialer)
  116. s.udpdialer = uniclient.NewUnifiedConnectionClient(s.udpdialer, ctx)
  117. }
  118. return s, nil
  119. }
  120. func enableInterface(s *status) error {
  121. conn, err, connctx := s.udpdialer.Connect(s.ctx)
  122. if err != nil {
  123. return newError("unable to connect to remote").Base(err)
  124. }
  125. C_C2STraffic := make(chan client2.UDPClientTxToServerTraffic, 8)
  126. C_C2SDataTraffic := make(chan client2.UDPClientTxToServerDataTraffic, 8)
  127. C_S2CTraffic := make(chan client2.UDPClientRxFromServerTraffic, 8)
  128. C_C2STraffic2 := make(chan interfaces.TrafficWithChannelTag, 8)
  129. C_C2SDataTraffic2 := make(chan interfaces.TrafficWithChannelTag, 8)
  130. C_S2CTraffic2 := make(chan interfaces.TrafficWithChannelTag, 8)
  131. go func(ctx context.Context) {
  132. for {
  133. select {
  134. case data := <-C_C2STraffic:
  135. C_C2STraffic2 <- interfaces.TrafficWithChannelTag(data)
  136. case <-ctx.Done():
  137. return
  138. }
  139. }
  140. }(connctx)
  141. go func(ctx context.Context) {
  142. for {
  143. select {
  144. case data := <-C_C2SDataTraffic:
  145. C_C2SDataTraffic2 <- interfaces.TrafficWithChannelTag(data)
  146. case <-ctx.Done():
  147. return
  148. }
  149. }
  150. }(connctx)
  151. go func(ctx context.Context) {
  152. for {
  153. select {
  154. case data := <-C_S2CTraffic2:
  155. C_S2CTraffic <- client2.UDPClientRxFromServerTraffic(data)
  156. case <-ctx.Done():
  157. return
  158. }
  159. }
  160. }(connctx)
  161. TunnelTxToTun := make(chan interfaces.UDPPacket)
  162. TunnelRxFromTun := make(chan interfaces.UDPPacket)
  163. s.TunnelTxToTun = TunnelTxToTun
  164. s.TunnelRxFromTun = TunnelRxFromTun
  165. if s.config.EnableStabilization && s.config.EnableRenegotiation {
  166. s.puni = puniClient.NewPacketUniClient(C_C2STraffic2, C_C2SDataTraffic2, C_S2CTraffic2, s.password, connctx)
  167. s.puni.OnAutoCarrier(conn, connctx)
  168. s.udpserver = client2.UDPClient(connctx, C_C2STraffic, C_C2SDataTraffic, C_S2CTraffic, TunnelTxToTun, TunnelRxFromTun, s.puni)
  169. } else {
  170. s.udprelay = udpsctpserver.NewPacketRelayClient(conn, C_C2STraffic2, C_C2SDataTraffic2, C_S2CTraffic2, s.password, connctx)
  171. s.udpserver = client2.UDPClient(connctx, C_C2STraffic, C_C2SDataTraffic, C_S2CTraffic, TunnelTxToTun, TunnelRxFromTun, s.udprelay)
  172. }
  173. s.ctx = connctx
  174. s.connAdp = udpconn2tun.NewUDPConn2Tun(TunnelTxToTun, TunnelRxFromTun)
  175. return nil
  176. }
  177. func init() {
  178. common.Must(common.RegisterConfig((*UDPProtocolConfig)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
  179. return NewUDPOutboundHandler(ctx, config.(*UDPProtocolConfig))
  180. }))
  181. }