client.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. package simple
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/rand"
  6. "io"
  7. "time"
  8. "github.com/v2fly/v2ray-core/v5/common"
  9. "github.com/v2fly/v2ray-core/v5/transport/internet/request"
  10. )
  11. func newClient(config *ClientConfig) request.SessionAssemblerClient {
  12. return &simpleAssemblerClient{config: config}
  13. }
  14. type simpleAssemblerClient struct {
  15. assembly request.TransportClientAssembly
  16. config *ClientConfig
  17. }
  18. func (s *simpleAssemblerClient) OnTransportClientAssemblyReady(assembly request.TransportClientAssembly) {
  19. s.assembly = assembly
  20. }
  21. func (s *simpleAssemblerClient) NewSession(ctx context.Context, opts ...request.SessionOption) (request.Session, error) {
  22. sessionID := make([]byte, 16)
  23. _, err := io.ReadFull(rand.Reader, sessionID)
  24. if err != nil {
  25. return nil, err
  26. }
  27. sessionContext, finish := context.WithCancel(ctx)
  28. session := &simpleAssemblerClientSession{
  29. sessionID: sessionID, tripper: s.assembly.Tripper(), readBuffer: bytes.NewBuffer(nil),
  30. ctx: sessionContext, finish: finish, writerChan: make(chan []byte), readerChan: make(chan []byte, 16), assembler: s}
  31. go session.keepRunning()
  32. return session, nil
  33. }
  34. type simpleAssemblerClientSession struct {
  35. sessionID []byte
  36. currentWriteWait int
  37. assembler *simpleAssemblerClient
  38. tripper request.Tripper
  39. readBuffer *bytes.Buffer
  40. writerChan chan []byte
  41. readerChan chan []byte
  42. ctx context.Context
  43. finish func()
  44. }
  45. func (s *simpleAssemblerClientSession) keepRunning() {
  46. s.currentWriteWait = int(s.assembler.config.InitialPollingIntervalMs)
  47. for s.ctx.Err() == nil {
  48. s.runOnce()
  49. }
  50. }
  51. func (s *simpleAssemblerClientSession) runOnce() {
  52. sendBuffer := bytes.NewBuffer(nil)
  53. if s.currentWriteWait != 0 {
  54. waitTimer := time.NewTimer(time.Millisecond * time.Duration(s.currentWriteWait))
  55. waitForFirstWrite := true
  56. copyFromWriterLoop:
  57. for {
  58. select {
  59. case <-s.ctx.Done():
  60. return
  61. case data := <-s.writerChan:
  62. sendBuffer.Write(data)
  63. if sendBuffer.Len() >= int(s.assembler.config.MaxWriteSize) {
  64. break copyFromWriterLoop
  65. }
  66. if waitForFirstWrite {
  67. waitForFirstWrite = false
  68. waitTimer.Reset(time.Millisecond * time.Duration(s.assembler.config.WaitSubsequentWriteMs))
  69. }
  70. case <-waitTimer.C:
  71. break copyFromWriterLoop
  72. }
  73. }
  74. waitTimer.Stop()
  75. }
  76. firstRound := true
  77. pollConnection := true
  78. for sendBuffer.Len() != 0 || firstRound {
  79. firstRound = false
  80. sendAmount := sendBuffer.Len()
  81. if sendAmount > int(s.assembler.config.MaxWriteSize) {
  82. sendAmount = int(s.assembler.config.MaxWriteSize)
  83. }
  84. data := sendBuffer.Next(sendAmount)
  85. if len(data) != 0 {
  86. pollConnection = false
  87. }
  88. for {
  89. resp, err := s.tripper.RoundTrip(s.ctx, request.Request{Data: data, ConnectionTag: s.sessionID})
  90. if err != nil {
  91. newError("failed to send data").Base(err).WriteToLog()
  92. if s.ctx.Err() != nil {
  93. return
  94. }
  95. time.Sleep(time.Millisecond * time.Duration(s.assembler.config.FailedRetryIntervalMs))
  96. continue
  97. }
  98. if len(resp.Data) != 0 {
  99. s.readerChan <- resp.Data
  100. }
  101. if len(resp.Data) != 0 {
  102. pollConnection = false
  103. }
  104. break
  105. }
  106. }
  107. if pollConnection {
  108. s.currentWriteWait = int(s.assembler.config.BackoffFactor * float32(s.currentWriteWait))
  109. if s.currentWriteWait > int(s.assembler.config.MaxPollingIntervalMs) {
  110. s.currentWriteWait = int(s.assembler.config.MaxPollingIntervalMs)
  111. }
  112. if s.currentWriteWait < int(s.assembler.config.MinPollingIntervalMs) {
  113. s.currentWriteWait = int(s.assembler.config.MinPollingIntervalMs)
  114. }
  115. } else {
  116. s.currentWriteWait = int(0)
  117. }
  118. }
  119. func (s *simpleAssemblerClientSession) Read(p []byte) (n int, err error) {
  120. if s.readBuffer.Len() == 0 {
  121. select {
  122. case <-s.ctx.Done():
  123. return 0, s.ctx.Err()
  124. case data := <-s.readerChan:
  125. s.readBuffer.Write(data)
  126. }
  127. }
  128. n, err = s.readBuffer.Read(p)
  129. if err == io.EOF {
  130. s.readBuffer.Reset()
  131. return 0, nil
  132. }
  133. return
  134. }
  135. func (s *simpleAssemblerClientSession) Write(p []byte) (n int, err error) {
  136. buf := make([]byte, len(p))
  137. copy(buf, p)
  138. select {
  139. case <-s.ctx.Done():
  140. return 0, s.ctx.Err()
  141. case s.writerChan <- buf:
  142. return len(p), nil
  143. }
  144. }
  145. func (s *simpleAssemblerClientSession) Close() error {
  146. s.finish()
  147. return nil
  148. }
  149. func init() {
  150. common.Must(common.RegisterConfig((*ClientConfig)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
  151. clientConfig, ok := config.(*ClientConfig)
  152. if !ok {
  153. return nil, newError("not a ClientConfig")
  154. }
  155. return newClient(clientConfig), nil
  156. }))
  157. }