| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219 | package kcpimport (	"sync"	"v2ray.com/core/common/alloc")type ReceivingWindow struct {	start uint32	size  uint32	list  []*DataSegment}func NewReceivingWindow(size uint32) *ReceivingWindow {	return &ReceivingWindow{		start: 0,		size:  size,		list:  make([]*DataSegment, size),	}}func (this *ReceivingWindow) Size() uint32 {	return this.size}func (this *ReceivingWindow) Position(idx uint32) uint32 {	return (idx + this.start) % this.size}func (this *ReceivingWindow) Set(idx uint32, value *DataSegment) bool {	pos := this.Position(idx)	if this.list[pos] != nil {		return false	}	this.list[pos] = value	return true}func (this *ReceivingWindow) Remove(idx uint32) *DataSegment {	pos := this.Position(idx)	e := this.list[pos]	this.list[pos] = nil	return e}func (this *ReceivingWindow) RemoveFirst() *DataSegment {	return this.Remove(0)}func (this *ReceivingWindow) Advance() {	this.start++	if this.start == this.size {		this.start = 0	}}type AckList struct {	writer     SegmentWriter	timestamps []uint32	numbers    []uint32	nextFlush  []uint32}func NewAckList(writer SegmentWriter) *AckList {	return &AckList{		writer:     writer,		timestamps: make([]uint32, 0, 32),		numbers:    make([]uint32, 0, 32),		nextFlush:  make([]uint32, 0, 32),	}}func (this *AckList) Add(number uint32, timestamp uint32) {	this.timestamps = append(this.timestamps, timestamp)	this.numbers = append(this.numbers, number)	this.nextFlush = append(this.nextFlush, 0)}func (this *AckList) Clear(una uint32) {	count := 0	for i := 0; i < len(this.numbers); i++ {		if this.numbers[i] >= una {			if i != count {				this.numbers[count] = this.numbers[i]				this.timestamps[count] = this.timestamps[i]				this.nextFlush[count] = this.nextFlush[i]			}			count++		}	}	if count < len(this.numbers) {		this.numbers = this.numbers[:count]		this.timestamps = this.timestamps[:count]		this.nextFlush = this.nextFlush[:count]	}}func (this *AckList) Flush(current uint32, rto uint32) {	seg := NewAckSegment()	for i := 0; i < len(this.numbers) && !seg.IsFull(); i++ {		if this.nextFlush[i] <= current {			seg.PutNumber(this.numbers[i])			seg.PutTimestamp(this.timestamps[i])			this.nextFlush[i] = current + rto/2		}	}	if seg.Count > 0 {		this.writer.Write(seg)		seg.Release()	}}type ReceivingWorker struct {	sync.RWMutex	conn       *Connection	leftOver   *alloc.Buffer	window     *ReceivingWindow	acklist    *AckList	nextNumber uint32	windowSize uint32}func NewReceivingWorker(kcp *Connection) *ReceivingWorker {	worker := &ReceivingWorker{		conn:       kcp,		window:     NewReceivingWindow(kcp.Config.GetReceivingBufferSize()),		windowSize: kcp.Config.GetReceivingInFlightSize(),	}	worker.acklist = NewAckList(worker)	return worker}func (this *ReceivingWorker) ProcessSendingNext(number uint32) {	this.Lock()	defer this.Unlock()	this.acklist.Clear(number)}func (this *ReceivingWorker) ProcessSegment(seg *DataSegment) {	this.Lock()	defer this.Unlock()	number := seg.Number	idx := number - this.nextNumber	if idx >= this.windowSize {		return	}	this.acklist.Clear(seg.SendingNext)	this.acklist.Add(number, seg.Timestamp)	if !this.window.Set(idx, seg) {		seg.Release()	}}func (this *ReceivingWorker) Read(b []byte) int {	this.Lock()	defer this.Unlock()	total := 0	if this.leftOver != nil {		nBytes := copy(b, this.leftOver.Value)		if nBytes < this.leftOver.Len() {			this.leftOver.SliceFrom(nBytes)			return nBytes		}		this.leftOver.Release()		this.leftOver = nil		total += nBytes	}	for total < len(b) {		seg := this.window.RemoveFirst()		if seg == nil {			break		}		this.window.Advance()		this.nextNumber++		nBytes := copy(b[total:], seg.Data.Value)		total += nBytes		if nBytes < seg.Data.Len() {			seg.Data.SliceFrom(nBytes)			this.leftOver = seg.Data			seg.Data = nil			seg.Release()			break		}		seg.Release()	}	return total}func (this *ReceivingWorker) Flush(current uint32) {	this.Lock()	defer this.Unlock()	this.acklist.Flush(current, this.conn.roundTrip.Timeout())}func (this *ReceivingWorker) Write(seg Segment) {	ackSeg := seg.(*AckSegment)	ackSeg.Conv = this.conn.conv	ackSeg.ReceivingNext = this.nextNumber	ackSeg.ReceivingWindow = this.nextNumber + this.windowSize	if this.conn.state == StateReadyToClose {		ackSeg.Option = SegmentOptionClose	}	this.conn.output.Write(ackSeg)}func (this *ReceivingWorker) CloseRead() {}func (this *ReceivingWorker) UpdateNecessary() bool {	return len(this.acklist.numbers) > 0}
 |