瀏覽代碼

receiving queue

v2ray 9 年之前
父節點
當前提交
a1f5839461
共有 4 個文件被更改,包括 145 次插入127 次删除
  1. 2 0
      transport/internet/kcp/config.go
  2. 10 42
      transport/internet/kcp/connection.go
  3. 42 85
      transport/internet/kcp/kcp.go
  4. 91 0
      transport/internet/kcp/receiving.go

+ 2 - 0
transport/internet/kcp/config.go

@@ -7,6 +7,7 @@ type Config struct {
 	DownlinkCapacity uint32
 	Congestion       bool
 	WriteBuffer      uint32
+	ReadBuffer       uint32
 }
 
 func (this *Config) Apply() {
@@ -37,6 +38,7 @@ func DefaultConfig() Config {
 		DownlinkCapacity: 20,
 		Congestion:       false,
 		WriteBuffer:      8 * 1024 * 1024,
+		ReadBuffer:       8 * 1024 * 1024,
 	}
 }
 

+ 10 - 42
transport/internet/kcp/connection.go

@@ -46,7 +46,6 @@ type Connection struct {
 	block           Authenticator
 	needUpdate      bool
 	local, remote   net.Addr
-	rd              time.Time // read deadline
 	wd              time.Time // write deadline
 	chReadEvent     chan struct{}
 	writer          io.WriteCloser
@@ -86,40 +85,10 @@ func (this *Connection) Elapsed() uint32 {
 
 // Read implements the Conn Read method.
 func (this *Connection) Read(b []byte) (int, error) {
-	if this == nil ||
-		this.kcp.state == StateReadyToClose ||
-		this.kcp.state == StateTerminating ||
-		this.kcp.state == StateTerminated {
+	if this == nil || this.kcp.state == StateTerminating || this.kcp.state == StateTerminated {
 		return 0, io.EOF
 	}
-
-	for {
-		this.RLock()
-		if this == nil ||
-			this.kcp.state == StateReadyToClose ||
-			this.kcp.state == StateTerminating ||
-			this.kcp.state == StateTerminated {
-			this.RUnlock()
-			return 0, io.EOF
-		}
-
-		if !this.rd.IsZero() && this.rd.Before(time.Now()) {
-			this.RUnlock()
-			return 0, errTimeout
-		}
-		this.RUnlock()
-
-		this.kcpAccess.Lock()
-		nBytes := this.kcp.Recv(b)
-		this.kcpAccess.Unlock()
-		if nBytes > 0 {
-			return nBytes, nil
-		}
-		select {
-		case <-this.chReadEvent:
-		case <-time.After(time.Second):
-		}
-	}
+	return this.kcp.rcv_queue.Read(b)
 }
 
 // Write implements the Conn Write method.
@@ -195,13 +164,12 @@ func (this *Connection) RemoteAddr() net.Addr {
 
 // SetDeadline sets the deadline associated with the listener. A zero time value disables the deadline.
 func (this *Connection) SetDeadline(t time.Time) error {
-	if this == nil || this.kcp.state != StateActive {
-		return errClosedConnection
+	if err := this.SetReadDeadline(t); err != nil {
+		return err
+	}
+	if err := this.SetWriteDeadline(t); err != nil {
+		return err
 	}
-	this.Lock()
-	defer this.Unlock()
-	this.rd = t
-	this.wd = t
 	return nil
 }
 
@@ -210,9 +178,9 @@ func (this *Connection) SetReadDeadline(t time.Time) error {
 	if this == nil || this.kcp.state != StateActive {
 		return errClosedConnection
 	}
-	this.Lock()
-	defer this.Unlock()
-	this.rd = t
+	this.kcpAccess.Lock()
+	defer this.kcpAccess.Unlock()
+	this.kcp.rcv_queue.SetReadDeadline(t)
 	return nil
 }
 

+ 42 - 85
transport/internet/kcp/kcp.go

@@ -12,17 +12,13 @@ import (
 )
 
 const (
-	IKCP_RTO_NDL     = 30  // no delay min rto
-	IKCP_RTO_MIN     = 100 // normal min rto
-	IKCP_RTO_DEF     = 200
-	IKCP_RTO_MAX     = 60000
-	IKCP_WND_SND     = 32
-	IKCP_WND_RCV     = 32
-	IKCP_MTU_DEF     = 1350
-	IKCP_ACK_FAST    = 3
-	IKCP_INTERVAL    = 100
-	IKCP_THRESH_INIT = 2
-	IKCP_THRESH_MIN  = 2
+	IKCP_RTO_NDL  = 30  // no delay min rto
+	IKCP_RTO_MIN  = 100 // normal min rto
+	IKCP_RTO_DEF  = 200
+	IKCP_RTO_MAX  = 60000
+	IKCP_WND_SND  = 32
+	IKCP_WND_RCV  = 32
+	IKCP_INTERVAL = 100
 )
 
 func _itimediff(later, earlier uint32) int32 {
@@ -50,15 +46,14 @@ type KCP struct {
 	receivingUpdated bool
 	lastPingTime     uint32
 
-	mtu, mss                          uint32
-	snd_una, snd_nxt, rcv_nxt         uint32
-	rx_rttvar, rx_srtt, rx_rto        uint32
-	snd_wnd, rcv_wnd, rmt_wnd, cwnd   uint32
-	current, interval, ts_flush, xmit uint32
-	updated                           bool
+	mtu, mss                        uint32
+	snd_una, snd_nxt, rcv_nxt       uint32
+	rx_rttvar, rx_srtt, rx_rto      uint32
+	snd_wnd, rcv_wnd, rmt_wnd, cwnd uint32
+	current, interval               uint32
 
 	snd_queue *SendingQueue
-	rcv_queue []*DataSegment
+	rcv_queue *ReceivingQueue
 	snd_buf   []*DataSegment
 	rcv_buf   *ReceivingWindow
 
@@ -82,15 +77,31 @@ func NewKCP(conv uint16, mtu uint32, sendingWindowSize uint32, receivingWindowSi
 	kcp.mss = kcp.mtu - DataSegmentOverhead
 	kcp.rx_rto = IKCP_RTO_DEF
 	kcp.interval = IKCP_INTERVAL
-	kcp.ts_flush = IKCP_INTERVAL
 	kcp.output = NewSegmentWriter(mtu, output)
 	kcp.rcv_buf = NewReceivingWindow(receivingWindowSize)
 	kcp.snd_queue = NewSendingQueue(sendingQueueSize)
+	kcp.rcv_queue = NewReceivingQueue()
 	kcp.acklist = new(ACKList)
 	kcp.cwnd = kcp.snd_wnd
 	return kcp
 }
 
+func (kcp *KCP) SetState(state State) {
+	kcp.state = state
+	kcp.stateBeginTime = kcp.current
+
+	switch state {
+	case StateReadyToClose:
+		kcp.rcv_queue.Close()
+	case StatePeerClosed:
+		kcp.ClearSendQueue()
+	case StateTerminating:
+		kcp.rcv_queue.Close()
+	case StateTerminated:
+		kcp.rcv_queue.Close()
+	}
+}
+
 func (kcp *KCP) HandleOption(opt SegmentOption) {
 	if (opt & SegmentOptionClose) == SegmentOptionClose {
 		kcp.OnPeerClosed()
@@ -99,52 +110,22 @@ func (kcp *KCP) HandleOption(opt SegmentOption) {
 
 func (kcp *KCP) OnPeerClosed() {
 	if kcp.state == StateReadyToClose {
-		kcp.state = StateTerminating
-		kcp.stateBeginTime = kcp.current
+		kcp.SetState(StateTerminating)
 	}
 	if kcp.state == StateActive {
-		kcp.ClearSendQueue()
-		kcp.state = StatePeerClosed
-		kcp.stateBeginTime = kcp.current
+		kcp.SetState(StatePeerClosed)
 	}
 }
 
 func (kcp *KCP) OnClose() {
 	if kcp.state == StateActive {
-		kcp.state = StateReadyToClose
-		kcp.stateBeginTime = kcp.current
+		kcp.SetState(StateReadyToClose)
 	}
 	if kcp.state == StatePeerClosed {
-		kcp.state = StateTerminating
-		kcp.stateBeginTime = kcp.current
+		kcp.SetState(StateTerminating)
 	}
 }
 
-// Recv is user/upper level recv: returns size, returns below zero for EAGAIN
-func (kcp *KCP) Recv(buffer []byte) (n int) {
-	if len(kcp.rcv_queue) == 0 {
-		return -1
-	}
-
-	// merge fragment
-	count := 0
-	for _, seg := range kcp.rcv_queue {
-		dataLen := seg.Data.Len()
-		if dataLen > len(buffer) {
-			break
-		}
-		copy(buffer, seg.Data.Value)
-		seg.Release()
-		buffer = buffer[dataLen:]
-		n += dataLen
-		count++
-	}
-	kcp.rcv_queue = kcp.rcv_queue[count:]
-
-	kcp.DumpReceivingBuf()
-	return
-}
-
 // DumpReceivingBuf moves available data from rcv_buf -> rcv_queue
 // @Private
 func (kcp *KCP) DumpReceivingBuf() {
@@ -153,7 +134,9 @@ func (kcp *KCP) DumpReceivingBuf() {
 		if seg == nil {
 			break
 		}
-		kcp.rcv_queue = append(kcp.rcv_queue, seg)
+		kcp.rcv_queue.Put(seg.Data)
+		seg.Data = nil
+
 		kcp.rcv_buf.Advance()
 		kcp.rcv_nxt++
 		kcp.receivingUpdated = true
@@ -335,11 +318,9 @@ func (kcp *KCP) Input(data []byte) int {
 				if kcp.state == StateActive ||
 					kcp.state == StateReadyToClose ||
 					kcp.state == StatePeerClosed {
-					kcp.state = StateTerminating
-					kcp.stateBeginTime = kcp.current
+					kcp.SetState(StateTerminating)
 				} else if kcp.state == StateTerminating {
-					kcp.state = StateTerminated
-					kcp.stateBeginTime = kcp.current
+					kcp.SetState(StateTerminated)
 				}
 			}
 			kcp.HandleReceivingNext(seg.ReceivinNext)
@@ -373,15 +354,13 @@ func (kcp *KCP) flush() {
 		kcp.output.Flush()
 
 		if _itimediff(kcp.current, kcp.stateBeginTime) > 8000 {
-			kcp.state = StateTerminated
-			kcp.stateBeginTime = kcp.current
+			kcp.SetState(StateTerminated)
 		}
 		return
 	}
 
 	if kcp.state == StateReadyToClose && _itimediff(kcp.current, kcp.stateBeginTime) > 15000 {
-		kcp.state = StateTerminating
-		kcp.stateBeginTime = kcp.current
+		kcp.SetState(StateTerminating)
 	}
 
 	current := kcp.current
@@ -435,7 +414,6 @@ func (kcp *KCP) flush() {
 		} else if _itimediff(current, segment.timeout) >= 0 {
 			needsend = true
 			segment.transmit++
-			kcp.xmit++
 			segment.timeout = current + kcp.rx_rto
 			lost = true
 		} else if segment.ackSkipped >= resent {
@@ -497,29 +475,8 @@ func (kcp *KCP) flush() {
 // ikcp_check when to call it again (without ikcp_input/_send calling).
 // 'current' - current timestamp in millisec.
 func (kcp *KCP) Update(current uint32) {
-	var slap int32
-
 	kcp.current = current
-
-	if !kcp.updated {
-		kcp.updated = true
-		kcp.ts_flush = kcp.current
-	}
-
-	slap = _itimediff(kcp.current, kcp.ts_flush)
-
-	if slap >= 10000 || slap < -10000 {
-		kcp.ts_flush = kcp.current
-		slap = 0
-	}
-
-	if slap >= 0 {
-		kcp.ts_flush += kcp.interval
-		if _itimediff(kcp.current, kcp.ts_flush) >= 0 {
-			kcp.ts_flush = kcp.current + kcp.interval
-		}
-		kcp.flush()
-	}
+	kcp.flush()
 }
 
 // NoDelay options

+ 91 - 0
transport/internet/kcp/receiving.go

@@ -1,5 +1,13 @@
 package kcp
 
+import (
+	"io"
+	"sync"
+	"time"
+
+	"github.com/v2ray/v2ray-core/common/alloc"
+)
+
 type ReceivingWindow struct {
 	start uint32
 	size  uint32
@@ -49,6 +57,89 @@ func (this *ReceivingWindow) Advance() {
 	}
 }
 
+type ReceivingQueue struct {
+	sync.RWMutex
+	closed  bool
+	cache   *alloc.Buffer
+	queue   chan *alloc.Buffer
+	timeout time.Time
+}
+
+func NewReceivingQueue() *ReceivingQueue {
+	return &ReceivingQueue{
+		queue: make(chan *alloc.Buffer, effectiveConfig.ReadBuffer/effectiveConfig.Mtu),
+	}
+}
+
+func (this *ReceivingQueue) Read(buf []byte) (int, error) {
+	if this.cache.Len() > 0 {
+		nBytes, err := this.cache.Read(buf)
+		if this.cache.IsEmpty() {
+			this.cache.Release()
+			this.cache = nil
+		}
+		return nBytes, err
+	}
+
+	var totalBytes int
+
+L:
+	for totalBytes < len(buf) {
+		timeToSleep := time.Millisecond
+		select {
+		case payload, open := <-this.queue:
+			if !open {
+				return totalBytes, io.EOF
+			}
+			nBytes, err := payload.Read(buf)
+			totalBytes += nBytes
+			if err != nil {
+				return totalBytes, err
+			}
+			if !payload.IsEmpty() {
+				this.cache = payload
+			}
+			buf = buf[nBytes:]
+		case <-time.After(timeToSleep):
+			if totalBytes > 0 {
+				break L
+			}
+			this.RLock()
+			if !this.timeout.IsZero() && this.timeout.Before(time.Now()) {
+				this.RUnlock()
+				return totalBytes, errTimeout
+			}
+			this.RUnlock()
+			timeToSleep += 500 * time.Millisecond
+		}
+	}
+
+	return totalBytes, nil
+}
+
+func (this *ReceivingQueue) Put(payload *alloc.Buffer) {
+	this.queue <- payload
+}
+
+func (this *ReceivingQueue) SetReadDeadline(t time.Time) error {
+	this.Lock()
+	defer this.Unlock()
+
+	this.timeout = t
+	return nil
+}
+
+func (this *ReceivingQueue) Close() {
+	this.Lock()
+	defer this.Unlock()
+
+	if this.closed {
+		return
+	}
+	this.closed = true
+	close(this.queue)
+}
+
 type ACKList struct {
 	timestamps []uint32
 	numbers    []uint32