send_stream.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342
  1. package quic
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "github.com/lucas-clemente/quic-go/internal/flowcontrol"
  8. "github.com/lucas-clemente/quic-go/internal/protocol"
  9. "github.com/lucas-clemente/quic-go/internal/utils"
  10. "github.com/lucas-clemente/quic-go/internal/wire"
  11. )
  12. type sendStreamI interface {
  13. SendStream
  14. handleStopSendingFrame(*wire.StopSendingFrame)
  15. hasData() bool
  16. popStreamFrame(maxBytes protocol.ByteCount) (*wire.StreamFrame, bool)
  17. closeForShutdown(error)
  18. handleMaxStreamDataFrame(*wire.MaxStreamDataFrame)
  19. }
  20. type sendStream struct {
  21. mutex sync.Mutex
  22. ctx context.Context
  23. ctxCancel context.CancelFunc
  24. streamID protocol.StreamID
  25. sender streamSender
  26. writeOffset protocol.ByteCount
  27. cancelWriteErr error
  28. closeForShutdownErr error
  29. closedForShutdown bool // set when CloseForShutdown() is called
  30. finishedWriting bool // set once Close() is called
  31. canceledWrite bool // set when CancelWrite() is called, or a STOP_SENDING frame is received
  32. finSent bool // set when a STREAM_FRAME with FIN bit has b
  33. dataForWriting []byte
  34. writeChan chan struct{}
  35. deadline time.Time
  36. deadlineTimer *time.Timer // initialized by SetReadDeadline()
  37. flowController flowcontrol.StreamFlowController
  38. version protocol.VersionNumber
  39. }
  40. var _ SendStream = &sendStream{}
  41. var _ sendStreamI = &sendStream{}
  42. func newSendStream(
  43. streamID protocol.StreamID,
  44. sender streamSender,
  45. flowController flowcontrol.StreamFlowController,
  46. version protocol.VersionNumber,
  47. ) *sendStream {
  48. s := &sendStream{
  49. streamID: streamID,
  50. sender: sender,
  51. flowController: flowController,
  52. writeChan: make(chan struct{}, 1),
  53. version: version,
  54. }
  55. s.ctx, s.ctxCancel = context.WithCancel(context.Background())
  56. return s
  57. }
  58. func (s *sendStream) StreamID() protocol.StreamID {
  59. return s.streamID // same for receiveStream and sendStream
  60. }
  61. func (s *sendStream) Write(p []byte) (int, error) {
  62. s.mutex.Lock()
  63. defer s.mutex.Unlock()
  64. if s.finishedWriting {
  65. return 0, fmt.Errorf("write on closed stream %d", s.streamID)
  66. }
  67. if s.canceledWrite {
  68. return 0, s.cancelWriteErr
  69. }
  70. if s.closeForShutdownErr != nil {
  71. return 0, s.closeForShutdownErr
  72. }
  73. if !s.deadline.IsZero() && !time.Now().Before(s.deadline) {
  74. return 0, errDeadline
  75. }
  76. if len(p) == 0 {
  77. return 0, nil
  78. }
  79. s.dataForWriting = make([]byte, len(p))
  80. copy(s.dataForWriting, p)
  81. go s.sender.onHasStreamData(s.streamID)
  82. var bytesWritten int
  83. var err error
  84. for {
  85. bytesWritten = len(p) - len(s.dataForWriting)
  86. if !s.deadline.IsZero() && !time.Now().Before(s.deadline) {
  87. s.dataForWriting = nil
  88. err = errDeadline
  89. break
  90. }
  91. if s.dataForWriting == nil || s.canceledWrite || s.closedForShutdown {
  92. break
  93. }
  94. s.mutex.Unlock()
  95. if s.deadline.IsZero() {
  96. <-s.writeChan
  97. } else {
  98. select {
  99. case <-s.writeChan:
  100. case <-s.deadlineTimer.C:
  101. }
  102. }
  103. s.mutex.Lock()
  104. }
  105. if s.closeForShutdownErr != nil {
  106. err = s.closeForShutdownErr
  107. } else if s.cancelWriteErr != nil {
  108. err = s.cancelWriteErr
  109. }
  110. return bytesWritten, err
  111. }
  112. // popStreamFrame returns the next STREAM frame that is supposed to be sent on this stream
  113. // maxBytes is the maximum length this frame (including frame header) will have.
  114. func (s *sendStream) popStreamFrame(maxBytes protocol.ByteCount) (*wire.StreamFrame, bool /* has more data to send */) {
  115. completed, frame, hasMoreData := s.popStreamFrameImpl(maxBytes)
  116. if completed {
  117. s.sender.onStreamCompleted(s.streamID)
  118. }
  119. return frame, hasMoreData
  120. }
  121. func (s *sendStream) popStreamFrameImpl(maxBytes protocol.ByteCount) (bool /* completed */, *wire.StreamFrame, bool /* has more data to send */) {
  122. s.mutex.Lock()
  123. defer s.mutex.Unlock()
  124. if s.closeForShutdownErr != nil {
  125. return false, nil, false
  126. }
  127. frame := &wire.StreamFrame{
  128. StreamID: s.streamID,
  129. Offset: s.writeOffset,
  130. DataLenPresent: true,
  131. }
  132. maxDataLen := frame.MaxDataLen(maxBytes, s.version)
  133. if maxDataLen == 0 { // a STREAM frame must have at least one byte of data
  134. return false, nil, s.dataForWriting != nil
  135. }
  136. frame.Data, frame.FinBit = s.getDataForWriting(maxDataLen)
  137. if len(frame.Data) == 0 && !frame.FinBit {
  138. // this can happen if:
  139. // - popStreamFrame is called but there's no data for writing
  140. // - there's data for writing, but the stream is stream-level flow control blocked
  141. // - there's data for writing, but the stream is connection-level flow control blocked
  142. if s.dataForWriting == nil {
  143. return false, nil, false
  144. }
  145. if isBlocked, offset := s.flowController.IsNewlyBlocked(); isBlocked {
  146. s.sender.queueControlFrame(&wire.StreamDataBlockedFrame{
  147. StreamID: s.streamID,
  148. DataLimit: offset,
  149. })
  150. return false, nil, false
  151. }
  152. return false, nil, true
  153. }
  154. if frame.FinBit {
  155. s.finSent = true
  156. }
  157. return frame.FinBit, frame, s.dataForWriting != nil
  158. }
  159. func (s *sendStream) hasData() bool {
  160. s.mutex.Lock()
  161. hasData := len(s.dataForWriting) > 0
  162. s.mutex.Unlock()
  163. return hasData
  164. }
  165. func (s *sendStream) getDataForWriting(maxBytes protocol.ByteCount) ([]byte, bool /* should send FIN */) {
  166. if s.dataForWriting == nil {
  167. return nil, s.finishedWriting && !s.finSent
  168. }
  169. maxBytes = utils.MinByteCount(maxBytes, s.flowController.SendWindowSize())
  170. if maxBytes == 0 {
  171. return nil, false
  172. }
  173. var ret []byte
  174. if protocol.ByteCount(len(s.dataForWriting)) > maxBytes {
  175. ret = s.dataForWriting[:maxBytes]
  176. s.dataForWriting = s.dataForWriting[maxBytes:]
  177. } else {
  178. ret = s.dataForWriting
  179. s.dataForWriting = nil
  180. s.signalWrite()
  181. }
  182. s.writeOffset += protocol.ByteCount(len(ret))
  183. s.flowController.AddBytesSent(protocol.ByteCount(len(ret)))
  184. return ret, s.finishedWriting && s.dataForWriting == nil && !s.finSent
  185. }
  186. func (s *sendStream) Close() error {
  187. s.mutex.Lock()
  188. defer s.mutex.Unlock()
  189. if s.canceledWrite {
  190. return fmt.Errorf("Close called for canceled stream %d", s.streamID)
  191. }
  192. s.finishedWriting = true
  193. go s.sender.onHasStreamData(s.streamID) // need to send the FIN
  194. s.ctxCancel()
  195. return nil
  196. }
  197. func (s *sendStream) CancelWrite(errorCode protocol.ApplicationErrorCode) error {
  198. s.mutex.Lock()
  199. completed, err := s.cancelWriteImpl(errorCode, fmt.Errorf("Write on stream %d canceled with error code %d", s.streamID, errorCode))
  200. s.mutex.Unlock()
  201. if completed {
  202. s.sender.onStreamCompleted(s.streamID)
  203. }
  204. return err
  205. }
  206. // must be called after locking the mutex
  207. func (s *sendStream) cancelWriteImpl(errorCode protocol.ApplicationErrorCode, writeErr error) (bool /*completed */, error) {
  208. if s.canceledWrite {
  209. return false, nil
  210. }
  211. if s.finishedWriting {
  212. return false, fmt.Errorf("CancelWrite for closed stream %d", s.streamID)
  213. }
  214. s.canceledWrite = true
  215. s.cancelWriteErr = writeErr
  216. s.signalWrite()
  217. s.sender.queueControlFrame(&wire.ResetStreamFrame{
  218. StreamID: s.streamID,
  219. ByteOffset: s.writeOffset,
  220. ErrorCode: errorCode,
  221. })
  222. // TODO(#991): cancel retransmissions for this stream
  223. s.ctxCancel()
  224. return true, nil
  225. }
  226. func (s *sendStream) handleStopSendingFrame(frame *wire.StopSendingFrame) {
  227. if completed := s.handleStopSendingFrameImpl(frame); completed {
  228. s.sender.onStreamCompleted(s.streamID)
  229. }
  230. }
  231. func (s *sendStream) handleMaxStreamDataFrame(frame *wire.MaxStreamDataFrame) {
  232. s.flowController.UpdateSendWindow(frame.ByteOffset)
  233. s.mutex.Lock()
  234. hasData := false
  235. if s.dataForWriting != nil {
  236. hasData = true
  237. }
  238. s.mutex.Unlock()
  239. if hasData {
  240. s.sender.onHasStreamData(s.streamID)
  241. }
  242. }
  243. // must be called after locking the mutex
  244. func (s *sendStream) handleStopSendingFrameImpl(frame *wire.StopSendingFrame) bool /*completed*/ {
  245. s.mutex.Lock()
  246. defer s.mutex.Unlock()
  247. writeErr := streamCanceledError{
  248. errorCode: frame.ErrorCode,
  249. error: fmt.Errorf("Stream %d was reset with error code %d", s.streamID, frame.ErrorCode),
  250. }
  251. errorCode := errorCodeStopping
  252. completed, _ := s.cancelWriteImpl(errorCode, writeErr)
  253. return completed
  254. }
  255. func (s *sendStream) Context() context.Context {
  256. return s.ctx
  257. }
  258. func (s *sendStream) SetWriteDeadline(t time.Time) error {
  259. s.mutex.Lock()
  260. defer s.mutex.Unlock()
  261. s.deadline = t
  262. if s.deadline.IsZero() { // skip if there's no deadline to set
  263. s.signalWrite()
  264. return nil
  265. }
  266. // Lazily initialize the deadline timer.
  267. if s.deadlineTimer == nil {
  268. s.deadlineTimer = time.NewTimer(time.Until(t))
  269. return nil
  270. }
  271. // reset the timer to the new deadline
  272. if !s.deadlineTimer.Stop() {
  273. <-s.deadlineTimer.C
  274. }
  275. s.deadlineTimer.Reset(time.Until(t))
  276. return nil
  277. }
  278. // CloseForShutdown closes a stream abruptly.
  279. // It makes Write unblock (and return the error) immediately.
  280. // The peer will NOT be informed about this: the stream is closed without sending a FIN or RST.
  281. func (s *sendStream) closeForShutdown(err error) {
  282. s.mutex.Lock()
  283. s.closedForShutdown = true
  284. s.closeForShutdownErr = err
  285. s.mutex.Unlock()
  286. s.signalWrite()
  287. s.ctxCancel()
  288. }
  289. func (s *sendStream) getWriteOffset() protocol.ByteCount {
  290. return s.writeOffset
  291. }
  292. // signalWrite performs a non-blocking send on the writeChan
  293. func (s *sendStream) signalWrite() {
  294. select {
  295. case s.writeChan <- struct{}{}:
  296. default:
  297. }
  298. }