Explorar o código

reduce ack packet size and send peer RTO

v2ray %!s(int64=9) %!d(string=hai) anos
pai
achega
00841583d2

+ 21 - 5
transport/internet/kcp/connection.go

@@ -50,13 +50,26 @@ func nowMillisec() int64 {
 
 type RountTripInfo struct {
 	sync.RWMutex
-	variation uint32
-	srtt      uint32
-	rto       uint32
-	minRtt    uint32
+	variation        uint32
+	srtt             uint32
+	rto              uint32
+	minRtt           uint32
+	updatedTimestamp uint32
 }
 
-func (this *RountTripInfo) Update(rtt uint32) {
+func (this *RountTripInfo) UpdatePeerRTO(rto uint32, current uint32) {
+	this.Lock()
+	defer this.Unlock()
+
+	if current-this.updatedTimestamp < 5000 {
+		return
+	}
+
+	this.updatedTimestamp = current
+	this.rto = rto
+}
+
+func (this *RountTripInfo) Update(rtt uint32, current uint32) {
 	if rtt > 0x7FFFFFFF {
 		return
 	}
@@ -89,6 +102,7 @@ func (this *RountTripInfo) Update(rtt uint32) {
 		rto = 10000
 	}
 	this.rto = rto * 3 / 2
+	this.updatedTimestamp = current
 }
 
 func (this *RountTripInfo) Timeout() uint32 {
@@ -449,6 +463,7 @@ func (this *Connection) Input(data []byte) int {
 			}
 			this.sendingWorker.ProcessReceivingNext(seg.ReceivinNext)
 			this.receivingWorker.ProcessSendingNext(seg.SendingNext)
+			this.roundTrip.UpdatePeerRTO(seg.PeerRTO, current)
 			seg.Release()
 		default:
 		}
@@ -503,6 +518,7 @@ func (this *Connection) flush() {
 		seg.Command = CommandPing
 		seg.ReceivinNext = this.receivingWorker.nextNumber
 		seg.SendingNext = this.sendingWorker.firstUnacknowledged
+		seg.PeerRTO = this.roundTrip.Timeout()
 		if this.State() == StateReadyToClose {
 			seg.Option = SegmentOptionClose
 		}

+ 2 - 1
transport/internet/kcp/receiving.go

@@ -162,7 +162,8 @@ func (this *AckList) Flush(current uint32, rto uint32) {
 	seg := NewAckSegment()
 	for i := 0; i < len(this.numbers) && !seg.IsFull(); i++ {
 		if this.nextFlush[i] <= current {
-			seg.PutNumber(this.numbers[i], this.timestamps[i])
+			seg.PutNumber(this.numbers[i])
+			seg.PutTimestamp(this.timestamps[i])
 			this.nextFlush[i] = current + rto/2
 		}
 	}

+ 24 - 12
transport/internet/kcp/segment.go

@@ -74,19 +74,24 @@ type AckSegment struct {
 	Option          SegmentOption
 	ReceivingWindow uint32
 	ReceivingNext   uint32
+	Timestamp       uint32
 	Count           byte
 	NumberList      []uint32
-	TimestampList   []uint32
 }
 
 func NewAckSegment() *AckSegment {
 	return new(AckSegment)
 }
 
-func (this *AckSegment) PutNumber(number uint32, timestamp uint32) {
+func (this *AckSegment) PutTimestamp(timestamp uint32) {
+	if timestamp-this.Timestamp < 0x7FFFFFFF {
+		this.Timestamp = timestamp
+	}
+}
+
+func (this *AckSegment) PutNumber(number uint32) {
 	this.Count++
 	this.NumberList = append(this.NumberList, number)
-	this.TimestampList = append(this.TimestampList, timestamp)
 }
 
 func (this *AckSegment) IsFull() bool {
@@ -94,7 +99,7 @@ func (this *AckSegment) IsFull() bool {
 }
 
 func (this *AckSegment) ByteSize() int {
-	return 2 + 1 + 1 + 4 + 4 + 1 + int(this.Count)*4 + int(this.Count)*4
+	return 2 + 1 + 1 + 4 + 4 + 4 + 1 + int(this.Count)*4
 }
 
 func (this *AckSegment) Bytes(b []byte) []byte {
@@ -102,17 +107,16 @@ func (this *AckSegment) Bytes(b []byte) []byte {
 	b = append(b, byte(CommandACK), byte(this.Option))
 	b = serial.Uint32ToBytes(this.ReceivingWindow, b)
 	b = serial.Uint32ToBytes(this.ReceivingNext, b)
+	b = serial.Uint32ToBytes(this.Timestamp, b)
 	b = append(b, this.Count)
 	for i := byte(0); i < this.Count; i++ {
 		b = serial.Uint32ToBytes(this.NumberList[i], b)
-		b = serial.Uint32ToBytes(this.TimestampList[i], b)
 	}
 	return b
 }
 
 func (this *AckSegment) Release() {
 	this.NumberList = nil
-	this.TimestampList = nil
 }
 
 type CmdOnlySegment struct {
@@ -121,6 +125,7 @@ type CmdOnlySegment struct {
 	Option       SegmentOption
 	SendingNext  uint32
 	ReceivinNext uint32
+	PeerRTO      uint32
 }
 
 func NewCmdOnlySegment() *CmdOnlySegment {
@@ -128,7 +133,7 @@ func NewCmdOnlySegment() *CmdOnlySegment {
 }
 
 func (this *CmdOnlySegment) ByteSize() int {
-	return 2 + 1 + 1 + 4 + 4
+	return 2 + 1 + 1 + 4 + 4 + 4
 }
 
 func (this *CmdOnlySegment) Bytes(b []byte) []byte {
@@ -136,6 +141,7 @@ func (this *CmdOnlySegment) Bytes(b []byte) []byte {
 	b = append(b, byte(this.Command), byte(this.Option))
 	b = serial.Uint32ToBytes(this.SendingNext, b)
 	b = serial.Uint32ToBytes(this.ReceivinNext, b)
+	b = serial.Uint32ToBytes(this.PeerRTO, b)
 	return b
 }
 
@@ -186,7 +192,7 @@ func ReadSegment(buf []byte) (Segment, []byte) {
 		seg := NewAckSegment()
 		seg.Conv = conv
 		seg.Option = opt
-		if len(buf) < 9 {
+		if len(buf) < 13 {
 			return nil, nil
 		}
 
@@ -196,15 +202,18 @@ func ReadSegment(buf []byte) (Segment, []byte) {
 		seg.ReceivingNext = serial.BytesToUint32(buf)
 		buf = buf[4:]
 
+		seg.Timestamp = serial.BytesToUint32(buf)
+		buf = buf[4:]
+
 		count := int(buf[0])
 		buf = buf[1:]
 
-		if len(buf) < count*8 {
+		if len(buf) < count*4 {
 			return nil, nil
 		}
 		for i := 0; i < count; i++ {
-			seg.PutNumber(serial.BytesToUint32(buf), serial.BytesToUint32(buf[4:]))
-			buf = buf[8:]
+			seg.PutNumber(serial.BytesToUint32(buf))
+			buf = buf[4:]
 		}
 
 		return seg, buf
@@ -215,7 +224,7 @@ func ReadSegment(buf []byte) (Segment, []byte) {
 	seg.Command = cmd
 	seg.Option = opt
 
-	if len(buf) < 8 {
+	if len(buf) < 12 {
 		return nil, nil
 	}
 
@@ -225,5 +234,8 @@ func ReadSegment(buf []byte) (Segment, []byte) {
 	seg.ReceivinNext = serial.BytesToUint32(buf)
 	buf = buf[4:]
 
+	seg.PeerRTO = serial.BytesToUint32(buf)
+	buf = buf[4:]
+
 	return seg, buf
 }

+ 2 - 2
transport/internet/kcp/segment_test.go

@@ -48,9 +48,9 @@ func TestACKSegment(t *testing.T) {
 		Conv:            1,
 		ReceivingWindow: 2,
 		ReceivingNext:   3,
+		Timestamp:       10,
 		Count:           5,
 		NumberList:      []uint32{1, 3, 5, 7, 9},
-		TimestampList:   []uint32{2, 4, 6, 8, 10},
 	}
 
 	nBytes := seg.ByteSize()
@@ -64,8 +64,8 @@ func TestACKSegment(t *testing.T) {
 	assert.Uint32(seg2.ReceivingWindow).Equals(seg.ReceivingWindow)
 	assert.Uint32(seg2.ReceivingNext).Equals(seg.ReceivingNext)
 	assert.Byte(seg2.Count).Equals(seg.Count)
+	assert.Uint32(seg2.Timestamp).Equals(seg.Timestamp)
 	for i := byte(0); i < seg2.Count; i++ {
-		assert.Uint32(seg2.TimestampList[i]).Equals(seg.TimestampList[i])
 		assert.Uint32(seg2.NumberList[i]).Equals(seg.NumberList[i])
 	}
 }

+ 4 - 4
transport/internet/kcp/sending.go

@@ -317,14 +317,14 @@ func (this *SendingWorker) ProcessSegment(current uint32, seg *AckSegment) {
 		this.remoteNextNumber = seg.ReceivingWindow
 	}
 	this.ProcessReceivingNextWithoutLock(seg.ReceivingNext)
+	if current-seg.Timestamp < 10000 {
+		this.conn.roundTrip.Update(current-seg.Timestamp, current)
+	}
 
 	var maxack uint32
 	for i := 0; i < int(seg.Count); i++ {
-		timestamp := seg.TimestampList[i]
 		number := seg.NumberList[i]
-		if current-timestamp < 10000 {
-			this.conn.roundTrip.Update(current - timestamp)
-		}
+
 		this.ProcessAck(number)
 		if maxack < number {
 			maxack = number