| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259 | package kcpimport (	"sync"	"v2ray.com/core/common"	"v2ray.com/core/common/buf")type ReceivingWindow struct {	cache map[uint32]*DataSegment}func NewReceivingWindow() *ReceivingWindow {	return &ReceivingWindow{		cache: make(map[uint32]*DataSegment),	}}func (w *ReceivingWindow) Set(id uint32, value *DataSegment) bool {	_, f := w.cache[id]	if f {		return false	}	w.cache[id] = value	return true}func (w *ReceivingWindow) Has(id uint32) bool {	_, f := w.cache[id]	return f}func (w *ReceivingWindow) Remove(id uint32) *DataSegment {	v, f := w.cache[id]	if !f {		return nil	}	delete(w.cache, id)	return v}type AckList struct {	writer     SegmentWriter	timestamps []uint32	numbers    []uint32	nextFlush  []uint32	flushCandidates []uint32	dirty           bool}func NewAckList(writer SegmentWriter) *AckList {	return &AckList{		writer:          writer,		timestamps:      make([]uint32, 0, 128),		numbers:         make([]uint32, 0, 128),		nextFlush:       make([]uint32, 0, 128),		flushCandidates: make([]uint32, 0, 128),	}}func (l *AckList) Add(number uint32, timestamp uint32) {	l.timestamps = append(l.timestamps, timestamp)	l.numbers = append(l.numbers, number)	l.nextFlush = append(l.nextFlush, 0)	l.dirty = true}func (l *AckList) Clear(una uint32) {	count := 0	for i := 0; i < len(l.numbers); i++ {		if l.numbers[i] < una {			continue		}		if i != count {			l.numbers[count] = l.numbers[i]			l.timestamps[count] = l.timestamps[i]			l.nextFlush[count] = l.nextFlush[i]		}		count++	}	if count < len(l.numbers) {		l.numbers = l.numbers[:count]		l.timestamps = l.timestamps[:count]		l.nextFlush = l.nextFlush[:count]		l.dirty = true	}}func (l *AckList) Flush(current uint32, rto uint32) {	l.flushCandidates = l.flushCandidates[:0]	seg := NewAckSegment()	for i := 0; i < len(l.numbers); i++ {		if l.nextFlush[i] > current {			if len(l.flushCandidates) < cap(l.flushCandidates) {				l.flushCandidates = append(l.flushCandidates, l.numbers[i])			}			continue		}		seg.PutNumber(l.numbers[i])		seg.PutTimestamp(l.timestamps[i])		timeout := rto / 2		if timeout < 20 {			timeout = 20		}		l.nextFlush[i] = current + timeout		if seg.IsFull() {			l.writer.Write(seg)			seg.Release()			seg = NewAckSegment()			l.dirty = false		}	}	if l.dirty || !seg.IsEmpty() {		for _, number := range l.flushCandidates {			if seg.IsFull() {				break			}			seg.PutNumber(number)		}		l.writer.Write(seg)		l.dirty = false	}	seg.Release()}type ReceivingWorker struct {	sync.RWMutex	conn       *Connection	leftOver   buf.MultiBuffer	window     *ReceivingWindow	acklist    *AckList	nextNumber uint32	windowSize uint32}func NewReceivingWorker(kcp *Connection) *ReceivingWorker {	worker := &ReceivingWorker{		conn:       kcp,		window:     NewReceivingWindow(),		windowSize: kcp.Config.GetReceivingInFlightSize(),	}	worker.acklist = NewAckList(worker)	return worker}func (w *ReceivingWorker) Release() {	w.Lock()	w.leftOver.Release()	w.Unlock()}func (w *ReceivingWorker) ProcessSendingNext(number uint32) {	w.Lock()	defer w.Unlock()	w.acklist.Clear(number)}func (w *ReceivingWorker) ProcessSegment(seg *DataSegment) {	w.Lock()	defer w.Unlock()	number := seg.Number	idx := number - w.nextNumber	if idx >= w.windowSize {		return	}	w.acklist.Clear(seg.SendingNext)	w.acklist.Add(number, seg.Timestamp)	if !w.window.Set(seg.Number, seg) {		seg.Release()	}}func (w *ReceivingWorker) ReadMultiBuffer() buf.MultiBuffer {	if w.leftOver != nil {		mb := w.leftOver		w.leftOver = nil		return mb	}	mb := buf.NewMultiBufferCap(32)	w.Lock()	defer w.Unlock()	for {		seg := w.window.Remove(w.nextNumber)		if seg == nil {			break		}		w.nextNumber++		mb.Append(seg.Detach())		seg.Release()	}	return mb}func (w *ReceivingWorker) Read(b []byte) int {	mb := w.ReadMultiBuffer()	if mb.IsEmpty() {		return 0	}	nBytes, err := mb.Read(b)	common.Must(err)	if !mb.IsEmpty() {		w.leftOver = mb	}	return nBytes}func (w *ReceivingWorker) IsDataAvailable() bool {	w.RLock()	defer w.RUnlock()	return w.window.Has(w.nextNumber)}func (w *ReceivingWorker) NextNumber() uint32 {	w.RLock()	defer w.RUnlock()	return w.nextNumber}func (w *ReceivingWorker) Flush(current uint32) {	w.Lock()	defer w.Unlock()	w.acklist.Flush(current, w.conn.roundTrip.Timeout())}func (w *ReceivingWorker) Write(seg Segment) error {	ackSeg := seg.(*AckSegment)	ackSeg.Conv = w.conn.meta.Conversation	ackSeg.ReceivingNext = w.nextNumber	ackSeg.ReceivingWindow = w.nextNumber + w.windowSize	ackSeg.Option = 0	if w.conn.State() == StateReadyToClose {		ackSeg.Option = SegmentOptionClose	}	return w.conn.output.Write(ackSeg)}func (*ReceivingWorker) CloseRead() {}func (w *ReceivingWorker) UpdateNecessary() bool {	w.RLock()	defer w.RUnlock()	return len(w.acklist.numbers) > 0}
 |