Browse Source

check connection state for every write operation

Darien Raymond 8 years ago
parent
commit
a6c0ef11ba
1 changed files with 33 additions and 16 deletions
  1. 33 16
      transport/internet/kcp/connection.go

+ 33 - 16
transport/internet/kcp/connection.go

@@ -341,21 +341,30 @@ func (c *Connection) Write(b []byte) (int, error) {
 	totalWritten := 0
 
 	for {
-		if c == nil || c.State() != StateActive {
-			return totalWritten, io.ErrClosedPipe
-		}
+		dataWritten := false
+		for {
+			if c == nil || c.State() != StateActive {
+				return totalWritten, io.ErrClosedPipe
+			}
+			if !c.sendingWorker.Push(func(bb []byte) (int, error) {
+				n := copy(bb[:c.mss], b[totalWritten:])
+				totalWritten += n
+				return n, nil
+			}) {
+				break
+			}
+
+			dataWritten = true
 
-		for c.sendingWorker.Push(func(bb []byte) (int, error) {
-			n := copy(bb[:c.mss], b[totalWritten:])
-			totalWritten += n
-			return n, nil
-		}) {
-			c.dataUpdater.WakeUp()
 			if totalWritten == len(b) {
 				return totalWritten, nil
 			}
 		}
 
+		if dataWritten {
+			c.dataUpdater.WakeUp()
+		}
+
 		if err := c.waitForDataOutput(); err != nil {
 			return totalWritten, err
 		}
@@ -367,19 +376,27 @@ func (c *Connection) WriteMultiBuffer(mb buf.MultiBuffer) error {
 	defer mb.Release()
 
 	for {
-		if c == nil || c.State() != StateActive {
-			return io.ErrClosedPipe
-		}
+		dataWritten := false
+		for {
+			if c == nil || c.State() != StateActive {
+				return io.ErrClosedPipe
+			}
 
-		for c.sendingWorker.Push(func(bb []byte) (int, error) {
-			return mb.Read(bb[:c.mss])
-		}) {
-			c.dataUpdater.WakeUp()
+			if !c.sendingWorker.Push(func(bb []byte) (int, error) {
+				return mb.Read(bb[:c.mss])
+			}) {
+				break
+			}
+			dataWritten = true
 			if mb.IsEmpty() {
 				return nil
 			}
 		}
 
+		if dataWritten {
+			c.dataUpdater.WakeUp()
+		}
+
 		if err := c.waitForDataOutput(); err != nil {
 			return err
 		}