send_stream.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. package quic
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "github.com/lucas-clemente/quic-go/internal/flowcontrol"
  8. "github.com/lucas-clemente/quic-go/internal/protocol"
  9. "github.com/lucas-clemente/quic-go/internal/utils"
  10. "github.com/lucas-clemente/quic-go/internal/wire"
  11. )
  12. type sendStreamI interface {
  13. SendStream
  14. handleStopSendingFrame(*wire.StopSendingFrame)
  15. popStreamFrame(maxBytes protocol.ByteCount) (*wire.StreamFrame, bool)
  16. closeForShutdown(error)
  17. handleMaxStreamDataFrame(*wire.MaxStreamDataFrame)
  18. }
  19. type sendStream struct {
  20. mutex sync.Mutex
  21. ctx context.Context
  22. ctxCancel context.CancelFunc
  23. streamID protocol.StreamID
  24. sender streamSender
  25. writeOffset protocol.ByteCount
  26. cancelWriteErr error
  27. closeForShutdownErr error
  28. closedForShutdown bool // set when CloseForShutdown() is called
  29. finishedWriting bool // set once Close() is called
  30. canceledWrite bool // set when CancelWrite() is called, or a STOP_SENDING frame is received
  31. finSent bool // set when a STREAM_FRAME with FIN bit has b
  32. dataForWriting []byte
  33. writeChan chan struct{}
  34. writeDeadline time.Time
  35. flowController flowcontrol.StreamFlowController
  36. version protocol.VersionNumber
  37. }
  38. var _ SendStream = &sendStream{}
  39. var _ sendStreamI = &sendStream{}
  40. func newSendStream(
  41. streamID protocol.StreamID,
  42. sender streamSender,
  43. flowController flowcontrol.StreamFlowController,
  44. version protocol.VersionNumber,
  45. ) *sendStream {
  46. s := &sendStream{
  47. streamID: streamID,
  48. sender: sender,
  49. flowController: flowController,
  50. writeChan: make(chan struct{}, 1),
  51. version: version,
  52. }
  53. s.ctx, s.ctxCancel = context.WithCancel(context.Background())
  54. return s
  55. }
  56. func (s *sendStream) StreamID() protocol.StreamID {
  57. return s.streamID // same for receiveStream and sendStream
  58. }
  59. func (s *sendStream) Write(p []byte) (int, error) {
  60. s.mutex.Lock()
  61. defer s.mutex.Unlock()
  62. if s.finishedWriting {
  63. return 0, fmt.Errorf("write on closed stream %d", s.streamID)
  64. }
  65. if s.canceledWrite {
  66. return 0, s.cancelWriteErr
  67. }
  68. if s.closeForShutdownErr != nil {
  69. return 0, s.closeForShutdownErr
  70. }
  71. if !s.writeDeadline.IsZero() && !time.Now().Before(s.writeDeadline) {
  72. return 0, errDeadline
  73. }
  74. if len(p) == 0 {
  75. return 0, nil
  76. }
  77. s.dataForWriting = make([]byte, len(p))
  78. copy(s.dataForWriting, p)
  79. s.sender.onHasStreamData(s.streamID)
  80. var bytesWritten int
  81. var err error
  82. for {
  83. bytesWritten = len(p) - len(s.dataForWriting)
  84. deadline := s.writeDeadline
  85. if !deadline.IsZero() && !time.Now().Before(deadline) {
  86. s.dataForWriting = nil
  87. err = errDeadline
  88. break
  89. }
  90. if s.dataForWriting == nil || s.canceledWrite || s.closedForShutdown {
  91. break
  92. }
  93. s.mutex.Unlock()
  94. if deadline.IsZero() {
  95. <-s.writeChan
  96. } else {
  97. select {
  98. case <-s.writeChan:
  99. case <-time.After(time.Until(deadline)):
  100. }
  101. }
  102. s.mutex.Lock()
  103. }
  104. if s.closeForShutdownErr != nil {
  105. err = s.closeForShutdownErr
  106. } else if s.cancelWriteErr != nil {
  107. err = s.cancelWriteErr
  108. }
  109. return bytesWritten, err
  110. }
  111. // popStreamFrame returns the next STREAM frame that is supposed to be sent on this stream
  112. // maxBytes is the maximum length this frame (including frame header) will have.
  113. func (s *sendStream) popStreamFrame(maxBytes protocol.ByteCount) (*wire.StreamFrame, bool /* has more data to send */) {
  114. completed, frame, hasMoreData := s.popStreamFrameImpl(maxBytes)
  115. if completed {
  116. s.sender.onStreamCompleted(s.streamID)
  117. }
  118. return frame, hasMoreData
  119. }
  120. func (s *sendStream) popStreamFrameImpl(maxBytes protocol.ByteCount) (bool /* completed */, *wire.StreamFrame, bool /* has more data to send */) {
  121. s.mutex.Lock()
  122. defer s.mutex.Unlock()
  123. if s.closeForShutdownErr != nil {
  124. return false, nil, false
  125. }
  126. frame := &wire.StreamFrame{
  127. StreamID: s.streamID,
  128. Offset: s.writeOffset,
  129. DataLenPresent: true,
  130. }
  131. maxDataLen := frame.MaxDataLen(maxBytes, s.version)
  132. if maxDataLen == 0 { // a STREAM frame must have at least one byte of data
  133. return false, nil, s.dataForWriting != nil
  134. }
  135. frame.Data, frame.FinBit = s.getDataForWriting(maxDataLen)
  136. if len(frame.Data) == 0 && !frame.FinBit {
  137. // this can happen if:
  138. // - popStreamFrame is called but there's no data for writing
  139. // - there's data for writing, but the stream is stream-level flow control blocked
  140. // - there's data for writing, but the stream is connection-level flow control blocked
  141. if s.dataForWriting == nil {
  142. return false, nil, false
  143. }
  144. if isBlocked, offset := s.flowController.IsNewlyBlocked(); isBlocked {
  145. s.sender.queueControlFrame(&wire.StreamBlockedFrame{
  146. StreamID: s.streamID,
  147. Offset: offset,
  148. })
  149. return false, nil, false
  150. }
  151. return false, nil, true
  152. }
  153. if frame.FinBit {
  154. s.finSent = true
  155. }
  156. return frame.FinBit, frame, s.dataForWriting != nil
  157. }
  158. func (s *sendStream) getDataForWriting(maxBytes protocol.ByteCount) ([]byte, bool /* should send FIN */) {
  159. if s.dataForWriting == nil {
  160. return nil, s.finishedWriting && !s.finSent
  161. }
  162. // TODO(#657): Flow control for the crypto stream
  163. if s.streamID != s.version.CryptoStreamID() {
  164. maxBytes = utils.MinByteCount(maxBytes, s.flowController.SendWindowSize())
  165. }
  166. if maxBytes == 0 {
  167. return nil, false
  168. }
  169. var ret []byte
  170. if protocol.ByteCount(len(s.dataForWriting)) > maxBytes {
  171. ret = s.dataForWriting[:maxBytes]
  172. s.dataForWriting = s.dataForWriting[maxBytes:]
  173. } else {
  174. ret = s.dataForWriting
  175. s.dataForWriting = nil
  176. s.signalWrite()
  177. }
  178. s.writeOffset += protocol.ByteCount(len(ret))
  179. s.flowController.AddBytesSent(protocol.ByteCount(len(ret)))
  180. return ret, s.finishedWriting && s.dataForWriting == nil && !s.finSent
  181. }
  182. func (s *sendStream) Close() error {
  183. s.mutex.Lock()
  184. defer s.mutex.Unlock()
  185. if s.canceledWrite {
  186. return fmt.Errorf("Close called for canceled stream %d", s.streamID)
  187. }
  188. s.finishedWriting = true
  189. s.sender.onHasStreamData(s.streamID) // need to send the FIN
  190. s.ctxCancel()
  191. return nil
  192. }
  193. func (s *sendStream) CancelWrite(errorCode protocol.ApplicationErrorCode) error {
  194. s.mutex.Lock()
  195. completed, err := s.cancelWriteImpl(errorCode, fmt.Errorf("Write on stream %d canceled with error code %d", s.streamID, errorCode))
  196. s.mutex.Unlock()
  197. if completed {
  198. s.sender.onStreamCompleted(s.streamID)
  199. }
  200. return err
  201. }
  202. // must be called after locking the mutex
  203. func (s *sendStream) cancelWriteImpl(errorCode protocol.ApplicationErrorCode, writeErr error) (bool /*completed */, error) {
  204. if s.canceledWrite {
  205. return false, nil
  206. }
  207. if s.finishedWriting {
  208. return false, fmt.Errorf("CancelWrite for closed stream %d", s.streamID)
  209. }
  210. s.canceledWrite = true
  211. s.cancelWriteErr = writeErr
  212. s.signalWrite()
  213. s.sender.queueControlFrame(&wire.RstStreamFrame{
  214. StreamID: s.streamID,
  215. ByteOffset: s.writeOffset,
  216. ErrorCode: errorCode,
  217. })
  218. // TODO(#991): cancel retransmissions for this stream
  219. s.ctxCancel()
  220. return true, nil
  221. }
  222. func (s *sendStream) handleStopSendingFrame(frame *wire.StopSendingFrame) {
  223. if completed := s.handleStopSendingFrameImpl(frame); completed {
  224. s.sender.onStreamCompleted(s.streamID)
  225. }
  226. }
  227. func (s *sendStream) handleMaxStreamDataFrame(frame *wire.MaxStreamDataFrame) {
  228. s.flowController.UpdateSendWindow(frame.ByteOffset)
  229. s.mutex.Lock()
  230. if s.dataForWriting != nil {
  231. s.sender.onHasStreamData(s.streamID)
  232. }
  233. s.mutex.Unlock()
  234. }
  235. // must be called after locking the mutex
  236. func (s *sendStream) handleStopSendingFrameImpl(frame *wire.StopSendingFrame) bool /*completed*/ {
  237. s.mutex.Lock()
  238. defer s.mutex.Unlock()
  239. writeErr := streamCanceledError{
  240. errorCode: frame.ErrorCode,
  241. error: fmt.Errorf("Stream %d was reset with error code %d", s.streamID, frame.ErrorCode),
  242. }
  243. errorCode := errorCodeStopping
  244. if !s.version.UsesIETFFrameFormat() {
  245. errorCode = errorCodeStoppingGQUIC
  246. }
  247. completed, _ := s.cancelWriteImpl(errorCode, writeErr)
  248. return completed
  249. }
  250. func (s *sendStream) Context() context.Context {
  251. return s.ctx
  252. }
  253. func (s *sendStream) SetWriteDeadline(t time.Time) error {
  254. s.mutex.Lock()
  255. oldDeadline := s.writeDeadline
  256. s.writeDeadline = t
  257. s.mutex.Unlock()
  258. if t.Before(oldDeadline) {
  259. s.signalWrite()
  260. }
  261. return nil
  262. }
  263. // CloseForShutdown closes a stream abruptly.
  264. // It makes Write unblock (and return the error) immediately.
  265. // The peer will NOT be informed about this: the stream is closed without sending a FIN or RST.
  266. func (s *sendStream) closeForShutdown(err error) {
  267. s.mutex.Lock()
  268. s.closedForShutdown = true
  269. s.closeForShutdownErr = err
  270. s.mutex.Unlock()
  271. s.signalWrite()
  272. s.ctxCancel()
  273. }
  274. func (s *sendStream) getWriteOffset() protocol.ByteCount {
  275. return s.writeOffset
  276. }
  277. // signalWrite performs a non-blocking send on the writeChan
  278. func (s *sendStream) signalWrite() {
  279. select {
  280. case s.writeChan <- struct{}{}:
  281. default:
  282. }
  283. }