Browse Source

bug fixes

v2ray 9 years ago
parent
commit
0047910a81
2 changed files with 35 additions and 47 deletions
  1. 31 47
      transport/internet/kcp/kcp.go
  2. 4 0
      transport/internet/kcp/segment.go

+ 31 - 47
transport/internet/kcp/kcp.go

@@ -63,9 +63,14 @@ const (
 
 
 // KCP defines a single KCP connection
 // KCP defines a single KCP connection
 type KCP struct {
 type KCP struct {
-	conv                                   uint16
-	state                                  State
-	stateBeginTime                         uint32
+	conv             uint16
+	state            State
+	stateBeginTime   uint32
+	lastIncomingTime uint32
+	sendingUpdated   bool
+	receivingUpdated bool
+	lastPingTime     uint32
+
 	mtu, mss                               uint32
 	mtu, mss                               uint32
 	snd_una, snd_nxt, rcv_nxt              uint32
 	snd_una, snd_nxt, rcv_nxt              uint32
 	ts_recent, ts_lastack, ssthresh        uint32
 	ts_recent, ts_lastack, ssthresh        uint32
@@ -83,7 +88,6 @@ type KCP struct {
 
 
 	acklist *ACKList
 	acklist *ACKList
 
 
-	buffer            []byte
 	fastresend        int32
 	fastresend        int32
 	congestionControl bool
 	congestionControl bool
 	output            *SegmentWriter
 	output            *SegmentWriter
@@ -92,13 +96,14 @@ type KCP struct {
 // NewKCP create a new kcp control object, 'conv' must equal in two endpoint
 // NewKCP create a new kcp control object, 'conv' must equal in two endpoint
 // from the same connection.
 // from the same connection.
 func NewKCP(conv uint16, mtu uint32, sendingWindowSize uint32, receivingWindowSize uint32, sendingQueueSize uint32, output v2io.Writer) *KCP {
 func NewKCP(conv uint16, mtu uint32, sendingWindowSize uint32, receivingWindowSize uint32, sendingQueueSize uint32, output v2io.Writer) *KCP {
+	log.Debug("KCP|Core: creating KCP ", conv)
 	kcp := new(KCP)
 	kcp := new(KCP)
 	kcp.conv = conv
 	kcp.conv = conv
 	kcp.snd_wnd = sendingWindowSize
 	kcp.snd_wnd = sendingWindowSize
 	kcp.rcv_wnd = receivingWindowSize
 	kcp.rcv_wnd = receivingWindowSize
 	kcp.rmt_wnd = IKCP_WND_RCV
 	kcp.rmt_wnd = IKCP_WND_RCV
 	kcp.mtu = mtu
 	kcp.mtu = mtu
-	kcp.mss = kcp.mtu - IKCP_OVERHEAD
+	kcp.mss = kcp.mtu - DataSegmentOverhead
 	kcp.rx_rto = IKCP_RTO_DEF
 	kcp.rx_rto = IKCP_RTO_DEF
 	kcp.interval = IKCP_INTERVAL
 	kcp.interval = IKCP_INTERVAL
 	kcp.ts_flush = IKCP_INTERVAL
 	kcp.ts_flush = IKCP_INTERVAL
@@ -121,13 +126,11 @@ func (kcp *KCP) OnPeerClosed() {
 	if kcp.state == StateReadyToClose {
 	if kcp.state == StateReadyToClose {
 		kcp.state = StateTerminating
 		kcp.state = StateTerminating
 		kcp.stateBeginTime = kcp.current
 		kcp.stateBeginTime = kcp.current
-		log.Info("KCP terminating at ", kcp.current)
 	}
 	}
 	if kcp.state == StateActive {
 	if kcp.state == StateActive {
 		kcp.ClearSendQueue()
 		kcp.ClearSendQueue()
 		kcp.state = StatePeerClosed
 		kcp.state = StatePeerClosed
 		kcp.stateBeginTime = kcp.current
 		kcp.stateBeginTime = kcp.current
-		log.Info("KCP peer close at ", kcp.current)
 	}
 	}
 }
 }
 
 
@@ -135,12 +138,10 @@ func (kcp *KCP) OnClose() {
 	if kcp.state == StateActive {
 	if kcp.state == StateActive {
 		kcp.state = StateReadyToClose
 		kcp.state = StateReadyToClose
 		kcp.stateBeginTime = kcp.current
 		kcp.stateBeginTime = kcp.current
-		log.Info("KCP ready close at ", kcp.current)
 	}
 	}
 	if kcp.state == StatePeerClosed {
 	if kcp.state == StatePeerClosed {
 		kcp.state = StateTerminating
 		kcp.state = StateTerminating
 		kcp.stateBeginTime = kcp.current
 		kcp.stateBeginTime = kcp.current
-		log.Info("KCP terminating at ", kcp.current)
 	}
 	}
 }
 }
 
 
@@ -228,12 +229,16 @@ func (kcp *KCP) update_ack(rtt int32) {
 }
 }
 
 
 func (kcp *KCP) shrink_buf() {
 func (kcp *KCP) shrink_buf() {
+	prevUna := kcp.snd_una
 	if len(kcp.snd_buf) > 0 {
 	if len(kcp.snd_buf) > 0 {
 		seg := kcp.snd_buf[0]
 		seg := kcp.snd_buf[0]
 		kcp.snd_una = seg.Number
 		kcp.snd_una = seg.Number
 	} else {
 	} else {
 		kcp.snd_una = kcp.snd_nxt
 		kcp.snd_una = kcp.snd_nxt
 	}
 	}
+	if kcp.snd_una != prevUna {
+		kcp.sendingUpdated = true
+	}
 }
 }
 
 
 func (kcp *KCP) parse_ack(sn uint32) {
 func (kcp *KCP) parse_ack(sn uint32) {
@@ -282,6 +287,7 @@ func (kcp *KCP) HandleReceivingNext(receivingNext uint32) {
 
 
 func (kcp *KCP) HandleSendingNext(sendingNext uint32) {
 func (kcp *KCP) HandleSendingNext(sendingNext uint32) {
 	kcp.acklist.Clear(sendingNext)
 	kcp.acklist.Clear(sendingNext)
+	kcp.receivingUpdated = true
 }
 }
 
 
 func (kcp *KCP) parse_data(newseg *DataSegment) {
 func (kcp *KCP) parse_data(newseg *DataSegment) {
@@ -301,7 +307,8 @@ func (kcp *KCP) parse_data(newseg *DataSegment) {
 
 
 // Input when you received a low level packet (eg. UDP packet), call it
 // Input when you received a low level packet (eg. UDP packet), call it
 func (kcp *KCP) Input(data []byte) int {
 func (kcp *KCP) Input(data []byte) int {
-	log.Info("KCP input at ", kcp.current)
+	kcp.lastIncomingTime = kcp.current
+
 	var seg ISegment
 	var seg ISegment
 	var maxack uint32
 	var maxack uint32
 	var flag int
 	var flag int
@@ -347,11 +354,9 @@ func (kcp *KCP) Input(data []byte) int {
 					kcp.state == StatePeerClosed {
 					kcp.state == StatePeerClosed {
 					kcp.state = StateTerminating
 					kcp.state = StateTerminating
 					kcp.stateBeginTime = kcp.current
 					kcp.stateBeginTime = kcp.current
-					log.Info("KCP terminating at ", kcp.current)
 				} else if kcp.state == StateTerminating {
 				} else if kcp.state == StateTerminating {
 					kcp.state = StateTerminated
 					kcp.state = StateTerminated
 					kcp.stateBeginTime = kcp.current
 					kcp.stateBeginTime = kcp.current
-					log.Info("KCP terminated at ", kcp.current)
 				}
 				}
 			}
 			}
 			kcp.HandleReceivingNext(seg.ReceivinNext)
 			kcp.HandleReceivingNext(seg.ReceivinNext)
@@ -381,7 +386,6 @@ func (kcp *KCP) flush() {
 
 
 		if _itimediff(kcp.current, kcp.stateBeginTime) > 8000 {
 		if _itimediff(kcp.current, kcp.stateBeginTime) > 8000 {
 			kcp.state = StateTerminated
 			kcp.state = StateTerminated
-			log.Info("KCP terminated at ", kcp.current)
 			kcp.stateBeginTime = kcp.current
 			kcp.stateBeginTime = kcp.current
 		}
 		}
 		return
 		return
@@ -389,12 +393,10 @@ func (kcp *KCP) flush() {
 
 
 	if kcp.state == StateReadyToClose && _itimediff(kcp.current, kcp.stateBeginTime) > 15000 {
 	if kcp.state == StateReadyToClose && _itimediff(kcp.current, kcp.stateBeginTime) > 15000 {
 		kcp.state = StateTerminating
 		kcp.state = StateTerminating
-		log.Info("KCP terminating at ", kcp.current)
 		kcp.stateBeginTime = kcp.current
 		kcp.stateBeginTime = kcp.current
 	}
 	}
 
 
 	current := kcp.current
 	current := kcp.current
-	segSent := false
 	//lost := false
 	//lost := false
 
 
 	//var seg Segment
 	//var seg Segment
@@ -410,7 +412,7 @@ func (kcp *KCP) flush() {
 		ackSeg.ReceivingWindow = uint32(kcp.rcv_nxt + kcp.rcv_wnd)
 		ackSeg.ReceivingWindow = uint32(kcp.rcv_nxt + kcp.rcv_wnd)
 		ackSeg.ReceivingNext = kcp.rcv_nxt
 		ackSeg.ReceivingNext = kcp.rcv_nxt
 		kcp.output.Write(ackSeg)
 		kcp.output.Write(ackSeg)
-		segSent = true
+		kcp.receivingUpdated = false
 	}
 	}
 
 
 	// calculate window size
 	// calculate window size
@@ -465,7 +467,7 @@ func (kcp *KCP) flush() {
 			}
 			}
 
 
 			kcp.output.Write(segment)
 			kcp.output.Write(segment)
-			segSent = true
+			kcp.sendingUpdated = false
 
 
 			if segment.transmit >= kcp.dead_link {
 			if segment.transmit >= kcp.dead_link {
 				kcp.state = 0xFFFFFFFF
 				kcp.state = 0xFFFFFFFF
@@ -473,42 +475,24 @@ func (kcp *KCP) flush() {
 		}
 		}
 	}
 	}
 
 
-	// flash remain segments
-	kcp.output.Flush()
-
-	if !segSent && kcp.state == StateReadyToClose {
-		kcp.output.Write(&CmdOnlySegment{
+	if kcp.sendingUpdated || kcp.receivingUpdated || _itimediff(kcp.current, kcp.lastPingTime) >= 5000 {
+		seg := &CmdOnlySegment{
 			Conv:         kcp.conv,
 			Conv:         kcp.conv,
 			Cmd:          SegmentCommandPing,
 			Cmd:          SegmentCommandPing,
-			Opt:          SegmentOptionClose,
-			ReceivinNext: kcp.rcv_nxt,
-			SendingNext:  kcp.snd_nxt,
-		})
-		kcp.output.Flush()
-		segSent = true
-	}
-
-	if !segSent && kcp.state == StateTerminating {
-		kcp.output.Write(&CmdOnlySegment{
-			Conv:         kcp.conv,
-			Cmd:          SegmentCommandTerminated,
 			ReceivinNext: kcp.rcv_nxt,
 			ReceivinNext: kcp.rcv_nxt,
 			SendingNext:  kcp.snd_una,
 			SendingNext:  kcp.snd_una,
-		})
-		kcp.output.Flush()
-		segSent = true
+		}
+		if kcp.state == StateReadyToClose {
+			seg.Opt = SegmentOptionClose
+		}
+		kcp.output.Write(seg)
+		kcp.lastPingTime = kcp.current
+		kcp.sendingUpdated = false
+		kcp.receivingUpdated = false
 	}
 	}
 
 
-	if !segSent {
-		kcp.output.Write(&CmdOnlySegment{
-			Conv:         kcp.conv,
-			Cmd:          SegmentCommandPing,
-			ReceivinNext: kcp.rcv_nxt,
-			SendingNext:  kcp.snd_una,
-		})
-		kcp.output.Flush()
-		segSent = true
-	}
+	// flash remain segments
+	kcp.output.Flush()
 
 
 	// update ssthresh
 	// update ssthresh
 	// rate halving, https://tools.ietf.org/html/rfc6937
 	// rate halving, https://tools.ietf.org/html/rfc6937

+ 4 - 0
transport/internet/kcp/segment.go

@@ -28,6 +28,10 @@ type ISegment interface {
 	Bytes([]byte) []byte
 	Bytes([]byte) []byte
 }
 }
 
 
+const (
+	DataSegmentOverhead = 18
+)
+
 type DataSegment struct {
 type DataSegment struct {
 	Conv        uint16
 	Conv        uint16
 	Opt         SegmentOption
 	Opt         SegmentOption