window_update_queue.go 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. package quic
  2. import (
  3. "sync"
  4. "github.com/lucas-clemente/quic-go/internal/flowcontrol"
  5. "github.com/lucas-clemente/quic-go/internal/protocol"
  6. "github.com/lucas-clemente/quic-go/internal/wire"
  7. )
  8. type windowUpdateQueue struct {
  9. mutex sync.Mutex
  10. queue map[protocol.StreamID]bool // used as a set
  11. queuedConn bool // connection-level window update
  12. cryptoStream cryptoStream
  13. streamGetter streamGetter
  14. connFlowController flowcontrol.ConnectionFlowController
  15. callback func(wire.Frame)
  16. }
  17. func newWindowUpdateQueue(
  18. streamGetter streamGetter,
  19. cryptoStream cryptoStream,
  20. connFC flowcontrol.ConnectionFlowController,
  21. cb func(wire.Frame),
  22. ) *windowUpdateQueue {
  23. return &windowUpdateQueue{
  24. queue: make(map[protocol.StreamID]bool),
  25. streamGetter: streamGetter,
  26. cryptoStream: cryptoStream,
  27. connFlowController: connFC,
  28. callback: cb,
  29. }
  30. }
  31. func (q *windowUpdateQueue) AddStream(id protocol.StreamID) {
  32. q.mutex.Lock()
  33. q.queue[id] = true
  34. q.mutex.Unlock()
  35. }
  36. func (q *windowUpdateQueue) AddConnection() {
  37. q.mutex.Lock()
  38. q.queuedConn = true
  39. q.mutex.Unlock()
  40. }
  41. func (q *windowUpdateQueue) QueueAll() {
  42. q.mutex.Lock()
  43. // queue a connection-level window update
  44. if q.queuedConn {
  45. q.callback(&wire.MaxDataFrame{ByteOffset: q.connFlowController.GetWindowUpdate()})
  46. q.queuedConn = false
  47. }
  48. // queue all stream-level window updates
  49. var offset protocol.ByteCount
  50. for id := range q.queue {
  51. if id == q.cryptoStream.StreamID() {
  52. offset = q.cryptoStream.getWindowUpdate()
  53. } else {
  54. str, err := q.streamGetter.GetOrOpenReceiveStream(id)
  55. if err != nil || str == nil { // the stream can be nil if it was completed before dequeing the window update
  56. continue
  57. }
  58. offset = str.getWindowUpdate()
  59. }
  60. if offset == 0 { // can happen if we received a final offset, right after queueing the window update
  61. continue
  62. }
  63. q.callback(&wire.MaxStreamDataFrame{
  64. StreamID: id,
  65. ByteOffset: offset,
  66. })
  67. delete(q.queue, id)
  68. }
  69. q.mutex.Unlock()
  70. }