Ver Fonte

correctly apply cwnd

v2ray há 9 anos atrás
pai
commit
6ea8691a07
1 ficheiros alterados com 21 adições e 17 exclusões
  1. 21 17
      transport/internet/kcp/sending.go

+ 21 - 17
transport/internet/kcp/sending.go

@@ -16,13 +16,12 @@ type SendingWindow struct {
 	prev []uint32
 	next []uint32
 
-	inFlightSize      uint32
 	totalInFlightSize uint32
 	writer            SegmentWriter
 	onPacketLoss      func(uint32)
 }
 
-func NewSendingWindow(size uint32, inFlightSize uint32, writer SegmentWriter, onPacketLoss func(uint32)) *SendingWindow {
+func NewSendingWindow(size uint32, writer SegmentWriter, onPacketLoss func(uint32)) *SendingWindow {
 	window := &SendingWindow{
 		start:        0,
 		cap:          size,
@@ -33,7 +32,6 @@ func NewSendingWindow(size uint32, inFlightSize uint32, writer SegmentWriter, on
 		next:         make([]uint32, size),
 		writer:       writer,
 		onPacketLoss: onPacketLoss,
-		inFlightSize: inFlightSize,
 	}
 	return window
 }
@@ -42,6 +40,14 @@ func (this *SendingWindow) Len() int {
 	return int(this.len)
 }
 
+func (this *SendingWindow) Size() uint32 {
+	return this.cap
+}
+
+func (this *SendingWindow) IsFull() bool {
+	return this.len == this.cap
+}
+
 func (this *SendingWindow) Push(seg *DataSegment) {
 	pos := (this.start + this.len) % this.cap
 	this.data[pos] = seg
@@ -114,7 +120,7 @@ func (this *SendingWindow) HandleFastAck(number uint32) {
 	}
 }
 
-func (this *SendingWindow) Flush(current uint32, resend uint32, rto uint32) {
+func (this *SendingWindow) Flush(current uint32, resend uint32, rto uint32, maxInFlightSize uint32) {
 	if this.Len() == 0 {
 		return
 	}
@@ -145,7 +151,7 @@ func (this *SendingWindow) Flush(current uint32, resend uint32, rto uint32) {
 		if needsend {
 			this.writer.Write(segment)
 			inFlightSize++
-			if inFlightSize >= this.inFlightSize {
+			if inFlightSize >= maxInFlightSize {
 				break
 			}
 		}
@@ -224,7 +230,6 @@ type SendingWorker struct {
 	kcp                 *KCP
 	window              *SendingWindow
 	queue               *SendingQueue
-	windowSize          uint32
 	firstUnacknowledged uint32
 	nextNumber          uint32
 	remoteNextNumber    uint32
@@ -239,10 +244,9 @@ func NewSendingWorker(kcp *KCP) *SendingWorker {
 		queue:            NewSendingQueue(effectiveConfig.GetSendingQueueSize()),
 		fastResend:       2,
 		remoteNextNumber: 32,
-		windowSize:       effectiveConfig.GetSendingWindowSize(),
-		controlWindow:    effectiveConfig.GetSendingWindowSize(),
+		controlWindow:    effectiveConfig.GetSendingInFlightSize(),
 	}
-	worker.window = NewSendingWindow(effectiveConfig.GetSendingWindowSize(), effectiveConfig.GetSendingInFlightSize(), worker, worker.OnPacketLoss)
+	worker.window = NewSendingWindow(effectiveConfig.GetSendingWindowSize(), worker, worker.OnPacketLoss)
 	return worker
 }
 
@@ -268,7 +272,7 @@ func (this *SendingWorker) FindFirstUnacknowledged() {
 }
 
 func (this *SendingWorker) ProcessAck(number uint32) {
-	if number-this.firstUnacknowledged > this.windowSize {
+	if number-this.firstUnacknowledged > this.window.Size() {
 		return
 	}
 
@@ -350,11 +354,11 @@ func (this *SendingWorker) OnPacketLoss(lossRate uint32) {
 	} else if lossRate <= 5 {
 		this.controlWindow += this.controlWindow / 4
 	}
-	if this.controlWindow < 4 {
-		this.controlWindow = 4
+	if this.controlWindow < 16 {
+		this.controlWindow = 16
 	}
-	if this.controlWindow > 2*this.windowSize {
-		this.controlWindow = 2 * this.windowSize
+	if this.controlWindow > 2*effectiveConfig.GetSendingInFlightSize() {
+		this.controlWindow = 2 * effectiveConfig.GetSendingInFlightSize()
 	}
 }
 
@@ -362,7 +366,7 @@ func (this *SendingWorker) Flush() {
 	this.Lock()
 	defer this.Unlock()
 
-	cwnd := this.firstUnacknowledged + this.windowSize
+	cwnd := this.firstUnacknowledged + effectiveConfig.GetSendingInFlightSize()
 	if cwnd > this.remoteNextNumber {
 		cwnd = this.remoteNextNumber
 	}
@@ -370,7 +374,7 @@ func (this *SendingWorker) Flush() {
 		cwnd = this.firstUnacknowledged + this.controlWindow
 	}
 
-	for !this.queue.IsEmpty() && _itimediff(this.nextNumber, cwnd) < 0 {
+	for !this.queue.IsEmpty() && !this.window.IsFull() {
 		seg := this.queue.Pop()
 		seg.Number = this.nextNumber
 		seg.timeout = this.kcp.current
@@ -380,7 +384,7 @@ func (this *SendingWorker) Flush() {
 		this.nextNumber++
 	}
 
-	this.window.Flush(this.kcp.current, this.kcp.fastresend, this.kcp.rx_rto)
+	this.window.Flush(this.kcp.current, this.kcp.fastresend, this.kcp.rx_rto, cwnd)
 }
 
 func (this *SendingWorker) CloseWrite() {