outbound.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. package outbound
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "github.com/mustafaturan/bus"
  8. "github.com/xiaokangwang/VLite/ass/udpconn2tun"
  9. "github.com/xiaokangwang/VLite/interfaces"
  10. "github.com/xiaokangwang/VLite/interfaces/ibus"
  11. vltransport "github.com/xiaokangwang/VLite/transport"
  12. udpsctpserver "github.com/xiaokangwang/VLite/transport/packetsctp/sctprelay"
  13. "github.com/xiaokangwang/VLite/transport/packetuni/puniClient"
  14. "github.com/xiaokangwang/VLite/transport/udp/udpClient"
  15. "github.com/xiaokangwang/VLite/transport/udp/udpuni/udpunic"
  16. "github.com/xiaokangwang/VLite/transport/uni/uniclient"
  17. "github.com/v2fly/v2ray-core/v5/common"
  18. "github.com/v2fly/v2ray-core/v5/common/environment"
  19. "github.com/v2fly/v2ray-core/v5/common/environment/envctx"
  20. "github.com/v2fly/v2ray-core/v5/common/net"
  21. "github.com/v2fly/v2ray-core/v5/common/net/packetaddr"
  22. "github.com/v2fly/v2ray-core/v5/common/session"
  23. "github.com/v2fly/v2ray-core/v5/common/signal"
  24. "github.com/v2fly/v2ray-core/v5/common/task"
  25. "github.com/v2fly/v2ray-core/v5/transport"
  26. "github.com/v2fly/v2ray-core/v5/transport/internet"
  27. "github.com/v2fly/v2ray-core/v5/transport/internet/udp"
  28. client2 "github.com/xiaokangwang/VLite/workers/client"
  29. )
  30. //go:generate go run github.com/v2fly/v2ray-core/v5/common/errors/errorgen
  31. func NewUDPOutboundHandler(ctx context.Context, config *UDPProtocolConfig) (*Handler, error) {
  32. proxyEnvironment := envctx.EnvironmentFromContext(ctx).(environment.ProxyEnvironment)
  33. statusInstance, err := createStatusFromConfig(config)
  34. if err != nil {
  35. return nil, newError("unable to initialize vlite").Base(err)
  36. }
  37. proxyEnvironment.TransientStorage().Put(ctx, "status", statusInstance)
  38. return &Handler{ctx: ctx}, nil
  39. }
  40. type Handler struct {
  41. ctx context.Context
  42. }
  43. // Process implements proxy.Outbound.Process().
  44. func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer internet.Dialer) error {
  45. proxyEnvironment := envctx.EnvironmentFromContext(h.ctx).(environment.ProxyEnvironment)
  46. statusInstanceIfce, err := proxyEnvironment.TransientStorage().Get(ctx, "status")
  47. if err != nil {
  48. return newError("uninitialized handler").Base(err)
  49. }
  50. statusInstance := statusInstanceIfce.(*status)
  51. err = h.ensureStarted(statusInstance)
  52. if err != nil {
  53. return newError("unable to initialize").Base(err)
  54. }
  55. connid := session.IDFromContext(ctx)
  56. outbound := session.OutboundFromContext(ctx)
  57. if outbound == nil || !outbound.Target.IsValid() {
  58. return newError("target not specified")
  59. }
  60. destination := outbound.Target
  61. packetConnOut := statusInstance.connAdp.DialUDP(net.UDPAddr{Port: int(connid % 65535)})
  62. ctx, cancel := context.WithCancel(ctx)
  63. timer := signal.CancelAfterInactivity(ctx, cancel, time.Second*600)
  64. if packetConn, err := packetaddr.ToPacketAddrConn(link, destination); err == nil {
  65. requestDone := func() error {
  66. return udp.CopyPacketConn(packetConnOut, packetConn, udp.UpdateActivity(timer))
  67. }
  68. responseDone := func() error {
  69. return udp.CopyPacketConn(packetConn, packetConnOut, udp.UpdateActivity(timer))
  70. }
  71. responseDoneAndCloseWriter := task.OnSuccess(responseDone, task.Close(link.Writer))
  72. if err := task.Run(ctx, requestDone, responseDoneAndCloseWriter); err != nil {
  73. return newError("connection ends").Base(err)
  74. }
  75. }
  76. return newError("unrecognized connection")
  77. }
  78. func (h *Handler) ensureStarted(s *status) error {
  79. s.access.Lock()
  80. defer s.access.Unlock()
  81. if s.TunnelRxFromTun == nil {
  82. err := enableInterface(s)
  83. if err != nil {
  84. return err
  85. }
  86. }
  87. return nil
  88. }
  89. type status struct {
  90. ctx context.Context
  91. password []byte
  92. msgbus *bus.Bus
  93. udpdialer vltransport.UnderlayTransportDialer
  94. puni *puniClient.PacketUniClient
  95. udprelay *udpsctpserver.PacketSCTPRelay
  96. udpserver *client2.UDPClientContext
  97. TunnelTxToTun chan interfaces.UDPPacket
  98. TunnelRxFromTun chan interfaces.UDPPacket
  99. connAdp *udpconn2tun.UDPConn2Tun
  100. config UDPProtocolConfig
  101. access sync.Mutex
  102. }
  103. func createStatusFromConfig(config *UDPProtocolConfig) (*status, error) {
  104. s := &status{password: []byte(config.Password)}
  105. ctx := context.Background()
  106. s.msgbus = ibus.NewMessageBus()
  107. ctx = context.WithValue(ctx, interfaces.ExtraOptionsMessageBus, s.msgbus) //nolint:revive
  108. if config.EnableFec {
  109. ctx = context.WithValue(ctx, interfaces.ExtraOptionsUDPFECEnabled, true) //nolint:revive
  110. }
  111. if config.ScramblePacket {
  112. ctx = context.WithValue(ctx, interfaces.ExtraOptionsUDPShouldMask, true) //nolint:revive
  113. }
  114. ctx = context.WithValue(ctx, interfaces.ExtraOptionsUDPMask, string(s.password)) //nolint:revive
  115. if config.HandshakeMaskingPaddingSize != 0 {
  116. ctxv := &interfaces.ExtraOptionsUsePacketArmorValue{PacketArmorPaddingTo: int(config.HandshakeMaskingPaddingSize), UsePacketArmor: true}
  117. ctx = context.WithValue(ctx, interfaces.ExtraOptionsUsePacketArmor, ctxv) //nolint:revive
  118. }
  119. destinationString := fmt.Sprintf("%v:%v", config.Address.AsAddress().String(), config.Port)
  120. s.udpdialer = udpClient.NewUdpClient(destinationString, ctx)
  121. if config.EnableStabilization {
  122. s.udpdialer = udpunic.NewUdpUniClient(string(s.password), ctx, s.udpdialer)
  123. s.udpdialer = uniclient.NewUnifiedConnectionClient(s.udpdialer, ctx)
  124. }
  125. s.ctx = ctx
  126. return s, nil
  127. }
  128. func enableInterface(s *status) error {
  129. conn, err, connctx := s.udpdialer.Connect(s.ctx)
  130. if err != nil {
  131. return newError("unable to connect to remote").Base(err)
  132. }
  133. C_C2STraffic := make(chan client2.UDPClientTxToServerTraffic, 8) //nolint:revive
  134. C_C2SDataTraffic := make(chan client2.UDPClientTxToServerDataTraffic, 8) //nolint:revive
  135. C_S2CTraffic := make(chan client2.UDPClientRxFromServerTraffic, 8) //nolint:revive
  136. C_C2STraffic2 := make(chan interfaces.TrafficWithChannelTag, 8) //nolint:revive
  137. C_C2SDataTraffic2 := make(chan interfaces.TrafficWithChannelTag, 8) //nolint:revive
  138. C_S2CTraffic2 := make(chan interfaces.TrafficWithChannelTag, 8) //nolint:revive
  139. go func(ctx context.Context) {
  140. for {
  141. select {
  142. case data := <-C_C2STraffic:
  143. C_C2STraffic2 <- interfaces.TrafficWithChannelTag(data)
  144. case <-ctx.Done():
  145. return
  146. }
  147. }
  148. }(connctx)
  149. go func(ctx context.Context) {
  150. for {
  151. select {
  152. case data := <-C_C2SDataTraffic:
  153. C_C2SDataTraffic2 <- interfaces.TrafficWithChannelTag(data)
  154. case <-ctx.Done():
  155. return
  156. }
  157. }
  158. }(connctx)
  159. go func(ctx context.Context) {
  160. for {
  161. select {
  162. case data := <-C_S2CTraffic2:
  163. C_S2CTraffic <- client2.UDPClientRxFromServerTraffic(data)
  164. case <-ctx.Done():
  165. return
  166. }
  167. }
  168. }(connctx)
  169. TunnelTxToTun := make(chan interfaces.UDPPacket)
  170. TunnelRxFromTun := make(chan interfaces.UDPPacket)
  171. s.TunnelTxToTun = TunnelTxToTun
  172. s.TunnelRxFromTun = TunnelRxFromTun
  173. if s.config.EnableStabilization && s.config.EnableRenegotiation {
  174. s.puni = puniClient.NewPacketUniClient(C_C2STraffic2, C_C2SDataTraffic2, C_S2CTraffic2, s.password, connctx)
  175. s.puni.OnAutoCarrier(conn, connctx)
  176. s.udpserver = client2.UDPClient(connctx, C_C2STraffic, C_C2SDataTraffic, C_S2CTraffic, TunnelTxToTun, TunnelRxFromTun, s.puni)
  177. } else {
  178. s.udprelay = udpsctpserver.NewPacketRelayClient(conn, C_C2STraffic2, C_C2SDataTraffic2, C_S2CTraffic2, s.password, connctx)
  179. s.udpserver = client2.UDPClient(connctx, C_C2STraffic, C_C2SDataTraffic, C_S2CTraffic, TunnelTxToTun, TunnelRxFromTun, s.udprelay)
  180. }
  181. s.ctx = connctx
  182. s.connAdp = udpconn2tun.NewUDPConn2Tun(TunnelTxToTun, TunnelRxFromTun)
  183. return nil
  184. }
  185. func init() {
  186. common.Must(common.RegisterConfig((*UDPProtocolConfig)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
  187. return NewUDPOutboundHandler(ctx, config.(*UDPProtocolConfig))
  188. }))
  189. }