receive_stream.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
  1. package quic
  2. import (
  3. "fmt"
  4. "io"
  5. "sync"
  6. "time"
  7. "github.com/lucas-clemente/quic-go/internal/flowcontrol"
  8. "github.com/lucas-clemente/quic-go/internal/protocol"
  9. "github.com/lucas-clemente/quic-go/internal/wire"
  10. )
  11. type receiveStreamI interface {
  12. ReceiveStream
  13. handleStreamFrame(*wire.StreamFrame) error
  14. handleResetStreamFrame(*wire.ResetStreamFrame) error
  15. closeForShutdown(error)
  16. getWindowUpdate() protocol.ByteCount
  17. }
  18. type receiveStream struct {
  19. mutex sync.Mutex
  20. streamID protocol.StreamID
  21. sender streamSender
  22. frameQueue *frameSorter
  23. readOffset protocol.ByteCount
  24. currentFrame []byte
  25. currentFrameIsLast bool // is the currentFrame the last frame on this stream
  26. readPosInFrame int
  27. closeForShutdownErr error
  28. cancelReadErr error
  29. resetRemotelyErr StreamError
  30. closedForShutdown bool // set when CloseForShutdown() is called
  31. finRead bool // set once we read a frame with a FinBit
  32. canceledRead bool // set when CancelRead() is called
  33. resetRemotely bool // set when HandleResetStreamFrame() is called
  34. readChan chan struct{}
  35. deadline time.Time
  36. deadlineTimer *time.Timer // initialized by SetReadDeadline()
  37. flowController flowcontrol.StreamFlowController
  38. version protocol.VersionNumber
  39. }
  40. var _ ReceiveStream = &receiveStream{}
  41. var _ receiveStreamI = &receiveStream{}
  42. func newReceiveStream(
  43. streamID protocol.StreamID,
  44. sender streamSender,
  45. flowController flowcontrol.StreamFlowController,
  46. version protocol.VersionNumber,
  47. ) *receiveStream {
  48. return &receiveStream{
  49. streamID: streamID,
  50. sender: sender,
  51. flowController: flowController,
  52. frameQueue: newFrameSorter(),
  53. readChan: make(chan struct{}, 1),
  54. version: version,
  55. }
  56. }
  57. func (s *receiveStream) StreamID() protocol.StreamID {
  58. return s.streamID
  59. }
  60. // Read implements io.Reader. It is not thread safe!
  61. func (s *receiveStream) Read(p []byte) (int, error) {
  62. completed, n, err := s.readImpl(p)
  63. if completed {
  64. s.sender.onStreamCompleted(s.streamID)
  65. }
  66. return n, err
  67. }
  68. func (s *receiveStream) HasMoreData() bool {
  69. s.mutex.Lock()
  70. defer s.mutex.Unlock()
  71. return s.currentFrame != nil
  72. }
  73. func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, error) {
  74. s.mutex.Lock()
  75. defer s.mutex.Unlock()
  76. if s.finRead {
  77. return false, 0, io.EOF
  78. }
  79. if s.canceledRead {
  80. return false, 0, s.cancelReadErr
  81. }
  82. if s.resetRemotely {
  83. return false, 0, s.resetRemotelyErr
  84. }
  85. if s.closedForShutdown {
  86. return false, 0, s.closeForShutdownErr
  87. }
  88. bytesRead := 0
  89. for bytesRead < len(p) {
  90. if s.currentFrame == nil || s.readPosInFrame >= len(s.currentFrame) {
  91. s.dequeueNextFrame()
  92. }
  93. if s.currentFrame == nil && bytesRead > 0 {
  94. return false, bytesRead, s.closeForShutdownErr
  95. }
  96. for {
  97. // Stop waiting on errors
  98. if s.closedForShutdown {
  99. return false, bytesRead, s.closeForShutdownErr
  100. }
  101. if s.canceledRead {
  102. return false, bytesRead, s.cancelReadErr
  103. }
  104. if s.resetRemotely {
  105. return false, bytesRead, s.resetRemotelyErr
  106. }
  107. if !s.deadline.IsZero() && !time.Now().Before(s.deadline) {
  108. return false, bytesRead, errDeadline
  109. }
  110. if s.currentFrame != nil || s.currentFrameIsLast {
  111. break
  112. }
  113. s.mutex.Unlock()
  114. if s.deadline.IsZero() {
  115. <-s.readChan
  116. } else {
  117. select {
  118. case <-s.readChan:
  119. case <-s.deadlineTimer.C:
  120. }
  121. }
  122. s.mutex.Lock()
  123. if s.currentFrame == nil {
  124. s.dequeueNextFrame()
  125. }
  126. }
  127. if bytesRead > len(p) {
  128. return false, bytesRead, fmt.Errorf("BUG: bytesRead (%d) > len(p) (%d) in stream.Read", bytesRead, len(p))
  129. }
  130. if s.readPosInFrame > len(s.currentFrame) {
  131. return false, bytesRead, fmt.Errorf("BUG: readPosInFrame (%d) > frame.DataLen (%d) in stream.Read", s.readPosInFrame, len(s.currentFrame))
  132. }
  133. s.mutex.Unlock()
  134. m := copy(p[bytesRead:], s.currentFrame[s.readPosInFrame:])
  135. s.readPosInFrame += m
  136. bytesRead += m
  137. s.readOffset += protocol.ByteCount(m)
  138. s.mutex.Lock()
  139. // when a RESET_STREAM was received, the was already informed about the final byteOffset for this stream
  140. if !s.resetRemotely {
  141. s.flowController.AddBytesRead(protocol.ByteCount(m))
  142. }
  143. // increase the flow control window, if necessary
  144. s.flowController.MaybeQueueWindowUpdate()
  145. if s.readPosInFrame >= len(s.currentFrame) && s.currentFrameIsLast {
  146. s.finRead = true
  147. return true, bytesRead, io.EOF
  148. }
  149. }
  150. return false, bytesRead, nil
  151. }
  152. func (s *receiveStream) dequeueNextFrame() {
  153. s.currentFrame, s.currentFrameIsLast = s.frameQueue.Pop()
  154. s.readPosInFrame = 0
  155. }
  156. func (s *receiveStream) CancelRead(errorCode protocol.ApplicationErrorCode) error {
  157. s.mutex.Lock()
  158. defer s.mutex.Unlock()
  159. if s.finRead {
  160. return nil
  161. }
  162. if s.canceledRead {
  163. return nil
  164. }
  165. s.canceledRead = true
  166. s.cancelReadErr = fmt.Errorf("Read on stream %d canceled with error code %d", s.streamID, errorCode)
  167. s.signalRead()
  168. s.sender.queueControlFrame(&wire.StopSendingFrame{
  169. StreamID: s.streamID,
  170. ErrorCode: errorCode,
  171. })
  172. return nil
  173. }
  174. func (s *receiveStream) handleStreamFrame(frame *wire.StreamFrame) error {
  175. maxOffset := frame.Offset + frame.DataLen()
  176. if err := s.flowController.UpdateHighestReceived(maxOffset, frame.FinBit); err != nil {
  177. return err
  178. }
  179. s.mutex.Lock()
  180. defer s.mutex.Unlock()
  181. if err := s.frameQueue.Push(frame.Data, frame.Offset, frame.FinBit); err != nil {
  182. return err
  183. }
  184. s.signalRead()
  185. return nil
  186. }
  187. func (s *receiveStream) handleResetStreamFrame(frame *wire.ResetStreamFrame) error {
  188. completed, err := s.handleResetStreamFrameImpl(frame)
  189. if completed {
  190. s.sender.onStreamCompleted(s.streamID)
  191. }
  192. return err
  193. }
  194. func (s *receiveStream) handleResetStreamFrameImpl(frame *wire.ResetStreamFrame) (bool /*completed */, error) {
  195. s.mutex.Lock()
  196. defer s.mutex.Unlock()
  197. if s.closedForShutdown {
  198. return false, nil
  199. }
  200. if err := s.flowController.UpdateHighestReceived(frame.ByteOffset, true); err != nil {
  201. return false, err
  202. }
  203. // ignore duplicate RESET_STREAM frames for this stream (after checking their final offset)
  204. if s.resetRemotely {
  205. return false, nil
  206. }
  207. s.resetRemotely = true
  208. s.resetRemotelyErr = streamCanceledError{
  209. errorCode: frame.ErrorCode,
  210. error: fmt.Errorf("Stream %d was reset with error code %d", s.streamID, frame.ErrorCode),
  211. }
  212. s.signalRead()
  213. return true, nil
  214. }
  215. func (s *receiveStream) CloseRemote(offset protocol.ByteCount) {
  216. s.handleStreamFrame(&wire.StreamFrame{FinBit: true, Offset: offset})
  217. }
  218. func (s *receiveStream) SetReadDeadline(t time.Time) error {
  219. s.mutex.Lock()
  220. defer s.mutex.Unlock()
  221. s.deadline = t
  222. if s.deadline.IsZero() { // skip if there's no deadline to set
  223. s.signalRead()
  224. return nil
  225. }
  226. // Lazily initialize the deadline timer.
  227. if s.deadlineTimer == nil {
  228. s.deadlineTimer = time.NewTimer(time.Until(t))
  229. return nil
  230. }
  231. // reset the timer to the new deadline
  232. if !s.deadlineTimer.Stop() {
  233. <-s.deadlineTimer.C
  234. }
  235. s.deadlineTimer.Reset(time.Until(t))
  236. return nil
  237. }
  238. // CloseForShutdown closes a stream abruptly.
  239. // It makes Read unblock (and return the error) immediately.
  240. // The peer will NOT be informed about this: the stream is closed without sending a FIN or RESET.
  241. func (s *receiveStream) closeForShutdown(err error) {
  242. s.mutex.Lock()
  243. s.closedForShutdown = true
  244. s.closeForShutdownErr = err
  245. s.mutex.Unlock()
  246. s.signalRead()
  247. }
  248. func (s *receiveStream) getWindowUpdate() protocol.ByteCount {
  249. return s.flowController.GetWindowUpdate()
  250. }
  251. // signalRead performs a non-blocking send on the readChan
  252. func (s *receiveStream) signalRead() {
  253. select {
  254. case s.readChan <- struct{}{}:
  255. default:
  256. }
  257. }