client_session.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. package shadowsocks2022
  2. import (
  3. "context"
  4. "crypto/rand"
  5. "io"
  6. gonet "net"
  7. "sync"
  8. "time"
  9. "github.com/v2fly/v2ray-core/v5/common/buf"
  10. "github.com/v2fly/v2ray-core/v5/common/net"
  11. "github.com/v2fly/v2ray-core/v5/transport/internet"
  12. )
  13. func NewClientUDPSession(ctx context.Context, conn io.ReadWriteCloser, packetProcessor UDPClientPacketProcessor) *ClientUDPSession {
  14. session := &ClientUDPSession{
  15. locker: &sync.Mutex{},
  16. conn: conn,
  17. packetProcessor: packetProcessor,
  18. sessionMap: make(map[string]*ClientUDPSessionConn),
  19. }
  20. session.ctx, session.finish = context.WithCancel(ctx)
  21. go session.KeepReading()
  22. return session
  23. }
  24. type ClientUDPSession struct {
  25. locker *sync.Mutex
  26. conn io.ReadWriteCloser
  27. packetProcessor UDPClientPacketProcessor
  28. sessionMap map[string]*ClientUDPSessionConn
  29. ctx context.Context
  30. finish func()
  31. }
  32. func (c *ClientUDPSession) Close() error {
  33. c.finish()
  34. return c.conn.Close()
  35. }
  36. func (c *ClientUDPSession) WriteUDPRequest(request *UDPRequest) error {
  37. buffer := buf.New()
  38. defer buffer.Release()
  39. err := c.packetProcessor.EncodeUDPRequest(request, buffer)
  40. if request.Payload != nil {
  41. request.Payload.Release()
  42. }
  43. if err != nil {
  44. return newError("unable to encode udp request").Base(err)
  45. }
  46. _, err = c.conn.Write(buffer.Bytes())
  47. if err != nil {
  48. return newError("unable to write to conn").Base(err)
  49. }
  50. return nil
  51. }
  52. func (c *ClientUDPSession) KeepReading() {
  53. for c.ctx.Err() == nil {
  54. udpResp := &UDPResponse{}
  55. buffer := make([]byte, 1600)
  56. n, err := c.conn.Read(buffer)
  57. if err != nil {
  58. newError("unable to read from conn").Base(err).WriteToLog()
  59. return
  60. }
  61. if n != 0 {
  62. err := c.packetProcessor.DecodeUDPResp(buffer[:n], udpResp)
  63. if err != nil {
  64. newError("unable to decode udp response").Base(err).WriteToLog()
  65. continue
  66. }
  67. {
  68. timeDifference := int64(udpResp.TimeStamp) - time.Now().Unix()
  69. if timeDifference < -30 || timeDifference > 30 {
  70. newError("udp packet timestamp difference too large, packet discarded").WriteToLog()
  71. continue
  72. }
  73. }
  74. c.locker.Lock()
  75. session, ok := c.sessionMap[string(udpResp.ClientSessionID[:])]
  76. if ok {
  77. select {
  78. case session.readChan <- udpResp:
  79. default:
  80. }
  81. } else {
  82. newError("misbehaving server: unknown client session ID").Base(err).WriteToLog()
  83. }
  84. c.locker.Unlock()
  85. }
  86. }
  87. }
  88. func (c *ClientUDPSession) NewSessionConn() (internet.AbstractPacketConn, error) {
  89. sessionID := make([]byte, 8)
  90. _, err := rand.Read(sessionID)
  91. if err != nil {
  92. return nil, newError("unable to generate session id").Base(err)
  93. }
  94. connctx, connfinish := context.WithCancel(c.ctx)
  95. sessionConn := &ClientUDPSessionConn{
  96. sessionID: string(sessionID),
  97. readChan: make(chan *UDPResponse, 16),
  98. parent: c,
  99. ctx: connctx,
  100. finish: connfinish,
  101. nextWritePacketID: 0,
  102. }
  103. c.locker.Lock()
  104. c.sessionMap[sessionConn.sessionID] = sessionConn
  105. c.locker.Unlock()
  106. return sessionConn, nil
  107. }
  108. type ClientUDPSessionConn struct {
  109. sessionID string
  110. readChan chan *UDPResponse
  111. parent *ClientUDPSession
  112. nextWritePacketID uint64
  113. ctx context.Context
  114. finish func()
  115. }
  116. func (c *ClientUDPSessionConn) Close() error {
  117. delete(c.parent.sessionMap, c.sessionID)
  118. c.finish()
  119. return nil
  120. }
  121. func (c *ClientUDPSessionConn) WriteTo(p []byte, addr gonet.Addr) (n int, err error) {
  122. thisPacketID := c.nextWritePacketID
  123. c.nextWritePacketID += 1
  124. req := &UDPRequest{
  125. SessionID: [8]byte{},
  126. PacketID: thisPacketID,
  127. TimeStamp: uint64(time.Now().Unix()),
  128. Address: net.IPAddress(addr.(*gonet.UDPAddr).IP),
  129. Port: addr.(*net.UDPAddr).Port,
  130. Payload: nil,
  131. }
  132. copy(req.SessionID[:], c.sessionID)
  133. req.Payload = buf.New()
  134. req.Payload.Write(p)
  135. err = c.parent.WriteUDPRequest(req)
  136. if err != nil {
  137. return 0, newError("unable to write to parent session").Base(err)
  138. }
  139. return len(p), nil
  140. }
  141. func (c *ClientUDPSessionConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
  142. select {
  143. case <-c.ctx.Done():
  144. return 0, nil, io.EOF
  145. case resp := <-c.readChan:
  146. n = copy(p, resp.Payload.Bytes())
  147. resp.Payload.Release()
  148. addr = &net.UDPAddr{IP: resp.Address.IP(), Port: resp.Port}
  149. }
  150. return
  151. }