Bladeren bron

improve writing performance

v2ray 9 jaren geleden
bovenliggende
commit
991b2703dc
1 gewijzigde bestanden met toevoegingen van 27 en 10 verwijderingen
  1. 27 10
      transport/internet/kcp/connection.go

+ 27 - 10
transport/internet/kcp/connection.go

@@ -98,13 +98,14 @@ func (this *RountTripInfo) SmoothedTime() uint32 {
 
 // Connection is a KCP connection over UDP.
 type Connection struct {
-	block         Authenticator
-	local, remote net.Addr
-	rd            time.Time
-	wd            time.Time // write deadline
-	writer        io.WriteCloser
-	since         int64
-	dataInputCond *sync.Cond
+	block          Authenticator
+	local, remote  net.Addr
+	rd             time.Time
+	wd             time.Time // write deadline
+	writer         io.WriteCloser
+	since          int64
+	dataInputCond  *sync.Cond
+	dataOutputCond *sync.Cond
 
 	conv             uint16
 	state            State
@@ -135,6 +136,7 @@ func NewConnection(conv uint16, writerCloser io.WriteCloser, local *net.UDPAddr,
 	conn.writer = writerCloser
 	conn.since = nowMillisec()
 	conn.dataInputCond = sync.NewCond(new(sync.Mutex))
+	conn.dataOutputCond = sync.NewCond(new(sync.Mutex))
 
 	authWriter := &AuthenticationWriter{
 		Authenticator: block,
@@ -222,12 +224,25 @@ func (this *Connection) Write(b []byte) (int, error) {
 			}
 		}
 
+		var timer *time.Timer
+		if !this.wd.IsZero() {
+			duration := this.wd.Sub(time.Now())
+			if duration <= 0 {
+				return totalWritten, errTimeout
+			}
+			timer = time.AfterFunc(duration, this.dataOutputCond.Signal)
+		}
+		this.dataOutputCond.L.Lock()
+		this.dataOutputCond.Wait()
+		this.dataOutputCond.L.Unlock()
+
+		if timer != nil {
+			timer.Stop()
+		}
+
 		if !this.wd.IsZero() && this.wd.Before(time.Now()) {
 			return totalWritten, errTimeout
 		}
-
-		// Sending windows is 1024 for the moment. This amount is not gonna sent in 1 sec.
-		time.Sleep(time.Second)
 	}
 }
 
@@ -260,6 +275,7 @@ func (this *Connection) Close() error {
 	}
 
 	this.dataInputCond.Broadcast()
+	this.dataOutputCond.Broadcast()
 
 	state := this.State()
 	if state == StateReadyToClose ||
@@ -410,6 +426,7 @@ func (this *Connection) Input(data []byte) int {
 		case *AckSegment:
 			this.HandleOption(seg.Opt)
 			this.sendingWorker.ProcessSegment(current, seg)
+			this.dataOutputCond.Signal()
 		case *CmdOnlySegment:
 			this.HandleOption(seg.Opt)
 			if seg.Cmd == SegmentCommandTerminated {