client_session.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. package shadowsocks2022
  2. import (
  3. "context"
  4. "crypto/rand"
  5. "github.com/v2fly/v2ray-core/v5/common/buf"
  6. "github.com/v2fly/v2ray-core/v5/common/net"
  7. "github.com/v2fly/v2ray-core/v5/transport/internet"
  8. "io"
  9. gonet "net"
  10. "sync"
  11. "time"
  12. )
  13. func NewClientUDPSession(ctx context.Context, conn io.ReadWriteCloser, packetProcessor UDPClientPacketProcessor) *ClientUDPSession {
  14. session := &ClientUDPSession{
  15. locker: &sync.Mutex{},
  16. conn: conn,
  17. packetProcessor: packetProcessor,
  18. sessionMap: make(map[string]*ClientUDPSessionConn),
  19. }
  20. session.ctx, session.finish = context.WithCancel(ctx)
  21. go session.KeepReading()
  22. return session
  23. }
  24. type ClientUDPSession struct {
  25. locker *sync.Mutex
  26. conn io.ReadWriteCloser
  27. packetProcessor UDPClientPacketProcessor
  28. sessionMap map[string]*ClientUDPSessionConn
  29. ctx context.Context
  30. finish func()
  31. }
  32. func (c *ClientUDPSession) Close() error {
  33. c.finish()
  34. return c.conn.Close()
  35. }
  36. func (c *ClientUDPSession) WriteUDPRequest(request *UDPRequest) error {
  37. buffer := buf.New()
  38. defer buffer.Release()
  39. err := c.packetProcessor.EncodeUDPRequest(request, buffer)
  40. if request.Payload != nil {
  41. request.Payload.Release()
  42. }
  43. if err != nil {
  44. return newError("unable to encode udp request").Base(err)
  45. }
  46. _, err = c.conn.Write(buffer.Bytes())
  47. if err != nil {
  48. return newError("unable to write to conn").Base(err)
  49. }
  50. return nil
  51. }
  52. func (c *ClientUDPSession) KeepReading() {
  53. for c.ctx.Err() == nil {
  54. udpResp := &UDPResponse{}
  55. buffer := make([]byte, 1600)
  56. n, err := c.conn.Read(buffer)
  57. if err != nil {
  58. newError("unable to read from conn").Base(err).WriteToLog()
  59. return
  60. }
  61. if n != 0 {
  62. err := c.packetProcessor.DecodeUDPResp(buffer[:n], udpResp)
  63. if err != nil {
  64. newError("unable to decode udp response").Base(err).WriteToLog()
  65. continue
  66. }
  67. c.locker.Lock()
  68. session, ok := c.sessionMap[string(udpResp.ClientSessionID[:])]
  69. if ok {
  70. select {
  71. case session.readChan <- udpResp:
  72. default:
  73. }
  74. } else {
  75. newError("misbehaving server: unknown client session ID").Base(err).WriteToLog()
  76. }
  77. c.locker.Unlock()
  78. }
  79. }
  80. }
  81. func (c *ClientUDPSession) NewSessionConn() (internet.AbstractPacketConn, error) {
  82. sessionID := make([]byte, 8)
  83. _, err := rand.Read(sessionID)
  84. if err != nil {
  85. return nil, newError("unable to generate session id").Base(err)
  86. }
  87. connctx, connfinish := context.WithCancel(c.ctx)
  88. sessionConn := &ClientUDPSessionConn{
  89. sessionID: string(sessionID),
  90. readChan: make(chan *UDPResponse, 16),
  91. parent: c,
  92. ctx: connctx,
  93. finish: connfinish,
  94. nextWritePacketID: 0,
  95. }
  96. c.locker.Lock()
  97. c.sessionMap[sessionConn.sessionID] = sessionConn
  98. c.locker.Unlock()
  99. return sessionConn, nil
  100. }
  101. type ClientUDPSessionConn struct {
  102. sessionID string
  103. readChan chan *UDPResponse
  104. parent *ClientUDPSession
  105. nextWritePacketID uint64
  106. ctx context.Context
  107. finish func()
  108. }
  109. func (c *ClientUDPSessionConn) Close() error {
  110. delete(c.parent.sessionMap, c.sessionID)
  111. c.finish()
  112. return nil
  113. }
  114. func (c *ClientUDPSessionConn) WriteTo(p []byte, addr gonet.Addr) (n int, err error) {
  115. thisPacketID := c.nextWritePacketID
  116. c.nextWritePacketID += 1
  117. req := &UDPRequest{
  118. SessionID: [8]byte{},
  119. PacketID: thisPacketID,
  120. TimeStamp: uint64(time.Now().Unix()),
  121. Address: net.IPAddress(addr.(*gonet.UDPAddr).IP),
  122. Port: addr.(*net.UDPAddr).Port,
  123. Payload: nil,
  124. }
  125. copy(req.SessionID[:], c.sessionID)
  126. req.Payload = buf.New()
  127. req.Payload.Write(p)
  128. err = c.parent.WriteUDPRequest(req)
  129. if err != nil {
  130. return 0, newError("unable to write to parent session").Base(err)
  131. }
  132. return len(p), nil
  133. }
  134. func (c *ClientUDPSessionConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
  135. select {
  136. case <-c.ctx.Done():
  137. return 0, nil, io.EOF
  138. case resp := <-c.readChan:
  139. n = copy(p, resp.Payload.Bytes())
  140. resp.Payload.Release()
  141. addr = &net.UDPAddr{IP: resp.Address.IP(), Port: int(resp.Port)}
  142. }
  143. return
  144. }