udpassemblerServer.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. package packetconn
  2. import (
  3. "crypto/rand"
  4. "io"
  5. "net"
  6. "sync"
  7. "time"
  8. "golang.org/x/net/context"
  9. "github.com/v2fly/v2ray-core/v5/common/environment"
  10. "github.com/v2fly/v2ray-core/v5/common/environment/envctx"
  11. net2 "github.com/v2fly/v2ray-core/v5/common/net"
  12. "github.com/v2fly/v2ray-core/v5/transport/internet"
  13. "github.com/v2fly/v2ray-core/v5/transport/internet/request"
  14. )
  15. type packet struct {
  16. addr string
  17. data []byte
  18. }
  19. type wrappedPacketConn struct {
  20. connLock *sync.Mutex
  21. conn map[string]*serverSession
  22. readChan chan packet
  23. ctx context.Context
  24. finish func()
  25. }
  26. func (w *wrappedPacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
  27. select {
  28. case pack := <-w.readChan:
  29. n := copy(p, pack.data)
  30. if n < len(pack.data) {
  31. return n, nil, io.ErrShortBuffer
  32. }
  33. return n, &net.UDPAddr{IP: net2.IP(pack.addr)}, nil
  34. case <-w.ctx.Done():
  35. return 0, nil, w.ctx.Err()
  36. }
  37. }
  38. func (w *wrappedPacketConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
  39. w.connLock.Lock()
  40. conn := w.conn[string(addr.(*net.UDPAddr).IP)]
  41. w.connLock.Unlock()
  42. return conn.Write(p)
  43. }
  44. func (w *wrappedPacketConn) Close() error {
  45. w.finish()
  46. return nil
  47. }
  48. func (w *wrappedPacketConn) LocalAddr() net.Addr {
  49. return nil
  50. }
  51. func (w *wrappedPacketConn) SetDeadline(t time.Time) error {
  52. return nil
  53. }
  54. func (w *wrappedPacketConn) SetReadDeadline(t time.Time) error {
  55. return nil
  56. }
  57. func (w *wrappedPacketConn) SetWriteDeadline(t time.Time) error {
  58. return nil
  59. }
  60. func (w wrappedPacketConn) OnNewSession(ctx context.Context, sess request.Session, opts ...request.SessionOption) error {
  61. imaginaryAddr := net2.UDPAddr{
  62. IP: net2.AnyIPv6.IP(),
  63. Port: 0,
  64. }
  65. rand.Read([]byte(imaginaryAddr.IP))
  66. session := newServerSession(ctx, sess, string(imaginaryAddr.IP), &w)
  67. w.connLock.Lock()
  68. w.conn[string(imaginaryAddr.IP)] = session
  69. w.connLock.Unlock()
  70. session.start()
  71. return nil
  72. }
  73. func newServerSession(ctx context.Context, sess request.Session, name string, listener *wrappedPacketConn) *serverSession {
  74. _ = ctx
  75. return &serverSession{session: sess, name: name, listener: listener}
  76. }
  77. type serverSession struct {
  78. name string
  79. session request.Session
  80. listener *wrappedPacketConn
  81. }
  82. func (s *serverSession) start() {
  83. go func() {
  84. for {
  85. select {
  86. case <-s.listener.ctx.Done():
  87. return
  88. default:
  89. buf := make([]byte, 2000)
  90. n, err := s.session.Read(buf)
  91. if err != nil || n > 2000 {
  92. return
  93. }
  94. s.listener.readChan <- packet{s.name, buf[:n]}
  95. }
  96. }
  97. }()
  98. }
  99. func (s *serverSession) Write(p []byte) (int, error) {
  100. return s.session.Write(p)
  101. }
  102. type udpAssemblerServer struct {
  103. ctx context.Context
  104. streamSettings *internet.MemoryStreamConfig
  105. assembly request.TransportServerAssembly
  106. req2packs *requestToPacketConnServer
  107. listener internet.Listener
  108. }
  109. func (u *udpAssemblerServer) Start() error {
  110. listener, err := u.listen(net2.LocalHostIP, 0)
  111. if err != nil {
  112. return newError("failed to listen").Base(err).AtError()
  113. }
  114. u.listener = listener
  115. return nil
  116. }
  117. func (u *udpAssemblerServer) Close() error {
  118. return u.listener.Close()
  119. }
  120. func (u *udpAssemblerServer) OnRoundTrip(ctx context.Context, req request.Request, opts ...request.RoundTripperOption) (resp request.Response, err error) {
  121. return u.req2packs.OnRoundTrip(ctx, req, opts...)
  122. }
  123. func (u *udpAssemblerServer) OnTransportServerAssemblyReady(assembly request.TransportServerAssembly) {
  124. u.assembly = assembly
  125. }
  126. func newUDPAssemblerServer(ctx context.Context, config *ServerConfig, streamSettings *internet.MemoryStreamConfig) *udpAssemblerServer {
  127. transportEnvironment := envctx.EnvironmentFromContext(ctx).(environment.TransportEnvironment)
  128. transportEnvironmentWrapped := &wrappedTransportEnvironment{TransportEnvironment: transportEnvironment}
  129. transportEnvironmentWrapped.server = newRequestToPacketConnServer(ctx, config)
  130. wrappedContext := envctx.ContextWithEnvironment(ctx, transportEnvironmentWrapped)
  131. return &udpAssemblerServer{ctx: wrappedContext, streamSettings: streamSettings, req2packs: transportEnvironmentWrapped.server}
  132. }
  133. func (u *udpAssemblerServer) listen(address net2.Address, port net2.Port) (internet.Listener, error) {
  134. return internet.ListenTCP(u.ctx, address, port, u.streamSettings, func(connection internet.Connection) {
  135. err := u.assembly.SessionReceiver().OnNewSession(u.ctx, connection)
  136. if err != nil {
  137. newError("failed to handle new session").Base(err).WriteToLog()
  138. }
  139. })
  140. }