stream_framer.go 2.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. package quic
  2. import (
  3. "sync"
  4. "github.com/lucas-clemente/quic-go/internal/protocol"
  5. "github.com/lucas-clemente/quic-go/internal/wire"
  6. )
  7. type streamFramer struct {
  8. streamGetter streamGetter
  9. cryptoStream cryptoStream
  10. version protocol.VersionNumber
  11. streamQueueMutex sync.Mutex
  12. activeStreams map[protocol.StreamID]struct{}
  13. streamQueue []protocol.StreamID
  14. hasCryptoStreamData bool
  15. }
  16. func newStreamFramer(
  17. cryptoStream cryptoStream,
  18. streamGetter streamGetter,
  19. v protocol.VersionNumber,
  20. ) *streamFramer {
  21. return &streamFramer{
  22. streamGetter: streamGetter,
  23. cryptoStream: cryptoStream,
  24. activeStreams: make(map[protocol.StreamID]struct{}),
  25. version: v,
  26. }
  27. }
  28. func (f *streamFramer) AddActiveStream(id protocol.StreamID) {
  29. if id == f.version.CryptoStreamID() { // the crypto stream is handled separately
  30. f.streamQueueMutex.Lock()
  31. f.hasCryptoStreamData = true
  32. f.streamQueueMutex.Unlock()
  33. return
  34. }
  35. f.streamQueueMutex.Lock()
  36. if _, ok := f.activeStreams[id]; !ok {
  37. f.streamQueue = append(f.streamQueue, id)
  38. f.activeStreams[id] = struct{}{}
  39. }
  40. f.streamQueueMutex.Unlock()
  41. }
  42. func (f *streamFramer) HasCryptoStreamData() bool {
  43. f.streamQueueMutex.Lock()
  44. hasCryptoStreamData := f.hasCryptoStreamData
  45. f.streamQueueMutex.Unlock()
  46. return hasCryptoStreamData
  47. }
  48. func (f *streamFramer) PopCryptoStreamFrame(maxLen protocol.ByteCount) *wire.StreamFrame {
  49. f.streamQueueMutex.Lock()
  50. frame, hasMoreData := f.cryptoStream.popStreamFrame(maxLen)
  51. f.hasCryptoStreamData = hasMoreData
  52. f.streamQueueMutex.Unlock()
  53. return frame
  54. }
  55. func (f *streamFramer) PopStreamFrames(maxTotalLen protocol.ByteCount) []*wire.StreamFrame {
  56. var currentLen protocol.ByteCount
  57. var frames []*wire.StreamFrame
  58. f.streamQueueMutex.Lock()
  59. // pop STREAM frames, until less than MinStreamFrameSize bytes are left in the packet
  60. numActiveStreams := len(f.streamQueue)
  61. for i := 0; i < numActiveStreams; i++ {
  62. if maxTotalLen-currentLen < protocol.MinStreamFrameSize {
  63. break
  64. }
  65. id := f.streamQueue[0]
  66. f.streamQueue = f.streamQueue[1:]
  67. // This should never return an error. Better check it anyway.
  68. // The stream will only be in the streamQueue, if it enqueued itself there.
  69. str, err := f.streamGetter.GetOrOpenSendStream(id)
  70. // The stream can be nil if it completed after it said it had data.
  71. if str == nil || err != nil {
  72. delete(f.activeStreams, id)
  73. continue
  74. }
  75. frame, hasMoreData := str.popStreamFrame(maxTotalLen - currentLen)
  76. if hasMoreData { // put the stream back in the queue (at the end)
  77. f.streamQueue = append(f.streamQueue, id)
  78. } else { // no more data to send. Stream is not active any more
  79. delete(f.activeStreams, id)
  80. }
  81. if frame == nil { // can happen if the receiveStream was canceled after it said it had data
  82. continue
  83. }
  84. frames = append(frames, frame)
  85. currentLen += frame.Length(f.version)
  86. }
  87. f.streamQueueMutex.Unlock()
  88. return frames
  89. }