소스 검색

apply sync.Pool to segments

v2ray 9 년 전
부모
커밋
13e83c17a5
4개의 변경된 파일58개의 추가작업 그리고 12개의 파일을 삭제
  1. 5 6
      transport/internet/kcp/kcp.go
  2. 1 1
      transport/internet/kcp/receiving.go
  3. 50 2
      transport/internet/kcp/segment.go
  4. 2 3
      transport/internet/kcp/sending.go

+ 5 - 6
transport/internet/kcp/kcp.go

@@ -204,12 +204,11 @@ func (kcp *KCP) flush() {
 	kcp.sendingWorker.Flush()
 
 	if kcp.sendingWorker.PingNecessary() || kcp.receivingWorker.PingNecessary() || _itimediff(kcp.current, kcp.lastPingTime) >= 5000 {
-		seg := &CmdOnlySegment{
-			Conv:         kcp.conv,
-			Cmd:          SegmentCommandPing,
-			ReceivinNext: kcp.receivingWorker.nextNumber,
-			SendingNext:  kcp.sendingWorker.firstUnacknowledged,
-		}
+		seg := NewCmdOnlySegment()
+		seg.Conv = kcp.conv
+		seg.Cmd = SegmentCommandPing
+		seg.ReceivinNext = kcp.receivingWorker.nextNumber
+		seg.SendingNext = kcp.sendingWorker.firstUnacknowledged
 		if kcp.state == StateReadyToClose {
 			seg.Opt = SegmentOptionClose
 		}

+ 1 - 1
transport/internet/kcp/receiving.go

@@ -200,7 +200,7 @@ func (this *AckList) Clear(una uint32) {
 }
 
 func (this *AckList) Flush(current uint32, rto uint32) {
-	seg := new(AckSegment)
+	seg := NewAckSegment()
 	this.Lock()
 	for i := 0; i < len(this.numbers); i++ {
 		if this.nextFlush[i] <= current {

+ 50 - 2
transport/internet/kcp/segment.go

@@ -1,12 +1,26 @@
 package kcp
 
 import (
+	"sync"
+
 	"github.com/v2ray/v2ray-core/common"
 	"github.com/v2ray/v2ray-core/common/alloc"
 	_ "github.com/v2ray/v2ray-core/common/log"
 	"github.com/v2ray/v2ray-core/common/serial"
 )
 
+var (
+	dataSegmentPool = &sync.Pool{
+		New: func() interface{} { return new(DataSegment) },
+	}
+	ackSegmentPool = &sync.Pool{
+		New: func() interface{} { return new(AckSegment) },
+	}
+	cmdSegmentPool = &sync.Pool{
+		New: func() interface{} { return new(CmdOnlySegment) },
+	}
+)
+
 type SegmentCommand byte
 
 const (
@@ -45,6 +59,10 @@ type DataSegment struct {
 	transmit   uint32
 }
 
+func NewDataSegment() *DataSegment {
+	return dataSegmentPool.Get().(*DataSegment)
+}
+
 func (this *DataSegment) Bytes(b []byte) []byte {
 	b = serial.Uint16ToBytes(this.Conv, b)
 	b = append(b, byte(SegmentCommandData), byte(this.Opt))
@@ -62,6 +80,12 @@ func (this *DataSegment) ByteSize() int {
 
 func (this *DataSegment) Release() {
 	this.Data.Release()
+	this.Data = nil
+	this.Opt = 0
+	this.timeout = 0
+	this.ackSkipped = 0
+	this.transmit = 0
+	dataSegmentPool.Put(this)
 }
 
 type AckSegment struct {
@@ -74,6 +98,17 @@ type AckSegment struct {
 	TimestampList   []uint32
 }
 
+func NewAckSegment() *AckSegment {
+	seg := ackSegmentPool.Get().(*AckSegment)
+	if seg.NumberList == nil {
+		seg.NumberList = make([]uint32, 0, 128)
+	}
+	if seg.TimestampList == nil {
+		seg.TimestampList = make([]uint32, 0, 128)
+	}
+	return seg
+}
+
 func (this *AckSegment) ByteSize() int {
 	return 2 + 1 + 1 + 4 + 4 + 1 + int(this.Count)*4 + int(this.Count)*4
 }
@@ -91,7 +126,13 @@ func (this *AckSegment) Bytes(b []byte) []byte {
 	return b
 }
 
-func (this *AckSegment) Release() {}
+func (this *AckSegment) Release() {
+	this.Opt = 0
+	this.Count = 0
+	this.NumberList = this.NumberList[:0]
+	this.TimestampList = this.TimestampList[:0]
+	ackSegmentPool.Put(this)
+}
 
 type CmdOnlySegment struct {
 	Conv         uint16
@@ -101,6 +142,10 @@ type CmdOnlySegment struct {
 	ReceivinNext uint32
 }
 
+func NewCmdOnlySegment() *CmdOnlySegment {
+	return cmdSegmentPool.Get().(*CmdOnlySegment)
+}
+
 func (this *CmdOnlySegment) ByteSize() int {
 	return 2 + 1 + 1 + 4 + 4
 }
@@ -113,7 +158,10 @@ func (this *CmdOnlySegment) Bytes(b []byte) []byte {
 	return b
 }
 
-func (this *CmdOnlySegment) Release() {}
+func (this *CmdOnlySegment) Release() {
+	this.Opt = 0
+	cmdSegmentPool.Put(this)
+}
 
 func ReadSegment(buf []byte) (Segment, []byte) {
 	if len(buf) <= 4 {

+ 2 - 3
transport/internet/kcp/sending.go

@@ -313,9 +313,8 @@ func (this *SendingWorker) Push(b []byte) int {
 		} else {
 			size = len(b)
 		}
-		seg := &DataSegment{
-			Data: alloc.NewSmallBuffer().Clear().Append(b[:size]),
-		}
+		seg := NewDataSegment()
+		seg.Data = alloc.NewSmallBuffer().Clear().Append(b[:size])
 		this.Lock()
 		this.queue.Push(seg)
 		this.Unlock()