inbound.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. package inbound
  2. import (
  3. "context"
  4. "github.com/mustafaturan/bus"
  5. "github.com/v2fly/v2ray-core/v5/common"
  6. "github.com/v2fly/v2ray-core/v5/common/environment"
  7. "github.com/v2fly/v2ray-core/v5/common/environment/envctx"
  8. "github.com/v2fly/v2ray-core/v5/common/net"
  9. "github.com/v2fly/v2ray-core/v5/common/session"
  10. "github.com/v2fly/v2ray-core/v5/common/signal/done"
  11. "github.com/v2fly/v2ray-core/v5/features/routing"
  12. "github.com/v2fly/v2ray-core/v5/transport/internet"
  13. "github.com/xiaokangwang/VLite/interfaces"
  14. "github.com/xiaokangwang/VLite/interfaces/ibus"
  15. "github.com/xiaokangwang/VLite/transport"
  16. udpsctpserver "github.com/xiaokangwang/VLite/transport/packetsctp/sctprelay"
  17. "github.com/xiaokangwang/VLite/transport/packetuni/puniServer"
  18. "github.com/xiaokangwang/VLite/transport/udp/udpServer"
  19. "github.com/xiaokangwang/VLite/transport/udp/udpuni/udpunis"
  20. "github.com/xiaokangwang/VLite/transport/uni/uniserver"
  21. "github.com/xiaokangwang/VLite/workers/server"
  22. "io"
  23. gonet "net"
  24. "strconv"
  25. "sync"
  26. )
  27. //go:generate go run github.com/v2fly/v2ray-core/v5/common/errors/errorgen
  28. func NewUDPInboundHandler(ctx context.Context, config *UDPProtocolConfig) (*Handler, error) {
  29. proxyEnvironment := envctx.EnvironmentFromContext(ctx).(environment.ProxyEnvironment)
  30. statusInstance, err := createStatusFromConfig(config)
  31. if err != nil {
  32. return nil, newError("unable to initialize vlite").Base(err)
  33. }
  34. proxyEnvironment.TransientStorage().Put(ctx, "status", statusInstance)
  35. return &Handler{ctx: ctx}, nil
  36. }
  // Handler is the VLite UDP inbound handler created by NewUDPInboundHandler.
  type Handler struct {
  	// ctx is the context the handler was constructed with; Process uses it
  	// to retrieve the proxy environment.
  	ctx context.Context
  }
  40. func (h *Handler) Network() []net.Network {
  41. list := []net.Network{net.Network_UDP}
  42. return list
  43. }
  44. type status struct {
  45. config *UDPProtocolConfig
  46. password []byte
  47. msgbus *bus.Bus
  48. ctx context.Context
  49. transport transport.UnderlayTransportListener
  50. access sync.Mutex
  51. }
  52. func (s *status) RelayStream(conn io.ReadWriteCloser, ctx context.Context) {
  53. }
  54. func (s *status) Connection(conn gonet.Conn, connctx context.Context) context.Context {
  55. S_S2CTraffic := make(chan server.UDPServerTxToClientTraffic, 8)
  56. S_S2CDataTraffic := make(chan server.UDPServerTxToClientDataTraffic, 8)
  57. S_C2STraffic := make(chan server.UDPServerRxFromClientTraffic, 8)
  58. S_S2CTraffic2 := make(chan interfaces.TrafficWithChannelTag, 8)
  59. S_S2CDataTraffic2 := make(chan interfaces.TrafficWithChannelTag, 8)
  60. S_C2STraffic2 := make(chan interfaces.TrafficWithChannelTag, 8)
  61. go func(ctx context.Context) {
  62. for {
  63. select {
  64. case data := <-S_S2CTraffic:
  65. S_S2CTraffic2 <- interfaces.TrafficWithChannelTag(data)
  66. case <-ctx.Done():
  67. return
  68. }
  69. }
  70. }(connctx)
  71. go func(ctx context.Context) {
  72. for {
  73. select {
  74. case data := <-S_S2CDataTraffic:
  75. S_S2CDataTraffic2 <- interfaces.TrafficWithChannelTag(data)
  76. case <-ctx.Done():
  77. return
  78. }
  79. }
  80. }(connctx)
  81. go func(ctx context.Context) {
  82. for {
  83. select {
  84. case data := <-S_C2STraffic2:
  85. S_C2STraffic <- server.UDPServerRxFromClientTraffic(data)
  86. case <-ctx.Done():
  87. return
  88. }
  89. }
  90. }(connctx)
  91. if s.config.EnableStabilization && s.config.EnableRenegotiation {
  92. relay := udpsctpserver.NewPacketRelayServer(conn, S_S2CTraffic2, S_S2CDataTraffic2, S_C2STraffic2, s, s.password, connctx)
  93. udpserver := server.UDPServer(connctx, S_S2CTraffic, S_S2CDataTraffic, S_C2STraffic, relay)
  94. _ = udpserver
  95. } else {
  96. relay := puniServer.NewPacketUniServer(S_S2CTraffic2, S_S2CDataTraffic2, S_C2STraffic2, s, s.password, connctx)
  97. relay.OnAutoCarrier(conn, connctx)
  98. udpserver := server.UDPServer(connctx, S_S2CTraffic, S_S2CDataTraffic, S_C2STraffic, relay)
  99. _ = udpserver
  100. }
  101. return nil
  102. }
  103. func createStatusFromConfig(config *UDPProtocolConfig) (*status, error) {
  104. s := &status{ctx: context.Background(), config: config}
  105. s.password = []byte(config.Password)
  106. s.msgbus = ibus.NewMessageBus()
  107. s.ctx = context.WithValue(s.ctx, interfaces.ExtraOptionsMessageBus, s.msgbus)
  108. if config.ScramblePacket {
  109. s.ctx = context.WithValue(s.ctx, interfaces.ExtraOptionsUDPShouldMask, true)
  110. }
  111. if s.config.EnableFec {
  112. s.ctx = context.WithValue(s.ctx, interfaces.ExtraOptionsUDPFECEnabled, true)
  113. }
  114. s.ctx = context.WithValue(s.ctx, interfaces.ExtraOptionsUDPMask, string(s.password))
  115. if config.HandshakeMaskingPaddingSize != 0 {
  116. ctxv := &interfaces.ExtraOptionsUsePacketArmorValue{PacketArmorPaddingTo: int(config.HandshakeMaskingPaddingSize), UsePacketArmor: true}
  117. s.ctx = context.WithValue(s.ctx, interfaces.ExtraOptionsUsePacketArmor, ctxv)
  118. }
  119. return s, nil
  120. }
  121. func enableInterface(s *status) error {
  122. s.transport = s
  123. if s.config.EnableStabilization {
  124. s.transport = uniserver.NewUnifiedConnectionTransportHub(s, s.ctx)
  125. }
  126. s.transport = udpunis.NewUdpUniServer(string(s.password), s.ctx, s.transport)
  127. return nil
  128. }
  129. func (h *Handler) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher routing.Dispatcher) error {
  130. proxyEnvironment := envctx.EnvironmentFromContext(h.ctx).(environment.ProxyEnvironment)
  131. statusInstanceIfce, err := proxyEnvironment.TransientStorage().Get(ctx, "status")
  132. if err != nil {
  133. return newError("uninitialized handler").Base(err)
  134. }
  135. statusInstance := statusInstanceIfce.(*status)
  136. err = h.ensureStarted(statusInstance)
  137. if err != nil {
  138. return newError("unable to initialize").Base(err)
  139. }
  140. finish := done.New()
  141. conn = newUDPConnAdaptor(conn, finish)
  142. var initialData [1600]byte
  143. c, err := conn.Read(initialData[:])
  144. if err != nil {
  145. return newError("unable to read initial data").Base(err)
  146. }
  147. connID := session.IDFromContext(ctx)
  148. vconn, connctx := udpServer.PrepareIncomingUDPConnection(conn, statusInstance.ctx, initialData[:c], strconv.FormatInt(int64(connID), 10))
  149. connctx = statusInstance.transport.Connection(vconn, connctx)
  150. if connctx == nil {
  151. return newError("invalid connection discarded")
  152. }
  153. <-finish.Wait()
  154. return nil
  155. }
  156. func (h *Handler) ensureStarted(s *status) error {
  157. s.access.Lock()
  158. defer s.access.Unlock()
  159. if s.transport == nil {
  160. err := enableInterface(s)
  161. if err != nil {
  162. return err
  163. }
  164. }
  165. return nil
  166. }
  167. func init() {
  168. common.Must(common.RegisterConfig((*UDPProtocolConfig)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
  169. return NewUDPInboundHandler(ctx, config.(*UDPProtocolConfig))
  170. }))
  171. }