receive_stream.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. package quic
  2. import (
  3. "fmt"
  4. "io"
  5. "sync"
  6. "time"
  7. "github.com/lucas-clemente/quic-go/internal/flowcontrol"
  8. "github.com/lucas-clemente/quic-go/internal/protocol"
  9. "github.com/lucas-clemente/quic-go/internal/wire"
  10. )
  11. type receiveStreamI interface {
  12. ReceiveStream
  13. handleStreamFrame(*wire.StreamFrame) error
  14. handleRstStreamFrame(*wire.RstStreamFrame) error
  15. closeForShutdown(error)
  16. getWindowUpdate() protocol.ByteCount
  17. }
  18. type receiveStream struct {
  19. mutex sync.Mutex
  20. streamID protocol.StreamID
  21. sender streamSender
  22. frameQueue *frameSorter
  23. readOffset protocol.ByteCount
  24. currentFrame []byte
  25. currentFrameIsLast bool // is the currentFrame the last frame on this stream
  26. readPosInFrame int
  27. closeForShutdownErr error
  28. cancelReadErr error
  29. resetRemotelyErr StreamError
  30. closedForShutdown bool // set when CloseForShutdown() is called
  31. finRead bool // set once we read a frame with a FinBit
  32. canceledRead bool // set when CancelRead() is called
  33. resetRemotely bool // set when HandleRstStreamFrame() is called
  34. readChan chan struct{}
  35. readDeadline time.Time
  36. flowController flowcontrol.StreamFlowController
  37. version protocol.VersionNumber
  38. }
  39. var _ ReceiveStream = &receiveStream{}
  40. var _ receiveStreamI = &receiveStream{}
  41. func newReceiveStream(
  42. streamID protocol.StreamID,
  43. sender streamSender,
  44. flowController flowcontrol.StreamFlowController,
  45. version protocol.VersionNumber,
  46. ) *receiveStream {
  47. return &receiveStream{
  48. streamID: streamID,
  49. sender: sender,
  50. flowController: flowController,
  51. frameQueue: newFrameSorter(),
  52. readChan: make(chan struct{}, 1),
  53. version: version,
  54. }
  55. }
  56. func (s *receiveStream) StreamID() protocol.StreamID {
  57. return s.streamID
  58. }
  59. // Read implements io.Reader. It is not thread safe!
  60. func (s *receiveStream) Read(p []byte) (int, error) {
  61. completed, n, err := s.readImpl(p)
  62. if completed {
  63. s.sender.onStreamCompleted(s.streamID)
  64. }
  65. return n, err
  66. }
  67. func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, error) {
  68. s.mutex.Lock()
  69. defer s.mutex.Unlock()
  70. if s.finRead {
  71. return false, 0, io.EOF
  72. }
  73. if s.canceledRead {
  74. return false, 0, s.cancelReadErr
  75. }
  76. if s.resetRemotely {
  77. return false, 0, s.resetRemotelyErr
  78. }
  79. if s.closedForShutdown {
  80. return false, 0, s.closeForShutdownErr
  81. }
  82. bytesRead := 0
  83. for bytesRead < len(p) {
  84. if s.currentFrame == nil || s.readPosInFrame >= len(s.currentFrame) {
  85. s.dequeueNextFrame()
  86. }
  87. if s.currentFrame == nil && bytesRead > 0 {
  88. return false, bytesRead, s.closeForShutdownErr
  89. }
  90. for {
  91. // Stop waiting on errors
  92. if s.closedForShutdown {
  93. return false, bytesRead, s.closeForShutdownErr
  94. }
  95. if s.canceledRead {
  96. return false, bytesRead, s.cancelReadErr
  97. }
  98. if s.resetRemotely {
  99. return false, bytesRead, s.resetRemotelyErr
  100. }
  101. deadline := s.readDeadline
  102. if !deadline.IsZero() && !time.Now().Before(deadline) {
  103. return false, bytesRead, errDeadline
  104. }
  105. if s.currentFrame != nil || s.currentFrameIsLast {
  106. break
  107. }
  108. s.mutex.Unlock()
  109. if deadline.IsZero() {
  110. <-s.readChan
  111. } else {
  112. select {
  113. case <-s.readChan:
  114. case <-time.After(time.Until(deadline)):
  115. }
  116. }
  117. s.mutex.Lock()
  118. if s.currentFrame == nil {
  119. s.dequeueNextFrame()
  120. }
  121. }
  122. if bytesRead > len(p) {
  123. return false, bytesRead, fmt.Errorf("BUG: bytesRead (%d) > len(p) (%d) in stream.Read", bytesRead, len(p))
  124. }
  125. if s.readPosInFrame > len(s.currentFrame) {
  126. return false, bytesRead, fmt.Errorf("BUG: readPosInFrame (%d) > frame.DataLen (%d) in stream.Read", s.readPosInFrame, len(s.currentFrame))
  127. }
  128. s.mutex.Unlock()
  129. m := copy(p[bytesRead:], s.currentFrame[s.readPosInFrame:])
  130. s.readPosInFrame += m
  131. bytesRead += m
  132. s.readOffset += protocol.ByteCount(m)
  133. s.mutex.Lock()
  134. // when a RST_STREAM was received, the was already informed about the final byteOffset for this stream
  135. if !s.resetRemotely {
  136. s.flowController.AddBytesRead(protocol.ByteCount(m))
  137. }
  138. // increase the flow control window, if necessary
  139. s.flowController.MaybeQueueWindowUpdate()
  140. if s.readPosInFrame >= len(s.currentFrame) && s.currentFrameIsLast {
  141. s.finRead = true
  142. return true, bytesRead, io.EOF
  143. }
  144. }
  145. return false, bytesRead, nil
  146. }
  147. func (s *receiveStream) dequeueNextFrame() {
  148. s.currentFrame, s.currentFrameIsLast = s.frameQueue.Pop()
  149. s.readPosInFrame = 0
  150. }
  151. func (s *receiveStream) CancelRead(errorCode protocol.ApplicationErrorCode) error {
  152. s.mutex.Lock()
  153. defer s.mutex.Unlock()
  154. if s.finRead {
  155. return nil
  156. }
  157. if s.canceledRead {
  158. return nil
  159. }
  160. s.canceledRead = true
  161. s.cancelReadErr = fmt.Errorf("Read on stream %d canceled with error code %d", s.streamID, errorCode)
  162. s.signalRead()
  163. if s.version.UsesIETFFrameFormat() {
  164. s.sender.queueControlFrame(&wire.StopSendingFrame{
  165. StreamID: s.streamID,
  166. ErrorCode: errorCode,
  167. })
  168. }
  169. return nil
  170. }
  171. func (s *receiveStream) handleStreamFrame(frame *wire.StreamFrame) error {
  172. maxOffset := frame.Offset + frame.DataLen()
  173. if err := s.flowController.UpdateHighestReceived(maxOffset, frame.FinBit); err != nil {
  174. return err
  175. }
  176. s.mutex.Lock()
  177. defer s.mutex.Unlock()
  178. if err := s.frameQueue.Push(frame.Data, frame.Offset, frame.FinBit); err != nil {
  179. return err
  180. }
  181. s.signalRead()
  182. return nil
  183. }
  184. func (s *receiveStream) handleRstStreamFrame(frame *wire.RstStreamFrame) error {
  185. completed, err := s.handleRstStreamFrameImpl(frame)
  186. if completed {
  187. s.sender.onStreamCompleted(s.streamID)
  188. }
  189. return err
  190. }
  191. func (s *receiveStream) handleRstStreamFrameImpl(frame *wire.RstStreamFrame) (bool /*completed */, error) {
  192. s.mutex.Lock()
  193. defer s.mutex.Unlock()
  194. if s.closedForShutdown {
  195. return false, nil
  196. }
  197. if err := s.flowController.UpdateHighestReceived(frame.ByteOffset, true); err != nil {
  198. return false, err
  199. }
  200. // In gQUIC, error code 0 has a special meaning.
  201. // The peer will reliably continue transmitting, but is not interested in reading from the stream.
  202. // We should therefore just continue reading from the stream, until we encounter the FIN bit.
  203. if !s.version.UsesIETFFrameFormat() && frame.ErrorCode == 0 {
  204. return false, nil
  205. }
  206. // ignore duplicate RST_STREAM frames for this stream (after checking their final offset)
  207. if s.resetRemotely {
  208. return false, nil
  209. }
  210. s.resetRemotely = true
  211. s.resetRemotelyErr = streamCanceledError{
  212. errorCode: frame.ErrorCode,
  213. error: fmt.Errorf("Stream %d was reset with error code %d", s.streamID, frame.ErrorCode),
  214. }
  215. s.signalRead()
  216. return true, nil
  217. }
  218. func (s *receiveStream) CloseRemote(offset protocol.ByteCount) {
  219. s.handleStreamFrame(&wire.StreamFrame{FinBit: true, Offset: offset})
  220. }
  221. func (s *receiveStream) onClose(offset protocol.ByteCount) {
  222. if s.canceledRead && !s.version.UsesIETFFrameFormat() {
  223. s.sender.queueControlFrame(&wire.RstStreamFrame{
  224. StreamID: s.streamID,
  225. ByteOffset: offset,
  226. ErrorCode: 0,
  227. })
  228. }
  229. }
  230. func (s *receiveStream) SetReadDeadline(t time.Time) error {
  231. s.mutex.Lock()
  232. oldDeadline := s.readDeadline
  233. s.readDeadline = t
  234. s.mutex.Unlock()
  235. // if the new deadline is before the currently set deadline, wake up Read()
  236. if t.Before(oldDeadline) {
  237. s.signalRead()
  238. }
  239. return nil
  240. }
  241. // CloseForShutdown closes a stream abruptly.
  242. // It makes Read unblock (and return the error) immediately.
  243. // The peer will NOT be informed about this: the stream is closed without sending a FIN or RST.
  244. func (s *receiveStream) closeForShutdown(err error) {
  245. s.mutex.Lock()
  246. s.closedForShutdown = true
  247. s.closeForShutdownErr = err
  248. s.mutex.Unlock()
  249. s.signalRead()
  250. }
  251. func (s *receiveStream) getWindowUpdate() protocol.ByteCount {
  252. return s.flowController.GetWindowUpdate()
  253. }
  254. // signalRead performs a non-blocking send on the readChan
  255. func (s *receiveStream) signalRead() {
  256. select {
  257. case s.readChan <- struct{}{}:
  258. default:
  259. }
  260. }