| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109 |
- package quic
- import (
- "sync"
- "v2ray.com/core/external/github.com/lucas-clemente/quic-go/internal/protocol"
- "v2ray.com/core/external/github.com/lucas-clemente/quic-go/internal/wire"
- )
- type framer interface {
- QueueControlFrame(wire.Frame)
- AppendControlFrames([]wire.Frame, protocol.ByteCount) ([]wire.Frame, protocol.ByteCount)
- AddActiveStream(protocol.StreamID)
- AppendStreamFrames([]wire.Frame, protocol.ByteCount) []wire.Frame
- }
- type framerI struct {
- mutex sync.Mutex
- streamGetter streamGetter
- version protocol.VersionNumber
- activeStreams map[protocol.StreamID]struct{}
- streamQueue []protocol.StreamID
- controlFrameMutex sync.Mutex
- controlFrames []wire.Frame
- }
- var _ framer = &framerI{}
- func newFramer(
- streamGetter streamGetter,
- v protocol.VersionNumber,
- ) framer {
- return &framerI{
- streamGetter: streamGetter,
- activeStreams: make(map[protocol.StreamID]struct{}),
- version: v,
- }
- }
- func (f *framerI) QueueControlFrame(frame wire.Frame) {
- f.controlFrameMutex.Lock()
- f.controlFrames = append(f.controlFrames, frame)
- f.controlFrameMutex.Unlock()
- }
- func (f *framerI) AppendControlFrames(frames []wire.Frame, maxLen protocol.ByteCount) ([]wire.Frame, protocol.ByteCount) {
- var length protocol.ByteCount
- f.controlFrameMutex.Lock()
- for len(f.controlFrames) > 0 {
- frame := f.controlFrames[len(f.controlFrames)-1]
- frameLen := frame.Length(f.version)
- if length+frameLen > maxLen {
- break
- }
- frames = append(frames, frame)
- length += frameLen
- f.controlFrames = f.controlFrames[:len(f.controlFrames)-1]
- }
- f.controlFrameMutex.Unlock()
- return frames, length
- }
- func (f *framerI) AddActiveStream(id protocol.StreamID) {
- f.mutex.Lock()
- if _, ok := f.activeStreams[id]; !ok {
- f.streamQueue = append(f.streamQueue, id)
- f.activeStreams[id] = struct{}{}
- }
- f.mutex.Unlock()
- }
- func (f *framerI) AppendStreamFrames(frames []wire.Frame, maxLen protocol.ByteCount) []wire.Frame {
- var length protocol.ByteCount
- f.mutex.Lock()
- // pop STREAM frames, until less than MinStreamFrameSize bytes are left in the packet
- numActiveStreams := len(f.streamQueue)
- for i := 0; i < numActiveStreams; i++ {
- if maxLen-length < protocol.MinStreamFrameSize {
- break
- }
- id := f.streamQueue[0]
- f.streamQueue = f.streamQueue[1:]
- // This should never return an error. Better check it anyway.
- // The stream will only be in the streamQueue, if it enqueued itself there.
- str, err := f.streamGetter.GetOrOpenSendStream(id)
- // The stream can be nil if it completed after it said it had data.
- if str == nil || err != nil {
- delete(f.activeStreams, id)
- continue
- }
- frame, hasMoreData := str.popStreamFrame(maxLen - length)
- if hasMoreData { // put the stream back in the queue (at the end)
- f.streamQueue = append(f.streamQueue, id)
- } else { // no more data to send. Stream is not active any more
- delete(f.activeStreams, id)
- }
- if frame == nil { // can happen if the receiveStream was canceled after it said it had data
- continue
- }
- frames = append(frames, frame)
- length += frame.Length(f.version)
- }
- f.mutex.Unlock()
- return frames
- }
|