stream.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. package quic
  2. import (
  3. "net"
  4. "sync"
  5. "time"
  6. "github.com/lucas-clemente/quic-go/internal/flowcontrol"
  7. "github.com/lucas-clemente/quic-go/internal/protocol"
  8. "github.com/lucas-clemente/quic-go/internal/wire"
  9. )
  10. const (
  11. errorCodeStopping protocol.ApplicationErrorCode = 0
  12. errorCodeStoppingGQUIC protocol.ApplicationErrorCode = 7
  13. )
  14. // The streamSender is notified by the stream about various events.
  15. type streamSender interface {
  16. queueControlFrame(wire.Frame)
  17. onHasStreamData(protocol.StreamID)
  18. // must be called without holding the mutex that is acquired by closeForShutdown
  19. onStreamCompleted(protocol.StreamID)
  20. }
  21. // Each of the both stream halves gets its own uniStreamSender.
  22. // This is necessary in order to keep track when both halves have been completed.
  23. type uniStreamSender struct {
  24. streamSender
  25. onStreamCompletedImpl func()
  26. }
  27. func (s *uniStreamSender) queueControlFrame(f wire.Frame) {
  28. s.streamSender.queueControlFrame(f)
  29. }
  30. func (s *uniStreamSender) onHasStreamData(id protocol.StreamID) {
  31. s.streamSender.onHasStreamData(id)
  32. }
  33. func (s *uniStreamSender) onStreamCompleted(protocol.StreamID) {
  34. s.onStreamCompletedImpl()
  35. }
  36. var _ streamSender = &uniStreamSender{}
  37. type streamI interface {
  38. Stream
  39. closeForShutdown(error)
  40. // for receiving
  41. handleStreamFrame(*wire.StreamFrame) error
  42. handleRstStreamFrame(*wire.RstStreamFrame) error
  43. getWindowUpdate() protocol.ByteCount
  44. // for sending
  45. handleStopSendingFrame(*wire.StopSendingFrame)
  46. popStreamFrame(maxBytes protocol.ByteCount) (*wire.StreamFrame, bool)
  47. handleMaxStreamDataFrame(*wire.MaxStreamDataFrame)
  48. }
  49. var _ receiveStreamI = (streamI)(nil)
  50. var _ sendStreamI = (streamI)(nil)
  51. // A Stream assembles the data from StreamFrames and provides a super-convenient Read-Interface
  52. //
  53. // Read() and Write() may be called concurrently, but multiple calls to Read() or Write() individually must be synchronized manually.
  54. type stream struct {
  55. receiveStream
  56. sendStream
  57. completedMutex sync.Mutex
  58. sender streamSender
  59. receiveStreamCompleted bool
  60. sendStreamCompleted bool
  61. version protocol.VersionNumber
  62. }
  63. var _ Stream = &stream{}
  64. type deadlineError struct{}
  65. func (deadlineError) Error() string { return "deadline exceeded" }
  66. func (deadlineError) Temporary() bool { return true }
  67. func (deadlineError) Timeout() bool { return true }
  68. var errDeadline net.Error = &deadlineError{}
  69. type streamCanceledError struct {
  70. error
  71. errorCode protocol.ApplicationErrorCode
  72. }
  73. func (streamCanceledError) Canceled() bool { return true }
  74. func (e streamCanceledError) ErrorCode() protocol.ApplicationErrorCode { return e.errorCode }
  75. var _ StreamError = &streamCanceledError{}
  76. // newStream creates a new Stream
  77. func newStream(streamID protocol.StreamID,
  78. sender streamSender,
  79. flowController flowcontrol.StreamFlowController,
  80. version protocol.VersionNumber,
  81. ) *stream {
  82. s := &stream{sender: sender, version: version}
  83. senderForSendStream := &uniStreamSender{
  84. streamSender: sender,
  85. onStreamCompletedImpl: func() {
  86. s.completedMutex.Lock()
  87. s.sendStreamCompleted = true
  88. s.checkIfCompleted()
  89. s.completedMutex.Unlock()
  90. },
  91. }
  92. s.sendStream = *newSendStream(streamID, senderForSendStream, flowController, version)
  93. senderForReceiveStream := &uniStreamSender{
  94. streamSender: sender,
  95. onStreamCompletedImpl: func() {
  96. s.completedMutex.Lock()
  97. s.receiveStreamCompleted = true
  98. s.checkIfCompleted()
  99. s.completedMutex.Unlock()
  100. },
  101. }
  102. s.receiveStream = *newReceiveStream(streamID, senderForReceiveStream, flowController, version)
  103. return s
  104. }
  105. // need to define StreamID() here, since both receiveStream and readStream have a StreamID()
  106. func (s *stream) StreamID() protocol.StreamID {
  107. // the result is same for receiveStream and sendStream
  108. return s.sendStream.StreamID()
  109. }
  110. func (s *stream) Close() error {
  111. if err := s.sendStream.Close(); err != nil {
  112. return err
  113. }
  114. // in gQUIC, we need to send a RST_STREAM with the final offset if CancelRead() was called
  115. s.receiveStream.onClose(s.sendStream.getWriteOffset())
  116. return nil
  117. }
  118. func (s *stream) SetDeadline(t time.Time) error {
  119. _ = s.SetReadDeadline(t) // SetReadDeadline never errors
  120. _ = s.SetWriteDeadline(t) // SetWriteDeadline never errors
  121. return nil
  122. }
  123. // CloseForShutdown closes a stream abruptly.
  124. // It makes Read and Write unblock (and return the error) immediately.
  125. // The peer will NOT be informed about this: the stream is closed without sending a FIN or RST.
  126. func (s *stream) closeForShutdown(err error) {
  127. s.sendStream.closeForShutdown(err)
  128. s.receiveStream.closeForShutdown(err)
  129. }
  130. func (s *stream) handleRstStreamFrame(frame *wire.RstStreamFrame) error {
  131. if err := s.receiveStream.handleRstStreamFrame(frame); err != nil {
  132. return err
  133. }
  134. if !s.version.UsesIETFFrameFormat() {
  135. s.handleStopSendingFrame(&wire.StopSendingFrame{
  136. StreamID: s.StreamID(),
  137. ErrorCode: frame.ErrorCode,
  138. })
  139. }
  140. return nil
  141. }
  142. // checkIfCompleted is called from the uniStreamSender, when one of the stream halves is completed.
  143. // It makes sure that the onStreamCompleted callback is only called if both receive and send side have completed.
  144. func (s *stream) checkIfCompleted() {
  145. if s.sendStreamCompleted && s.receiveStreamCompleted {
  146. s.sender.onStreamCompleted(s.StreamID())
  147. }
  148. }