Ver Fonte

optimize ping and updater logic

Darien Raymond há 9 anos atrás
pai
commit
f3a83c57ab

+ 27 - 28
transport/internet/kcp/connection.go

@@ -233,10 +233,11 @@ func NewConnection(conv uint16, writerCloser io.WriteCloser, local *net.UDPAddr,
 		},
 		conn.updateTask)
 	conn.pingUpdater = NewUpdater(
-		3000, // 3 seconds
+		5000, // 5 seconds
 		func() bool { return conn.State() != StateTerminated },
 		func() bool { return conn.State() == StateTerminated },
 		conn.updateTask)
+	conn.pingUpdater.WakeUp()
 
 	return conn
 }
@@ -333,24 +334,22 @@ func (this *Connection) SetState(state State) {
 	switch state {
 	case StateReadyToClose:
 		this.receivingWorker.CloseRead()
-		this.dataUpdater.WakeUp()
 	case StatePeerClosed:
 		this.sendingWorker.CloseWrite()
-		this.dataUpdater.WakeUp()
 	case StateTerminating:
 		this.receivingWorker.CloseRead()
 		this.sendingWorker.CloseWrite()
-		this.dataUpdater.interval = time.Second
-		this.dataUpdater.WakeUp()
+		this.pingUpdater.interval = time.Second
 	case StatePeerTerminating:
 		this.sendingWorker.CloseWrite()
-		this.dataUpdater.WakeUp()
+		this.pingUpdater.interval = time.Second
 	case StateTerminated:
 		this.receivingWorker.CloseRead()
 		this.sendingWorker.CloseWrite()
-		this.dataUpdater.interval = time.Second
+		this.pingUpdater.interval = time.Second
 		this.dataUpdater.WakeUp()
-		this.Terminate()
+		this.pingUpdater.WakeUp()
+		go this.Terminate()
 	}
 }
 
@@ -488,7 +487,6 @@ func (this *Connection) OnPeerClosed() {
 func (this *Connection) Input(data []byte) int {
 	current := this.Elapsed()
 	atomic.StoreUint32(&this.lastIncomingTime, current)
-	this.dataUpdater.WakeUp()
 
 	var seg Segment
 	for {
@@ -502,10 +500,12 @@ func (this *Connection) Input(data []byte) int {
 			this.HandleOption(seg.Option)
 			this.receivingWorker.ProcessSegment(seg)
 			this.dataInputCond.Signal()
+			this.dataUpdater.WakeUp()
 		case *AckSegment:
 			this.HandleOption(seg.Option)
 			this.sendingWorker.ProcessSegment(current, seg)
 			this.dataOutputCond.Signal()
+			this.dataUpdater.WakeUp()
 		case *CmdOnlySegment:
 			this.HandleOption(seg.Option)
 			if seg.Command == CommandTerminate {
@@ -545,12 +545,7 @@ func (this *Connection) flush() {
 
 	if this.State() == StateTerminating {
 		log.Debug("KCP|Connection: #", this.conv, " sending terminating cmd.")
-		seg := NewCmdOnlySegment()
-		defer seg.Release()
-
-		seg.Conv = this.conv
-		seg.Command = CommandTerminate
-		this.output.Write(seg)
+		this.Ping(current, CommandTerminate)
 		this.output.Flush()
 
 		if current-atomic.LoadUint32(&this.stateBeginTime) > 8000 {
@@ -570,19 +565,8 @@ func (this *Connection) flush() {
 	this.receivingWorker.Flush(current)
 	this.sendingWorker.Flush(current)
 
-	if current-atomic.LoadUint32(&this.lastPingTime) >= 3000 {
-		seg := NewCmdOnlySegment()
-		seg.Conv = this.conv
-		seg.Command = CommandPing
-		seg.ReceivinNext = this.receivingWorker.nextNumber
-		seg.SendingNext = this.sendingWorker.firstUnacknowledged
-		seg.PeerRTO = this.roundTrip.Timeout()
-		if this.State() == StateReadyToClose {
-			seg.Option = SegmentOptionClose
-		}
-		this.output.Write(seg)
-		this.lastPingTime = current
-		seg.Release()
+	if current-atomic.LoadUint32(&this.lastPingTime) >= 1000 {
+		this.Ping(current, CommandPing)
 	}
 
 	// flash remain segments
@@ -592,3 +576,18 @@ func (this *Connection) flush() {
 func (this *Connection) State() State {
 	return State(atomic.LoadInt32((*int32)(&this.state)))
 }
+
+func (this *Connection) Ping(current uint32, cmd Command) {
+	seg := NewCmdOnlySegment()
+	seg.Conv = this.conv
+	seg.Command = cmd
+	seg.ReceivinNext = this.receivingWorker.nextNumber
+	seg.SendingNext = this.sendingWorker.firstUnacknowledged
+	seg.PeerRTO = this.roundTrip.Timeout()
+	if this.State() == StateReadyToClose {
+		seg.Option = SegmentOptionClose
+	}
+	this.output.Write(seg)
+	atomic.StoreUint32(&this.lastPingTime, current)
+	seg.Release()
+}

+ 4 - 1
transport/internet/kcp/listener.go

@@ -139,7 +139,10 @@ func (this *Listener) Accept() (internet.Connection, error) {
 			return nil, ErrClosedListener
 		}
 		select {
-		case conn := <-this.awaitingConns:
+		case conn, open := <-this.awaitingConns:
+			if !open {
+				break
+			}
 			if this.tlsConfig != nil {
 				tlsConn := tls.Server(conn, this.tlsConfig)
 				return v2tls.NewConnection(tlsConn), nil

+ 16 - 7
transport/internet/kcp/sending.go

@@ -171,13 +171,14 @@ func (this *SendingWindow) Flush(current uint32, resend uint32, rto uint32, maxI
 
 type SendingWorker struct {
 	sync.RWMutex
-	conn                *Connection
-	window              *SendingWindow
-	firstUnacknowledged uint32
-	nextNumber          uint32
-	remoteNextNumber    uint32
-	controlWindow       uint32
-	fastResend          uint32
+	conn                       *Connection
+	window                     *SendingWindow
+	firstUnacknowledged        uint32
+	firstUnacknowledgedUpdated bool
+	nextNumber                 uint32
+	remoteNextNumber           uint32
+	controlWindow              uint32
+	fastResend                 uint32
 }
 
 func NewSendingWorker(kcp *Connection) *SendingWorker {
@@ -205,11 +206,15 @@ func (this *SendingWorker) ProcessReceivingNextWithoutLock(nextNumber uint32) {
 
 // Private: Visible for testing.
 func (this *SendingWorker) FindFirstUnacknowledged() {
+	v := this.firstUnacknowledged
 	if !this.window.IsEmpty() {
 		this.firstUnacknowledged = this.window.First().Number
 	} else {
 		this.firstUnacknowledged = this.nextNumber
 	}
+	if v != this.firstUnacknowledged {
+		this.firstUnacknowledgedUpdated = true
+	}
 }
 
 // Private: Visible for testing.
@@ -322,7 +327,11 @@ func (this *SendingWorker) Flush(current uint32) {
 
 	if !this.window.IsEmpty() {
 		this.window.Flush(current, this.conn.fastresend, this.conn.roundTrip.Timeout(), cwnd)
+	} else if this.firstUnacknowledgedUpdated {
+		this.conn.Ping(current, CommandPing)
 	}
+
+	this.firstUnacknowledgedUpdated = false
 }
 
 func (this *SendingWorker) CloseWrite() {