Pārlūkot izejas kodu

introduce a new state: peer terminating

v2ray 9 gadi atpakaļ
vecāks
revīzija
476b3c68d2

+ 20 - 6
transport/internet/kcp/connection.go

@@ -22,11 +22,12 @@ var (
 type State int32
 
 const (
-	StateActive       State = 0
-	StateReadyToClose State = 1
-	StatePeerClosed   State = 2
-	StateTerminating  State = 3
-	StateTerminated   State = 4
+	StateActive          State = 0
+	StateReadyToClose    State = 1
+	StatePeerClosed      State = 2
+	StateTerminating     State = 3
+	StatePeerTerminating State = 4
+	StateTerminated      State = 5
 )
 
 const (
@@ -177,6 +178,10 @@ func (this *Connection) Read(b []byte) (int, error) {
 			return nBytes, nil
 		}
 
+		if this.State() == StatePeerTerminating {
+			return 0, io.EOF
+		}
+
 		var timer *time.Timer
 		if !this.rd.IsZero() {
 			duration := this.rd.Sub(time.Now())
@@ -240,6 +245,8 @@ func (this *Connection) SetState(state State) {
 	case StateTerminating:
 		this.receivingWorker.CloseRead()
 		this.sendingWorker.CloseWrite()
+	case StatePeerTerminating:
+		this.sendingWorker.CloseWrite()
 	case StateTerminated:
 		this.receivingWorker.CloseRead()
 		this.sendingWorker.CloseWrite()
@@ -268,6 +275,9 @@ func (this *Connection) Close() error {
 	if state == StatePeerClosed {
 		this.SetState(StateTerminating)
 	}
+	if state == StatePeerTerminating {
+		this.SetState(StateTerminated)
+	}
 
 	return nil
 }
@@ -405,8 +415,9 @@ func (this *Connection) Input(data []byte) int {
 			if seg.Cmd == SegmentCommandTerminated {
 				state := this.State()
 				if state == StateActive ||
-					state == StateReadyToClose ||
 					state == StatePeerClosed {
+					this.SetState(StatePeerTerminating)
+				} else if state == StateReadyToClose {
 					this.SetState(StateTerminating)
 				} else if state == StateTerminating {
 					this.SetState(StateTerminated)
@@ -450,6 +461,9 @@ func (this *Connection) flush() {
 		}
 		return
 	}
+	if this.State() == StatePeerTerminating && current-atomic.LoadUint32(&this.stateBeginTime) > 4000 {
+		this.SetState(StateTerminating)
+	}
 
 	if this.State() == StateReadyToClose && current-atomic.LoadUint32(&this.stateBeginTime) > 15000 {
 		this.SetState(StateTerminating)

+ 7 - 0
transport/internet/kcp/listener.go

@@ -59,10 +59,17 @@ func (this *Listener) OnReceive(payload *alloc.Buffer, src v2net.Destination) {
 	if !this.running {
 		return
 	}
+	if payload.Len() < 4 {
+		return
+	}
 	conv := serial.BytesToUint16(payload.Value)
+	cmd := SegmentCommand(payload.Value[2])
 	sourceId := src.NetAddr() + "|" + serial.Uint16ToString(conv)
 	conn, found := this.sessions[sourceId]
 	if !found {
+		if cmd == SegmentCommandTerminated {
+			return
+		}
 		log.Debug("KCP|Listener: Creating session with id(", sourceId, ") from ", src)
 		writer := &Writer{
 			id:       sourceId,