receive_stream.go 7.3 KB


  1. package quic
  2. import (
  3. "fmt"
  4. "io"
  5. "sync"
  6. "time"
  7. "v2ray.com/core/external/github.com/lucas-clemente/quic-go/internal/flowcontrol"
  8. "v2ray.com/core/external/github.com/lucas-clemente/quic-go/internal/protocol"
  9. "v2ray.com/core/external/github.com/lucas-clemente/quic-go/internal/utils"
  10. "v2ray.com/core/external/github.com/lucas-clemente/quic-go/internal/wire"
  11. )
  12. type receiveStreamI interface {
  13. ReceiveStream
  14. handleStreamFrame(*wire.StreamFrame) error
  15. handleResetStreamFrame(*wire.ResetStreamFrame) error
  16. closeForShutdown(error)
  17. getWindowUpdate() protocol.ByteCount
  18. }
  19. type receiveStream struct {
  20. mutex sync.Mutex
  21. streamID protocol.StreamID
  22. sender streamSender
  23. frameQueue *frameSorter
  24. readOffset protocol.ByteCount
  25. currentFrame []byte
  26. currentFrameIsLast bool // is the currentFrame the last frame on this stream
  27. readPosInFrame int
  28. closeForShutdownErr error
  29. cancelReadErr error
  30. resetRemotelyErr StreamError
  31. closedForShutdown bool // set when CloseForShutdown() is called
  32. finRead bool // set once we read a frame with a FinBit
  33. canceledRead bool // set when CancelRead() is called
  34. resetRemotely bool // set when HandleResetStreamFrame() is called
  35. readChan chan struct{}
  36. deadline time.Time
  37. flowController flowcontrol.StreamFlowController
  38. version protocol.VersionNumber
  39. }
  40. var _ ReceiveStream = &receiveStream{}
  41. var _ receiveStreamI = &receiveStream{}
  42. func newReceiveStream(
  43. streamID protocol.StreamID,
  44. sender streamSender,
  45. flowController flowcontrol.StreamFlowController,
  46. version protocol.VersionNumber,
  47. ) *receiveStream {
  48. return &receiveStream{
  49. streamID: streamID,
  50. sender: sender,
  51. flowController: flowController,
  52. frameQueue: newFrameSorter(),
  53. readChan: make(chan struct{}, 1),
  54. version: version,
  55. }
  56. }
  57. func (s *receiveStream) StreamID() protocol.StreamID {
  58. return s.streamID
  59. }
  60. // Read implements io.Reader. It is not thread safe!
  61. func (s *receiveStream) Read(p []byte) (int, error) {
  62. completed, n, err := s.readImpl(p)
  63. if completed {
  64. s.sender.onStreamCompleted(s.streamID)
  65. }
  66. return n, err
  67. }
  68. func (s *receiveStream) HasMoreData() bool {
  69. s.mutex.Lock()
  70. defer s.mutex.Unlock()
  71. return s.currentFrame != nil
  72. }
  73. func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, error) {
  74. s.mutex.Lock()
  75. defer s.mutex.Unlock()
  76. if s.finRead {
  77. return false, 0, io.EOF
  78. }
  79. if s.canceledRead {
  80. return false, 0, s.cancelReadErr
  81. }
  82. if s.resetRemotely {
  83. return false, 0, s.resetRemotelyErr
  84. }
  85. if s.closedForShutdown {
  86. return false, 0, s.closeForShutdownErr
  87. }
  88. bytesRead := 0
  89. for bytesRead < len(p) {
  90. if s.currentFrame == nil || s.readPosInFrame >= len(s.currentFrame) {
  91. s.dequeueNextFrame()
  92. }
  93. if s.currentFrame == nil && bytesRead > 0 {
  94. return false, bytesRead, s.closeForShutdownErr
  95. }
  96. var deadlineTimer *utils.Timer
  97. for {
  98. // Stop waiting on errors
  99. if s.closedForShutdown {
  100. return false, bytesRead, s.closeForShutdownErr
  101. }
  102. if s.canceledRead {
  103. return false, bytesRead, s.cancelReadErr
  104. }
  105. if s.resetRemotely {
  106. return false, bytesRead, s.resetRemotelyErr
  107. }
  108. deadline := s.deadline
  109. if !deadline.IsZero() {
  110. if !time.Now().Before(deadline) {
  111. return false, bytesRead, errDeadline
  112. }
  113. if deadlineTimer == nil {
  114. deadlineTimer = utils.NewTimer()
  115. }
  116. deadlineTimer.Reset(deadline)
  117. }
  118. if s.currentFrame != nil || s.currentFrameIsLast {
  119. break
  120. }
  121. s.mutex.Unlock()
  122. if deadline.IsZero() {
  123. <-s.readChan
  124. } else {
  125. select {
  126. case <-s.readChan:
  127. case <-deadlineTimer.Chan():
  128. deadlineTimer.SetRead()
  129. }
  130. }
  131. s.mutex.Lock()
  132. if s.currentFrame == nil {
  133. s.dequeueNextFrame()
  134. }
  135. }
  136. if bytesRead > len(p) {
  137. return false, bytesRead, fmt.Errorf("BUG: bytesRead (%d) > len(p) (%d) in stream.Read", bytesRead, len(p))
  138. }
  139. if s.readPosInFrame > len(s.currentFrame) {
  140. return false, bytesRead, fmt.Errorf("BUG: readPosInFrame (%d) > frame.DataLen (%d) in stream.Read", s.readPosInFrame, len(s.currentFrame))
  141. }
  142. s.mutex.Unlock()
  143. m := copy(p[bytesRead:], s.currentFrame[s.readPosInFrame:])
  144. s.readPosInFrame += m
  145. bytesRead += m
  146. s.readOffset += protocol.ByteCount(m)
  147. s.mutex.Lock()
  148. // when a RESET_STREAM was received, the was already informed about the final byteOffset for this stream
  149. if !s.resetRemotely {
  150. s.flowController.AddBytesRead(protocol.ByteCount(m))
  151. }
  152. // increase the flow control window, if necessary
  153. s.flowController.MaybeQueueWindowUpdate()
  154. if s.readPosInFrame >= len(s.currentFrame) && s.currentFrameIsLast {
  155. s.finRead = true
  156. return true, bytesRead, io.EOF
  157. }
  158. }
  159. return false, bytesRead, nil
  160. }
  161. func (s *receiveStream) dequeueNextFrame() {
  162. s.currentFrame, s.currentFrameIsLast = s.frameQueue.Pop()
  163. s.readPosInFrame = 0
  164. }
  165. func (s *receiveStream) CancelRead(errorCode protocol.ApplicationErrorCode) error {
  166. s.mutex.Lock()
  167. defer s.mutex.Unlock()
  168. if s.finRead {
  169. return nil
  170. }
  171. if s.canceledRead {
  172. return nil
  173. }
  174. s.canceledRead = true
  175. s.cancelReadErr = fmt.Errorf("Read on stream %d canceled with error code %d", s.streamID, errorCode)
  176. s.signalRead()
  177. s.sender.queueControlFrame(&wire.StopSendingFrame{
  178. StreamID: s.streamID,
  179. ErrorCode: errorCode,
  180. })
  181. return nil
  182. }
  183. func (s *receiveStream) handleStreamFrame(frame *wire.StreamFrame) error {
  184. maxOffset := frame.Offset + frame.DataLen()
  185. if err := s.flowController.UpdateHighestReceived(maxOffset, frame.FinBit); err != nil {
  186. return err
  187. }
  188. s.mutex.Lock()
  189. defer s.mutex.Unlock()
  190. if err := s.frameQueue.Push(frame.Data, frame.Offset, frame.FinBit); err != nil {
  191. return err
  192. }
  193. s.signalRead()
  194. return nil
  195. }
  196. func (s *receiveStream) handleResetStreamFrame(frame *wire.ResetStreamFrame) error {
  197. completed, err := s.handleResetStreamFrameImpl(frame)
  198. if completed {
  199. s.sender.onStreamCompleted(s.streamID)
  200. }
  201. return err
  202. }
  203. func (s *receiveStream) handleResetStreamFrameImpl(frame *wire.ResetStreamFrame) (bool /*completed */, error) {
  204. s.mutex.Lock()
  205. defer s.mutex.Unlock()
  206. if s.closedForShutdown {
  207. return false, nil
  208. }
  209. if err := s.flowController.UpdateHighestReceived(frame.ByteOffset, true); err != nil {
  210. return false, err
  211. }
  212. // ignore duplicate RESET_STREAM frames for this stream (after checking their final offset)
  213. if s.resetRemotely {
  214. return false, nil
  215. }
  216. s.resetRemotely = true
  217. s.resetRemotelyErr = streamCanceledError{
  218. errorCode: frame.ErrorCode,
  219. error: fmt.Errorf("Stream %d was reset with error code %d", s.streamID, frame.ErrorCode),
  220. }
  221. s.signalRead()
  222. return true, nil
  223. }
  224. func (s *receiveStream) CloseRemote(offset protocol.ByteCount) {
  225. s.handleStreamFrame(&wire.StreamFrame{FinBit: true, Offset: offset})
  226. }
  227. func (s *receiveStream) SetReadDeadline(t time.Time) error {
  228. s.mutex.Lock()
  229. s.deadline = t
  230. s.mutex.Unlock()
  231. s.signalRead()
  232. return nil
  233. }
  234. // CloseForShutdown closes a stream abruptly.
  235. // It makes Read unblock (and return the error) immediately.
  236. // The peer will NOT be informed about this: the stream is closed without sending a FIN or RESET.
  237. func (s *receiveStream) closeForShutdown(err error) {
  238. s.mutex.Lock()
  239. s.closedForShutdown = true
  240. s.closeForShutdownErr = err
  241. s.mutex.Unlock()
  242. s.signalRead()
  243. }
  244. func (s *receiveStream) getWindowUpdate() protocol.ByteCount {
  245. return s.flowController.GetWindowUpdate()
  246. }
  247. // signalRead performs a non-blocking send on the readChan
  248. func (s *receiveStream) signalRead() {
  249. select {
  250. case s.readChan <- struct{}{}:
  251. default:
  252. }
  253. }