v2ray 9 年之前
父节点
当前提交
7a84c930d4
共有 1 个文件被更改,包括 20 次插入24 次删除
  1. 20 24
      transport/internet/kcp/connection.go

+ 20 - 24
transport/internet/kcp/connection.go

@@ -69,7 +69,6 @@ type UDPSession struct {
 	rd            time.Time // read deadline
 	wd            time.Time // write deadline
 	chReadEvent   chan struct{}
-	chWriteEvent  chan struct{}
 	writer        io.WriteCloser
 	since         int64
 }
@@ -79,7 +78,6 @@ func newUDPSession(conv uint32, writerCloser io.WriteCloser, local *net.UDPAddr,
 	sess := new(UDPSession)
 	sess.local = local
 	sess.chReadEvent = make(chan struct{}, 1)
-	sess.chWriteEvent = make(chan struct{}, 1)
 	sess.remote = remote
 	sess.block = block
 	sess.writer = writerCloser
@@ -95,7 +93,10 @@ func newUDPSession(conv uint32, writerCloser io.WriteCloser, local *net.UDPAddr,
 				opt = OptionClose
 			}
 			ext.Prepend([]byte{byte(cmd), byte(opt)})
-			sess.output(ext)
+			go sess.output(ext)
+		}
+		if sess.state == ConnStateReadyToClose && sess.kcp.WaitSnd() == 0 {
+			go sess.NotifyTermination()
 		}
 	})
 	sess.kcp.WndSize(effectiveConfig.Sndwnd, effectiveConfig.Rcvwnd)
@@ -130,24 +131,17 @@ func (s *UDPSession) Read(b []byte) (int, error) {
 				return 0, errTimeout
 			}
 		}
+		s.Unlock()
 
+		s.kcpAccess.Lock()
 		nBytes := s.kcp.Recv(b)
+		s.kcpAccess.Unlock()
 		if nBytes > 0 {
-			s.Unlock()
 			return nBytes, nil
 		}
-
-		var timeout <-chan time.Time
-		if !s.rd.IsZero() {
-			delay := s.rd.Sub(time.Now())
-			timeout = time.After(delay)
-		}
-
-		s.Unlock()
 		select {
 		case <-s.chReadEvent:
-		case <-timeout:
-			return 0, errTimeout
+		case <-time.After(time.Second):
 		}
 	}
 }
@@ -182,7 +176,8 @@ func (s *UDPSession) Write(b []byte) (int, error) {
 			return 0, errTimeout
 		}
 
-		time.Sleep(time.Duration(s.kcp.WaitSnd()*5) * time.Millisecond)
+		// Sending windows is 1024 for the moment. This amount is not gonna sent in 1 sec.
+		time.Sleep(time.Second)
 	}
 }
 
@@ -207,11 +202,13 @@ func (this *UDPSession) NotifyTermination() {
 			this.Unlock()
 			break
 		}
+		this.Unlock()
 		buffer := alloc.NewSmallBuffer().Clear()
 		buffer.AppendBytes(byte(CommandTerminate), byte(OptionClose), byte(0), byte(0), byte(0), byte(0))
 		this.output(buffer)
+
 		time.Sleep(time.Second)
-		this.Unlock()
+
 	}
 	this.Terminate()
 }
@@ -284,9 +281,15 @@ func (s *UDPSession) output(payload *alloc.Buffer) {
 func (s *UDPSession) updateTask() {
 	for s.state != ConnStateClosed {
 		current := s.Elapsed()
+		s.kcpAccess.Lock()
 		s.kcp.Update(current)
 		interval := s.kcp.Check(s.Elapsed())
-		time.Sleep(time.Duration(interval) * time.Millisecond)
+		s.kcpAccess.Unlock()
+		sleep := interval - current
+		if sleep < 10 {
+			sleep = 10
+		}
+		time.Sleep(time.Duration(sleep) * time.Millisecond)
 	}
 }
 
@@ -297,13 +300,6 @@ func (s *UDPSession) notifyReadEvent() {
 	}
 }
 
-func (s *UDPSession) notifyWriteEvent() {
-	select {
-	case s.chWriteEvent <- struct{}{}:
-	default:
-	}
-}
-
 func (this *UDPSession) MarkPeerClose() {
 	this.Lock()
 	defer this.Unlock()