receive_stream.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295
  1. package quic
  2. import (
  3. "fmt"
  4. "io"
  5. "sync"
  6. "time"
  7. "github.com/lucas-clemente/quic-go/internal/flowcontrol"
  8. "github.com/lucas-clemente/quic-go/internal/protocol"
  9. "github.com/lucas-clemente/quic-go/internal/wire"
  10. )
  11. type receiveStreamI interface {
  12. ReceiveStream
  13. handleStreamFrame(*wire.StreamFrame) error
  14. handleResetStreamFrame(*wire.ResetStreamFrame) error
  15. closeForShutdown(error)
  16. getWindowUpdate() protocol.ByteCount
  17. }
  18. type receiveStream struct {
  19. mutex sync.Mutex
  20. streamID protocol.StreamID
  21. sender streamSender
  22. frameQueue *frameSorter
  23. readOffset protocol.ByteCount
  24. currentFrame []byte
  25. currentFrameIsLast bool // is the currentFrame the last frame on this stream
  26. readPosInFrame int
  27. closeForShutdownErr error
  28. cancelReadErr error
  29. resetRemotelyErr StreamError
  30. closedForShutdown bool // set when CloseForShutdown() is called
  31. finRead bool // set once we read a frame with a FinBit
  32. canceledRead bool // set when CancelRead() is called
  33. resetRemotely bool // set when HandleResetStreamFrame() is called
  34. readChan chan struct{}
  35. deadline time.Time
  36. deadlineTimer *time.Timer // initialized by SetReadDeadline()
  37. flowController flowcontrol.StreamFlowController
  38. version protocol.VersionNumber
  39. }
  40. var _ ReceiveStream = &receiveStream{}
  41. var _ receiveStreamI = &receiveStream{}
  42. func newReceiveStream(
  43. streamID protocol.StreamID,
  44. sender streamSender,
  45. flowController flowcontrol.StreamFlowController,
  46. version protocol.VersionNumber,
  47. ) *receiveStream {
  48. return &receiveStream{
  49. streamID: streamID,
  50. sender: sender,
  51. flowController: flowController,
  52. frameQueue: newFrameSorter(),
  53. readChan: make(chan struct{}, 1),
  54. version: version,
  55. }
  56. }
  57. func (s *receiveStream) StreamID() protocol.StreamID {
  58. return s.streamID
  59. }
  60. // Read implements io.Reader. It is not thread safe!
  61. func (s *receiveStream) Read(p []byte) (int, error) {
  62. completed, n, err := s.readImpl(p)
  63. if completed {
  64. s.sender.onStreamCompleted(s.streamID)
  65. }
  66. return n, err
  67. }
  68. func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, error) {
  69. s.mutex.Lock()
  70. defer s.mutex.Unlock()
  71. if s.finRead {
  72. return false, 0, io.EOF
  73. }
  74. if s.canceledRead {
  75. return false, 0, s.cancelReadErr
  76. }
  77. if s.resetRemotely {
  78. return false, 0, s.resetRemotelyErr
  79. }
  80. if s.closedForShutdown {
  81. return false, 0, s.closeForShutdownErr
  82. }
  83. bytesRead := 0
  84. for bytesRead < len(p) {
  85. if s.currentFrame == nil || s.readPosInFrame >= len(s.currentFrame) {
  86. s.dequeueNextFrame()
  87. }
  88. if s.currentFrame == nil && bytesRead > 0 {
  89. return false, bytesRead, s.closeForShutdownErr
  90. }
  91. for {
  92. // Stop waiting on errors
  93. if s.closedForShutdown {
  94. return false, bytesRead, s.closeForShutdownErr
  95. }
  96. if s.canceledRead {
  97. return false, bytesRead, s.cancelReadErr
  98. }
  99. if s.resetRemotely {
  100. return false, bytesRead, s.resetRemotelyErr
  101. }
  102. if !s.deadline.IsZero() && !time.Now().Before(s.deadline) {
  103. return false, bytesRead, errDeadline
  104. }
  105. if s.currentFrame != nil || s.currentFrameIsLast {
  106. break
  107. }
  108. s.mutex.Unlock()
  109. if s.deadline.IsZero() {
  110. <-s.readChan
  111. } else {
  112. select {
  113. case <-s.readChan:
  114. case <-s.deadlineTimer.C:
  115. }
  116. }
  117. s.mutex.Lock()
  118. if s.currentFrame == nil {
  119. s.dequeueNextFrame()
  120. }
  121. }
  122. if bytesRead > len(p) {
  123. return false, bytesRead, fmt.Errorf("BUG: bytesRead (%d) > len(p) (%d) in stream.Read", bytesRead, len(p))
  124. }
  125. if s.readPosInFrame > len(s.currentFrame) {
  126. return false, bytesRead, fmt.Errorf("BUG: readPosInFrame (%d) > frame.DataLen (%d) in stream.Read", s.readPosInFrame, len(s.currentFrame))
  127. }
  128. s.mutex.Unlock()
  129. m := copy(p[bytesRead:], s.currentFrame[s.readPosInFrame:])
  130. s.readPosInFrame += m
  131. bytesRead += m
  132. s.readOffset += protocol.ByteCount(m)
  133. s.mutex.Lock()
  134. // when a RESET_STREAM was received, the was already informed about the final byteOffset for this stream
  135. if !s.resetRemotely {
  136. s.flowController.AddBytesRead(protocol.ByteCount(m))
  137. }
  138. // increase the flow control window, if necessary
  139. s.flowController.MaybeQueueWindowUpdate()
  140. if s.readPosInFrame >= len(s.currentFrame) && s.currentFrameIsLast {
  141. s.finRead = true
  142. return true, bytesRead, io.EOF
  143. }
  144. }
  145. return false, bytesRead, nil
  146. }
  147. func (s *receiveStream) dequeueNextFrame() {
  148. s.currentFrame, s.currentFrameIsLast = s.frameQueue.Pop()
  149. s.readPosInFrame = 0
  150. }
  151. func (s *receiveStream) CancelRead(errorCode protocol.ApplicationErrorCode) error {
  152. s.mutex.Lock()
  153. defer s.mutex.Unlock()
  154. if s.finRead {
  155. return nil
  156. }
  157. if s.canceledRead {
  158. return nil
  159. }
  160. s.canceledRead = true
  161. s.cancelReadErr = fmt.Errorf("Read on stream %d canceled with error code %d", s.streamID, errorCode)
  162. s.signalRead()
  163. s.sender.queueControlFrame(&wire.StopSendingFrame{
  164. StreamID: s.streamID,
  165. ErrorCode: errorCode,
  166. })
  167. return nil
  168. }
  169. func (s *receiveStream) handleStreamFrame(frame *wire.StreamFrame) error {
  170. maxOffset := frame.Offset + frame.DataLen()
  171. if err := s.flowController.UpdateHighestReceived(maxOffset, frame.FinBit); err != nil {
  172. return err
  173. }
  174. s.mutex.Lock()
  175. defer s.mutex.Unlock()
  176. if err := s.frameQueue.Push(frame.Data, frame.Offset, frame.FinBit); err != nil {
  177. return err
  178. }
  179. s.signalRead()
  180. return nil
  181. }
  182. func (s *receiveStream) handleResetStreamFrame(frame *wire.ResetStreamFrame) error {
  183. completed, err := s.handleResetStreamFrameImpl(frame)
  184. if completed {
  185. s.sender.onStreamCompleted(s.streamID)
  186. }
  187. return err
  188. }
  189. func (s *receiveStream) handleResetStreamFrameImpl(frame *wire.ResetStreamFrame) (bool /*completed */, error) {
  190. s.mutex.Lock()
  191. defer s.mutex.Unlock()
  192. if s.closedForShutdown {
  193. return false, nil
  194. }
  195. if err := s.flowController.UpdateHighestReceived(frame.ByteOffset, true); err != nil {
  196. return false, err
  197. }
  198. // ignore duplicate RESET_STREAM frames for this stream (after checking their final offset)
  199. if s.resetRemotely {
  200. return false, nil
  201. }
  202. s.resetRemotely = true
  203. s.resetRemotelyErr = streamCanceledError{
  204. errorCode: frame.ErrorCode,
  205. error: fmt.Errorf("Stream %d was reset with error code %d", s.streamID, frame.ErrorCode),
  206. }
  207. s.signalRead()
  208. return true, nil
  209. }
  210. func (s *receiveStream) CloseRemote(offset protocol.ByteCount) {
  211. s.handleStreamFrame(&wire.StreamFrame{FinBit: true, Offset: offset})
  212. }
  213. func (s *receiveStream) SetReadDeadline(t time.Time) error {
  214. s.mutex.Lock()
  215. defer s.mutex.Unlock()
  216. s.deadline = t
  217. if s.deadline.IsZero() { // skip if there's no deadline to set
  218. s.signalRead()
  219. return nil
  220. }
  221. // Lazily initialize the deadline timer.
  222. if s.deadlineTimer == nil {
  223. s.deadlineTimer = time.NewTimer(time.Until(t))
  224. return nil
  225. }
  226. // reset the timer to the new deadline
  227. if !s.deadlineTimer.Stop() {
  228. <-s.deadlineTimer.C
  229. }
  230. s.deadlineTimer.Reset(time.Until(t))
  231. return nil
  232. }
  233. // CloseForShutdown closes a stream abruptly.
  234. // It makes Read unblock (and return the error) immediately.
  235. // The peer will NOT be informed about this: the stream is closed without sending a FIN or RESET.
  236. func (s *receiveStream) closeForShutdown(err error) {
  237. s.mutex.Lock()
  238. s.closedForShutdown = true
  239. s.closeForShutdownErr = err
  240. s.mutex.Unlock()
  241. s.signalRead()
  242. }
  243. func (s *receiveStream) getWindowUpdate() protocol.ByteCount {
  244. return s.flowController.GetWindowUpdate()
  245. }
  246. // signalRead performs a non-blocking send on the readChan
  247. func (s *receiveStream) signalRead() {
  248. select {
  249. case s.readChan <- struct{}{}:
  250. default:
  251. }
  252. }