outbound.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. package outbound
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "github.com/mustafaturan/bus"
  8. "github.com/xiaokangwang/VLite/ass/udpconn2tun"
  9. "github.com/xiaokangwang/VLite/interfaces"
  10. "github.com/xiaokangwang/VLite/interfaces/ibus"
  11. vltransport "github.com/xiaokangwang/VLite/transport"
  12. udpsctpserver "github.com/xiaokangwang/VLite/transport/packetsctp/sctprelay"
  13. "github.com/xiaokangwang/VLite/transport/packetuni/puniClient"
  14. "github.com/xiaokangwang/VLite/transport/udp/udpClient"
  15. "github.com/xiaokangwang/VLite/transport/udp/udpuni/udpunic"
  16. "github.com/xiaokangwang/VLite/transport/uni/uniclient"
  17. client2 "github.com/xiaokangwang/VLite/workers/client"
  18. "github.com/v2fly/v2ray-core/v5/common"
  19. "github.com/v2fly/v2ray-core/v5/common/environment"
  20. "github.com/v2fly/v2ray-core/v5/common/environment/envctx"
  21. "github.com/v2fly/v2ray-core/v5/common/net"
  22. "github.com/v2fly/v2ray-core/v5/common/net/packetaddr"
  23. "github.com/v2fly/v2ray-core/v5/common/session"
  24. "github.com/v2fly/v2ray-core/v5/common/signal"
  25. "github.com/v2fly/v2ray-core/v5/common/task"
  26. "github.com/v2fly/v2ray-core/v5/transport"
  27. "github.com/v2fly/v2ray-core/v5/transport/internet"
  28. "github.com/v2fly/v2ray-core/v5/transport/internet/udp"
  29. )
  30. //go:generate go run github.com/v2fly/v2ray-core/v5/common/errors/errorgen
  31. func NewUDPOutboundHandler(ctx context.Context, config *UDPProtocolConfig) (*Handler, error) {
  32. proxyEnvironment := envctx.EnvironmentFromContext(ctx).(environment.ProxyEnvironment)
  33. statusInstance, err := createStatusFromConfig(config)
  34. if err != nil {
  35. return nil, newError("unable to initialize vlite").Base(err)
  36. }
  37. proxyEnvironment.TransientStorage().Put(ctx, "status", statusInstance)
  38. return &Handler{ctx: ctx}, nil
  39. }
  40. type Handler struct {
  41. ctx context.Context
  42. }
  43. // Process implements proxy.Outbound.Process().
  44. func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer internet.Dialer) error {
  45. proxyEnvironment := envctx.EnvironmentFromContext(h.ctx).(environment.ProxyEnvironment)
  46. statusInstanceIfce, err := proxyEnvironment.TransientStorage().Get(ctx, "status")
  47. if err != nil {
  48. return newError("uninitialized handler").Base(err)
  49. }
  50. statusInstance := statusInstanceIfce.(*status)
  51. err = h.ensureStarted(statusInstance)
  52. if err != nil {
  53. return newError("unable to initialize").Base(err)
  54. }
  55. connid := session.IDFromContext(ctx)
  56. outbound := session.OutboundFromContext(ctx)
  57. if outbound == nil || !outbound.Target.IsValid() {
  58. return newError("target not specified")
  59. }
  60. destination := outbound.Target
  61. packetConnOut := statusInstance.connAdp.DialUDP(net.UDPAddr{Port: int(connid % 65535)})
  62. ctx, cancel := context.WithCancel(ctx)
  63. timer := signal.CancelAfterInactivity(ctx, cancel, time.Second*600)
  64. if packetConn, err := packetaddr.ToPacketAddrConn(link, destination); err == nil {
  65. requestDone := func() error {
  66. return udp.CopyPacketConn(packetConnOut, packetConn, udp.UpdateActivity(timer))
  67. }
  68. responseDone := func() error {
  69. return udp.CopyPacketConn(packetConn, packetConnOut, udp.UpdateActivity(timer))
  70. }
  71. responseDoneAndCloseWriter := task.OnSuccess(responseDone, task.Close(link.Writer))
  72. if err := task.Run(ctx, requestDone, responseDoneAndCloseWriter); err != nil {
  73. return newError("connection ends").Base(err)
  74. }
  75. }
  76. return newError("unrecognized connection")
  77. }
  78. func (h *Handler) ensureStarted(s *status) error {
  79. s.access.Lock()
  80. defer s.access.Unlock()
  81. if s.TunnelRxFromTun == nil {
  82. err := enableInterface(s)
  83. if err != nil {
  84. return err
  85. }
  86. }
  87. return nil
  88. }
  89. type status struct {
  90. ctx context.Context
  91. password []byte
  92. msgbus *bus.Bus
  93. udpdialer vltransport.UnderlayTransportDialer
  94. puni *puniClient.PacketUniClient
  95. udprelay *udpsctpserver.PacketSCTPRelay
  96. udpserver *client2.UDPClientContext
  97. TunnelTxToTun chan interfaces.UDPPacket
  98. TunnelRxFromTun chan interfaces.UDPPacket
  99. connAdp *udpconn2tun.UDPConn2Tun
  100. config UDPProtocolConfig
  101. access sync.Mutex
  102. }
  103. func createStatusFromConfig(config *UDPProtocolConfig) (*status, error) { //nolint:unparam
  104. s := &status{password: []byte(config.Password)}
  105. ctx := context.Background()
  106. s.msgbus = ibus.NewMessageBus()
  107. ctx = context.WithValue(ctx, interfaces.ExtraOptionsMessageBus, s.msgbus) //nolint:revive,staticcheck
  108. ctx = context.WithValue(ctx, interfaces.ExtraOptionsDisableAutoQuitForClient, true) //nolint:revive,staticcheck
  109. if config.EnableFec {
  110. ctx = context.WithValue(ctx, interfaces.ExtraOptionsUDPFECEnabled, true) //nolint:revive,staticcheck
  111. }
  112. if config.ScramblePacket {
  113. ctx = context.WithValue(ctx, interfaces.ExtraOptionsUDPShouldMask, true) //nolint:revive,staticcheck
  114. }
  115. ctx = context.WithValue(ctx, interfaces.ExtraOptionsUDPMask, string(s.password)) //nolint:revive,staticcheck
  116. if config.HandshakeMaskingPaddingSize != 0 {
  117. ctxv := &interfaces.ExtraOptionsUsePacketArmorValue{PacketArmorPaddingTo: int(config.HandshakeMaskingPaddingSize), UsePacketArmor: true}
  118. ctx = context.WithValue(ctx, interfaces.ExtraOptionsUsePacketArmor, ctxv) //nolint:revive,staticcheck
  119. }
  120. destinationString := fmt.Sprintf("%v:%v", config.Address.AsAddress().String(), config.Port)
  121. s.udpdialer = udpClient.NewUdpClient(destinationString, ctx)
  122. if config.EnableStabilization {
  123. s.udpdialer = udpunic.NewUdpUniClient(string(s.password), ctx, s.udpdialer)
  124. s.udpdialer = uniclient.NewUnifiedConnectionClient(s.udpdialer, ctx)
  125. }
  126. s.ctx = ctx
  127. return s, nil
  128. }
  129. func enableInterface(s *status) error {
  130. conn, err, connctx := s.udpdialer.Connect(s.ctx)
  131. if err != nil {
  132. return newError("unable to connect to remote").Base(err)
  133. }
  134. C_C2STraffic := make(chan client2.UDPClientTxToServerTraffic, 8) //nolint:revive,stylecheck
  135. C_C2SDataTraffic := make(chan client2.UDPClientTxToServerDataTraffic, 8) //nolint:revive,stylecheck
  136. C_S2CTraffic := make(chan client2.UDPClientRxFromServerTraffic, 8) //nolint:revive,stylecheck
  137. C_C2STraffic2 := make(chan interfaces.TrafficWithChannelTag, 8) //nolint:revive,stylecheck
  138. C_C2SDataTraffic2 := make(chan interfaces.TrafficWithChannelTag, 8) //nolint:revive,stylecheck
  139. C_S2CTraffic2 := make(chan interfaces.TrafficWithChannelTag, 8) //nolint:revive,stylecheck
  140. go func(ctx context.Context) {
  141. for {
  142. select {
  143. case data := <-C_C2STraffic:
  144. C_C2STraffic2 <- interfaces.TrafficWithChannelTag(data)
  145. case <-ctx.Done():
  146. return
  147. }
  148. }
  149. }(connctx)
  150. go func(ctx context.Context) {
  151. for {
  152. select {
  153. case data := <-C_C2SDataTraffic:
  154. C_C2SDataTraffic2 <- interfaces.TrafficWithChannelTag(data)
  155. case <-ctx.Done():
  156. return
  157. }
  158. }
  159. }(connctx)
  160. go func(ctx context.Context) {
  161. for {
  162. select {
  163. case data := <-C_S2CTraffic2:
  164. C_S2CTraffic <- client2.UDPClientRxFromServerTraffic(data)
  165. case <-ctx.Done():
  166. return
  167. }
  168. }
  169. }(connctx)
  170. TunnelTxToTun := make(chan interfaces.UDPPacket)
  171. TunnelRxFromTun := make(chan interfaces.UDPPacket)
  172. s.TunnelTxToTun = TunnelTxToTun
  173. s.TunnelRxFromTun = TunnelRxFromTun
  174. if s.config.EnableStabilization && s.config.EnableRenegotiation {
  175. s.puni = puniClient.NewPacketUniClient(C_C2STraffic2, C_C2SDataTraffic2, C_S2CTraffic2, s.password, connctx)
  176. s.puni.OnAutoCarrier(conn, connctx)
  177. s.udpserver = client2.UDPClient(connctx, C_C2STraffic, C_C2SDataTraffic, C_S2CTraffic, TunnelTxToTun, TunnelRxFromTun, s.puni)
  178. } else {
  179. s.udprelay = udpsctpserver.NewPacketRelayClient(conn, C_C2STraffic2, C_C2SDataTraffic2, C_S2CTraffic2, s.password, connctx)
  180. s.udpserver = client2.UDPClient(connctx, C_C2STraffic, C_C2SDataTraffic, C_S2CTraffic, TunnelTxToTun, TunnelRxFromTun, s.udprelay)
  181. }
  182. s.ctx = connctx
  183. s.connAdp = udpconn2tun.NewUDPConn2Tun(TunnelTxToTun, TunnelRxFromTun)
  184. return nil
  185. }
  186. func init() {
  187. common.Must(common.RegisterConfig((*UDPProtocolConfig)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
  188. return NewUDPOutboundHandler(ctx, config.(*UDPProtocolConfig))
  189. }))
  190. }