client.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. package simple
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/rand"
  6. "io"
  7. "time"
  8. "github.com/v2fly/v2ray-core/v5/common"
  9. "github.com/v2fly/v2ray-core/v5/transport/internet/request"
  10. )
  11. func newClient(config *ClientConfig) request.SessionAssemblerClient {
  12. return &simpleAssemblerClient{config: config}
  13. }
  14. type simpleAssemblerClient struct {
  15. assembly request.TransportClientAssembly
  16. config *ClientConfig
  17. }
  18. func (s *simpleAssemblerClient) OnTransportClientAssemblyReady(assembly request.TransportClientAssembly) {
  19. s.assembly = assembly
  20. }
  21. func (s *simpleAssemblerClient) NewSession(ctx context.Context, opts ...request.SessionOption) (request.Session, error) {
  22. sessionID := make([]byte, 16)
  23. _, err := io.ReadFull(rand.Reader, sessionID)
  24. if err != nil {
  25. return nil, err
  26. }
  27. sessionContext, finish := context.WithCancel(ctx)
  28. session := &simpleAssemblerClientSession{
  29. sessionID: sessionID, tripper: s.assembly.Tripper(), readBuffer: bytes.NewBuffer(nil),
  30. ctx: sessionContext, finish: finish, writerChan: make(chan []byte), readerChan: make(chan []byte, 16), assembler: s,
  31. }
  32. go session.keepRunning()
  33. return session, nil
  34. }
  35. type simpleAssemblerClientSession struct {
  36. sessionID []byte
  37. currentWriteWait int
  38. assembler *simpleAssemblerClient
  39. tripper request.Tripper
  40. readBuffer *bytes.Buffer
  41. writerChan chan []byte
  42. readerChan chan []byte
  43. ctx context.Context
  44. finish func()
  45. }
  46. func (s *simpleAssemblerClientSession) keepRunning() {
  47. s.currentWriteWait = int(s.assembler.config.InitialPollingIntervalMs)
  48. for s.ctx.Err() == nil {
  49. s.runOnce()
  50. }
  51. }
  52. func (s *simpleAssemblerClientSession) runOnce() {
  53. sendBuffer := bytes.NewBuffer(nil)
  54. if s.currentWriteWait != 0 {
  55. waitTimer := time.NewTimer(time.Millisecond * time.Duration(s.currentWriteWait))
  56. waitForFirstWrite := true
  57. copyFromWriterLoop:
  58. for {
  59. select {
  60. case <-s.ctx.Done():
  61. return
  62. case data := <-s.writerChan:
  63. sendBuffer.Write(data)
  64. if sendBuffer.Len() >= int(s.assembler.config.MaxWriteSize) {
  65. break copyFromWriterLoop
  66. }
  67. if waitForFirstWrite {
  68. waitForFirstWrite = false
  69. waitTimer.Reset(time.Millisecond * time.Duration(s.assembler.config.WaitSubsequentWriteMs))
  70. }
  71. case <-waitTimer.C:
  72. break copyFromWriterLoop
  73. }
  74. }
  75. waitTimer.Stop()
  76. }
  77. firstRound := true
  78. pollConnection := true
  79. for sendBuffer.Len() != 0 || firstRound {
  80. firstRound = false
  81. sendAmount := sendBuffer.Len()
  82. if sendAmount > int(s.assembler.config.MaxWriteSize) {
  83. sendAmount = int(s.assembler.config.MaxWriteSize)
  84. }
  85. data := sendBuffer.Next(sendAmount)
  86. if len(data) != 0 {
  87. pollConnection = false
  88. }
  89. for {
  90. resp, err := s.tripper.RoundTrip(s.ctx, request.Request{Data: data, ConnectionTag: s.sessionID})
  91. if err != nil {
  92. newError("failed to send data").Base(err).WriteToLog()
  93. if s.ctx.Err() != nil {
  94. return
  95. }
  96. time.Sleep(time.Millisecond * time.Duration(s.assembler.config.FailedRetryIntervalMs))
  97. continue
  98. }
  99. if len(resp.Data) != 0 {
  100. s.readerChan <- resp.Data
  101. }
  102. if len(resp.Data) != 0 {
  103. pollConnection = false
  104. }
  105. break
  106. }
  107. }
  108. if pollConnection {
  109. s.currentWriteWait = int(s.assembler.config.BackoffFactor * float32(s.currentWriteWait))
  110. if s.currentWriteWait > int(s.assembler.config.MaxPollingIntervalMs) {
  111. s.currentWriteWait = int(s.assembler.config.MaxPollingIntervalMs)
  112. }
  113. if s.currentWriteWait < int(s.assembler.config.MinPollingIntervalMs) {
  114. s.currentWriteWait = int(s.assembler.config.MinPollingIntervalMs)
  115. }
  116. } else {
  117. s.currentWriteWait = int(0)
  118. }
  119. }
  120. func (s *simpleAssemblerClientSession) Read(p []byte) (n int, err error) {
  121. if s.readBuffer.Len() == 0 {
  122. select {
  123. case <-s.ctx.Done():
  124. return 0, s.ctx.Err()
  125. case data := <-s.readerChan:
  126. s.readBuffer.Write(data)
  127. }
  128. }
  129. n, err = s.readBuffer.Read(p)
  130. if err == io.EOF {
  131. s.readBuffer.Reset()
  132. return 0, nil
  133. }
  134. return
  135. }
  136. func (s *simpleAssemblerClientSession) Write(p []byte) (n int, err error) {
  137. buf := make([]byte, len(p))
  138. copy(buf, p)
  139. select {
  140. case <-s.ctx.Done():
  141. return 0, s.ctx.Err()
  142. case s.writerChan <- buf:
  143. return len(p), nil
  144. }
  145. }
  146. func (s *simpleAssemblerClientSession) Close() error {
  147. s.finish()
  148. return nil
  149. }
  150. func init() {
  151. common.Must(common.RegisterConfig((*ClientConfig)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
  152. clientConfig, ok := config.(*ClientConfig)
  153. if !ok {
  154. return nil, newError("not a ClientConfig")
  155. }
  156. return newClient(clientConfig), nil
  157. }))
  158. }