| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879 |
- package quic
- import (
- "sync"
- "github.com/lucas-clemente/quic-go/internal/flowcontrol"
- "github.com/lucas-clemente/quic-go/internal/protocol"
- "github.com/lucas-clemente/quic-go/internal/wire"
- )
- type windowUpdateQueue struct {
- mutex sync.Mutex
- queue map[protocol.StreamID]bool // used as a set
- queuedConn bool // connection-level window update
- cryptoStream cryptoStream
- streamGetter streamGetter
- connFlowController flowcontrol.ConnectionFlowController
- callback func(wire.Frame)
- }
- func newWindowUpdateQueue(
- streamGetter streamGetter,
- cryptoStream cryptoStream,
- connFC flowcontrol.ConnectionFlowController,
- cb func(wire.Frame),
- ) *windowUpdateQueue {
- return &windowUpdateQueue{
- queue: make(map[protocol.StreamID]bool),
- streamGetter: streamGetter,
- cryptoStream: cryptoStream,
- connFlowController: connFC,
- callback: cb,
- }
- }
- func (q *windowUpdateQueue) AddStream(id protocol.StreamID) {
- q.mutex.Lock()
- q.queue[id] = true
- q.mutex.Unlock()
- }
- func (q *windowUpdateQueue) AddConnection() {
- q.mutex.Lock()
- q.queuedConn = true
- q.mutex.Unlock()
- }
- func (q *windowUpdateQueue) QueueAll() {
- q.mutex.Lock()
- // queue a connection-level window update
- if q.queuedConn {
- q.callback(&wire.MaxDataFrame{ByteOffset: q.connFlowController.GetWindowUpdate()})
- q.queuedConn = false
- }
- // queue all stream-level window updates
- var offset protocol.ByteCount
- for id := range q.queue {
- if id == q.cryptoStream.StreamID() {
- offset = q.cryptoStream.getWindowUpdate()
- } else {
- str, err := q.streamGetter.GetOrOpenReceiveStream(id)
- if err != nil || str == nil { // the stream can be nil if it was completed before dequeing the window update
- continue
- }
- offset = str.getWindowUpdate()
- }
- if offset == 0 { // can happen if we received a final offset, right after queueing the window update
- continue
- }
- q.callback(&wire.MaxStreamDataFrame{
- StreamID: id,
- ByteOffset: offset,
- })
- delete(q.queue, id)
- }
- q.mutex.Unlock()
- }
|