| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432 | package kcpimport (	"sync"	"github.com/v2ray/v2ray-core/common/alloc")type SendingWindow struct {	start uint32	cap   uint32	len   uint32	last  uint32	data []*DataSegment	prev []uint32	next []uint32	totalInFlightSize uint32	writer            SegmentWriter	onPacketLoss      func(uint32)}func NewSendingWindow(size uint32, writer SegmentWriter, onPacketLoss func(uint32)) *SendingWindow {	window := &SendingWindow{		start:        0,		cap:          size,		len:          0,		last:         0,		data:         make([]*DataSegment, size),		prev:         make([]uint32, size),		next:         make([]uint32, size),		writer:       writer,		onPacketLoss: onPacketLoss,	}	return window}func (this *SendingWindow) Len() int {	return int(this.len)}func (this *SendingWindow) IsEmpty() bool {	return this.len == 0}func (this *SendingWindow) Size() uint32 {	return this.cap}func (this *SendingWindow) IsFull() bool {	return this.len == this.cap}func (this *SendingWindow) Push(seg *DataSegment) {	pos := (this.start + this.len) % this.cap	this.data[pos] = seg	if this.len > 0 {		this.next[this.last] = pos		this.prev[pos] = this.last	}	this.last = pos	this.len++}func (this *SendingWindow) First() *DataSegment {	return this.data[this.start]}func (this *SendingWindow) Clear(una uint32) {	for !this.IsEmpty() && this.data[this.start].Number < una {		this.Remove(0)	}}func (this *SendingWindow) Remove(idx uint32) {	if this.len == 0 {		return	}	pos := (this.start + idx) % this.cap	seg := this.data[pos]	if seg == nil {		return	}	this.totalInFlightSize--	seg.Release()	this.data[pos] = nil	if pos == this.start && pos == this.last {		this.len = 0		this.start = 0		this.last = 0	} else if pos == this.start {		delta := this.next[pos] - this.start		if this.next[pos] < this.start {			delta = this.next[pos] + this.cap - this.start		}		this.start = this.next[pos]		this.len -= delta	} else if pos == this.last {		this.last = this.prev[pos]	} else {		this.next[this.prev[pos]] = this.next[pos]		this.prev[this.next[pos]] = this.prev[pos]	}}func (this *SendingWindow) HandleFastAck(number uint32) {	if this.len == 0 {		return	}	for i := this.start; ; i = this.next[i] {		seg := this.data[i]		if number-seg.Number > 0x7FFFFFFF {			break		}		if number != seg.Number {			seg.ackSkipped++		}		if i == this.last {			break		}	}}func (this *SendingWindow) Flush(current uint32, resend uint32, rto uint32, maxInFlightSize uint32) {	if this.IsEmpty() {		return	}	var lost uint32	var inFlightSize uint32	for i := this.start; ; i = this.next[i] {		segment := this.data[i]		needsend := false		if segment.transmit == 0 {			needsend = true			segment.transmit++			segment.timeout = current + rto			this.totalInFlightSize++		} else if current-segment.timeout < 0x7FFFFFFF {			needsend = true			segment.transmit++			segment.timeout = current + rto			lost++		} else if segment.ackSkipped >= resend {			needsend = true			segment.transmit++			segment.ackSkipped = 0			segment.timeout = current + rto		}		if needsend {			segment.Timestamp = current			this.writer.Write(segment)			inFlightSize++			if inFlightSize >= maxInFlightSize {				break			}		}		if i == this.last {			break		}	}	if inFlightSize > 0 && this.totalInFlightSize != 0 {		rate := lost * 100 / this.totalInFlightSize		this.onPacketLoss(rate)	}}type SendingQueue struct {	start uint32	cap   uint32	len   uint32	list  []*alloc.Buffer}func NewSendingQueue(size uint32) *SendingQueue {	return &SendingQueue{		start: 0,		cap:   size,		list:  make([]*alloc.Buffer, size),		len:   0,	}}func (this *SendingQueue) IsFull() bool {	return this.len == this.cap}func (this *SendingQueue) IsEmpty() bool {	return this.len == 0}func (this *SendingQueue) Pop() *alloc.Buffer {	if this.IsEmpty() {		return nil	}	seg := this.list[this.start]	this.list[this.start] = nil	this.len--	this.start++	if this.start == this.cap {		this.start = 0	}	if this.IsEmpty() {		this.start = 0	}	return seg}func (this *SendingQueue) Push(seg *alloc.Buffer) {	if this.IsFull() {		return	}	this.list[(this.start+this.len)%this.cap] = seg	this.len++}func (this *SendingQueue) Clear() {	for i := uint32(0); i < this.len; i++ {		this.list[(i+this.start)%this.cap].Release()		this.list[(i+this.start)%this.cap] = nil	}	this.start = 0	this.len = 0}func (this *SendingQueue) Len() uint32 {	return this.len}type SendingWorker struct {	sync.RWMutex	conn                *Connection	window              *SendingWindow	queue               *SendingQueue	firstUnacknowledged uint32	nextNumber          uint32	remoteNextNumber    uint32	controlWindow       uint32	fastResend          uint32	updated             bool}func NewSendingWorker(kcp *Connection) *SendingWorker {	worker := &SendingWorker{		conn:             kcp,		queue:            NewSendingQueue(effectiveConfig.GetSendingQueueSize()),		fastResend:       2,		remoteNextNumber: 32,		controlWindow:    effectiveConfig.GetSendingInFlightSize(),	}	worker.window = NewSendingWindow(effectiveConfig.GetSendingWindowSize(), worker, worker.OnPacketLoss)	return worker}func (this *SendingWorker) ProcessReceivingNext(nextNumber uint32) {	this.Lock()	defer this.Unlock()	this.ProcessReceivingNextWithoutLock(nextNumber)}func (this *SendingWorker) ProcessReceivingNextWithoutLock(nextNumber uint32) {	this.window.Clear(nextNumber)	this.FindFirstUnacknowledged()}// @Privatefunc (this *SendingWorker) FindFirstUnacknowledged() {	prevUna := this.firstUnacknowledged	if !this.window.IsEmpty() {		this.firstUnacknowledged = this.window.First().Number	} else {		this.firstUnacknowledged = this.nextNumber	}	if this.firstUnacknowledged != prevUna {		this.updated = true	}}// @Privatefunc (this *SendingWorker) ProcessAck(number uint32) {	// number < this.firstUnacknowledged || number >= this.nextNumber	if number-this.firstUnacknowledged > 0x7FFFFFFF || number-this.nextNumber < 0x7FFFFFFF {		return	}	this.window.Remove(number - this.firstUnacknowledged)	this.FindFirstUnacknowledged()}func (this *SendingWorker) FillWindow(current uint32) {	for !this.queue.IsEmpty() && !this.window.IsFull() {		seg := NewDataSegment()		seg.Data = this.queue.Pop()		seg.Number = this.nextNumber		seg.timeout = current		seg.ackSkipped = 0		seg.transmit = 0		this.window.Push(seg)		this.nextNumber++	}}func (this *SendingWorker) ProcessSegment(current uint32, seg *AckSegment) {	defer seg.Release()	this.Lock()	defer this.Unlock()	if this.remoteNextNumber < seg.ReceivingWindow {		this.remoteNextNumber = seg.ReceivingWindow	}	this.ProcessReceivingNextWithoutLock(seg.ReceivingNext)	if current-seg.Timestamp < 10000 {		this.conn.roundTrip.Update(current-seg.Timestamp, current)	}	var maxack uint32	for i := 0; i < int(seg.Count); i++ {		number := seg.NumberList[i]		this.ProcessAck(number)		if maxack < number {			maxack = number		}	}	this.window.HandleFastAck(maxack)	this.FillWindow(current)}func (this *SendingWorker) Push(b []byte) int {	nBytes := 0	this.Lock()	defer this.Unlock()	for len(b) > 0 && !this.queue.IsFull() {		var size int		if len(b) > int(this.conn.mss) {			size = int(this.conn.mss)		} else {			size = len(b)		}		this.queue.Push(AllocateBuffer().Clear().Append(b[:size]))		b = b[size:]		nBytes += size	}	return nBytes}// @Privatefunc (this *SendingWorker) Write(seg Segment) {	dataSeg := seg.(*DataSegment)	dataSeg.Conv = this.conn.conv	dataSeg.SendingNext = this.firstUnacknowledged	dataSeg.Option = 0	if this.conn.State() == StateReadyToClose {		dataSeg.Option = SegmentOptionClose	}	this.conn.output.Write(dataSeg)	this.updated = false}func (this *SendingWorker) PingNecessary() bool {	this.RLock()	defer this.RUnlock()	return this.updated}func (this *SendingWorker) MarkPingNecessary(b bool) {	this.Lock()	defer this.Unlock()	this.updated = b}func (this *SendingWorker) OnPacketLoss(lossRate uint32) {	if !effectiveConfig.Congestion || this.conn.roundTrip.Timeout() == 0 {		return	}	if lossRate >= 15 {		this.controlWindow = 3 * this.controlWindow / 4	} else if lossRate <= 5 {		this.controlWindow += this.controlWindow / 4	}	if this.controlWindow < 16 {		this.controlWindow = 16	}	if this.controlWindow > 2*effectiveConfig.GetSendingInFlightSize() {		this.controlWindow = 2 * effectiveConfig.GetSendingInFlightSize()	}}func (this *SendingWorker) Flush(current uint32) {	this.Lock()	defer this.Unlock()	cwnd := this.firstUnacknowledged + effectiveConfig.GetSendingInFlightSize()	if cwnd > this.remoteNextNumber {		cwnd = this.remoteNextNumber	}	if effectiveConfig.Congestion && cwnd > this.firstUnacknowledged+this.controlWindow {		cwnd = this.firstUnacknowledged + this.controlWindow	}	this.FillWindow(current)	this.window.Flush(current, this.conn.fastresend, this.conn.roundTrip.Timeout(), cwnd)}func (this *SendingWorker) CloseWrite() {	this.Lock()	defer this.Unlock()	this.window.Clear(0xFFFFFFFF)	this.queue.Clear()}func (this *SendingWorker) IsEmpty() bool {	this.RLock()	defer this.RUnlock()	return this.window.IsEmpty() && this.queue.IsEmpty()}
 |