framer.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. package quic
  2. import (
  3. "sync"
  4. "github.com/lucas-clemente/quic-go/internal/protocol"
  5. "github.com/lucas-clemente/quic-go/internal/wire"
  6. )
  7. type framer interface {
  8. QueueControlFrame(wire.Frame)
  9. AppendControlFrames([]wire.Frame, protocol.ByteCount) ([]wire.Frame, protocol.ByteCount)
  10. AddActiveStream(protocol.StreamID)
  11. AppendStreamFrames([]wire.Frame, protocol.ByteCount) []wire.Frame
  12. }
  13. type framerI struct {
  14. mutex sync.Mutex
  15. streamGetter streamGetter
  16. version protocol.VersionNumber
  17. activeStreams map[protocol.StreamID]struct{}
  18. streamQueue []protocol.StreamID
  19. controlFrameMutex sync.Mutex
  20. controlFrames []wire.Frame
  21. }
  22. var _ framer = &framerI{}
  23. func newFramer(
  24. streamGetter streamGetter,
  25. v protocol.VersionNumber,
  26. ) framer {
  27. return &framerI{
  28. streamGetter: streamGetter,
  29. activeStreams: make(map[protocol.StreamID]struct{}),
  30. version: v,
  31. }
  32. }
  33. func (f *framerI) QueueControlFrame(frame wire.Frame) {
  34. f.controlFrameMutex.Lock()
  35. f.controlFrames = append(f.controlFrames, frame)
  36. f.controlFrameMutex.Unlock()
  37. }
  38. func (f *framerI) AppendControlFrames(frames []wire.Frame, maxLen protocol.ByteCount) ([]wire.Frame, protocol.ByteCount) {
  39. var length protocol.ByteCount
  40. f.controlFrameMutex.Lock()
  41. for len(f.controlFrames) > 0 {
  42. frame := f.controlFrames[len(f.controlFrames)-1]
  43. frameLen := frame.Length(f.version)
  44. if length+frameLen > maxLen {
  45. break
  46. }
  47. frames = append(frames, frame)
  48. length += frameLen
  49. f.controlFrames = f.controlFrames[:len(f.controlFrames)-1]
  50. }
  51. f.controlFrameMutex.Unlock()
  52. return frames, length
  53. }
  54. func (f *framerI) AddActiveStream(id protocol.StreamID) {
  55. f.mutex.Lock()
  56. if _, ok := f.activeStreams[id]; !ok {
  57. f.streamQueue = append(f.streamQueue, id)
  58. f.activeStreams[id] = struct{}{}
  59. }
  60. f.mutex.Unlock()
  61. }
  62. func (f *framerI) AppendStreamFrames(frames []wire.Frame, maxLen protocol.ByteCount) []wire.Frame {
  63. var length protocol.ByteCount
  64. f.mutex.Lock()
  65. // pop STREAM frames, until less than MinStreamFrameSize bytes are left in the packet
  66. numActiveStreams := len(f.streamQueue)
  67. for i := 0; i < numActiveStreams; i++ {
  68. if maxLen-length < protocol.MinStreamFrameSize {
  69. break
  70. }
  71. id := f.streamQueue[0]
  72. f.streamQueue = f.streamQueue[1:]
  73. // This should never return an error. Better check it anyway.
  74. // The stream will only be in the streamQueue, if it enqueued itself there.
  75. str, err := f.streamGetter.GetOrOpenSendStream(id)
  76. // The stream can be nil if it completed after it said it had data.
  77. if str == nil || err != nil {
  78. delete(f.activeStreams, id)
  79. continue
  80. }
  81. frame, hasMoreData := str.popStreamFrame(maxLen - length)
  82. if hasMoreData { // put the stream back in the queue (at the end)
  83. f.streamQueue = append(f.streamQueue, id)
  84. } else { // no more data to send. Stream is not active any more
  85. delete(f.activeStreams, id)
  86. }
  87. if frame == nil { // can happen if the receiveStream was canceled after it said it had data
  88. continue
  89. }
  90. frames = append(frames, frame)
  91. length += frame.Length(f.version)
  92. }
  93. f.mutex.Unlock()
  94. return frames
  95. }