Ver código fonte

better handling error from Pipe()

Darien Raymond 9 anos atrás
pai
commit
6804d8f73a

+ 9 - 0
common/io/transport.go

@@ -1,6 +1,7 @@
 package io
 
 import (
+	"io"
 	"v2ray.com/core/common/log"
 )
 
@@ -25,3 +26,11 @@ func Pipe(reader Reader, writer Writer) error {
 		}
 	}
 }
+
+func PipeUntilEOF(reader Reader, writer Writer) error {
+	err := Pipe(reader, writer)
+	if err != nil && err != io.EOF {
+		return err
+	}
+	return nil
+}

+ 6 - 2
proxy/dokodemo/dokodemo.go

@@ -179,7 +179,9 @@ func (this *DokodemoDoor) HandleTCPConnection(conn internet.Connection) {
 		v2reader := v2io.NewAdaptiveReader(reader)
 		defer v2reader.Release()
 
-		v2io.Pipe(v2reader, ray.InboundInput())
+		if err := v2io.PipeUntilEOF(v2reader, ray.InboundInput()); err != nil {
+			log.Info("Dokodemo: Failed to transport all TCP request: ", err)
+		}
 		wg.Done()
 		ray.InboundInput().Close()
 	}()
@@ -189,7 +191,9 @@ func (this *DokodemoDoor) HandleTCPConnection(conn internet.Connection) {
 		v2writer := v2io.NewAdaptiveWriter(conn)
 		defer v2writer.Release()
 
-		v2io.Pipe(ray.InboundOutput(), v2writer)
+		if err := v2io.PipeUntilEOF(ray.InboundOutput(), v2writer); err != nil {
+			log.Info("Dokodemo: Failed to transport all TCP response: ", err)
+		}
 		wg.Done()
 	}()
 

+ 6 - 2
proxy/freedom/freedom.go

@@ -104,7 +104,9 @@ func (this *FreedomConnection) Dispatch(destination v2net.Destination, payload *
 		v2writer := v2io.NewAdaptiveWriter(conn)
 		defer v2writer.Release()
 
-		v2io.Pipe(input, v2writer)
+		if err := v2io.PipeUntilEOF(input, v2writer); err != nil {
+			log.Info("Freedom: Failed to transport all TCP request: ", err)
+		}
 		if tcpConn, ok := conn.(*tcp.RawConnection); ok {
 			tcpConn.CloseWrite()
 		}
@@ -121,7 +123,9 @@ func (this *FreedomConnection) Dispatch(destination v2net.Destination, payload *
 	}
 
 	v2reader := v2io.NewAdaptiveReader(reader)
-	v2io.Pipe(v2reader, output)
+	if err := v2io.PipeUntilEOF(v2reader, output); err != nil {
+		log.Info("Freedom: Failed to transport all TCP response: ", err)
+	}
 	v2reader.Release()
 	ray.OutboundOutput().Close()
 

+ 6 - 2
proxy/http/server.go

@@ -160,7 +160,9 @@ func (this *Server) transport(input io.Reader, output io.Writer, ray ray.Inbound
 		v2reader := v2io.NewAdaptiveReader(input)
 		defer v2reader.Release()
 
-		v2io.Pipe(v2reader, ray.InboundInput())
+		if err := v2io.PipeUntilEOF(v2reader, ray.InboundInput()); err != nil {
+			log.Info("HTTP: Failed to transport all TCP request: ", err)
+		}
 		ray.InboundInput().Close()
 		wg.Done()
 	}()
@@ -169,7 +171,9 @@ func (this *Server) transport(input io.Reader, output io.Writer, ray ray.Inbound
 		v2writer := v2io.NewAdaptiveWriter(output)
 		defer v2writer.Release()
 
-		v2io.Pipe(ray.InboundOutput(), v2writer)
+		if err := v2io.PipeUntilEOF(ray.InboundOutput(), v2writer); err != nil {
+			log.Info("HTTP: Failed to transport all TCP response: ", err)
+		}
 		ray.InboundOutput().Release()
 		wg.Done()
 	}()

+ 8 - 17
proxy/shadowsocks/client.go

@@ -2,7 +2,6 @@ package shadowsocks
 
 import (
 	"errors"
-	"io"
 	"sync"
 	"v2ray.com/core/app"
 	"v2ray.com/core/common/alloc"
@@ -113,18 +112,14 @@ func (this *Client) Dispatch(destination v2net.Destination, payload *alloc.Buffe
 				return
 			}
 
-			if err := v2io.Pipe(responseReader, ray.OutboundOutput()); err != nil {
-				if err != io.EOF {
-					log.Info("Shadowsocks|Client: Failed to transport all TCP response: ", err)
-				}
+			if err := v2io.PipeUntilEOF(responseReader, ray.OutboundOutput()); err != nil {
+				log.Info("Shadowsocks|Client: Failed to transport all TCP response: ", err)
 			}
 		}()
 
 		bufferedWriter.SetCached(false)
-		if err := v2io.Pipe(ray.OutboundInput(), bodyWriter); err != nil {
-			if err != io.EOF {
-				log.Info("Shadowsocks|Client: Failed to trasnport all TCP request: ", err)
-			}
+		if err := v2io.PipeUntilEOF(ray.OutboundInput(), bodyWriter); err != nil {
+			log.Info("Shadowsocks|Client: Failed to trasnport all TCP request: ", err)
 		}
 
 		responseMutex.Lock()
@@ -143,10 +138,8 @@ func (this *Client) Dispatch(destination v2net.Destination, payload *alloc.Buffe
 				User:   user,
 			}
 
-			if err := v2io.Pipe(reader, ray.OutboundOutput()); err != nil {
-				if err != io.EOF {
-					log.Info("Shadowsocks|Client: Failed to transport all UDP response: ", err)
-				}
+			if err := v2io.PipeUntilEOF(reader, ray.OutboundOutput()); err != nil {
+				log.Info("Shadowsocks|Client: Failed to transport all UDP response: ", err)
 			}
 		}()
 
@@ -159,10 +152,8 @@ func (this *Client) Dispatch(destination v2net.Destination, payload *alloc.Buffe
 				return errors.New("Shadowsocks|Client: Failed to write payload: " + err.Error())
 			}
 		}
-		if err := v2io.Pipe(ray.OutboundInput(), writer); err != nil {
-			if err != io.EOF {
-				log.Info("Shadowsocks|Client: Failed to transport all UDP request: ", err)
-			}
+		if err := v2io.PipeUntilEOF(ray.OutboundInput(), writer); err != nil {
+			log.Info("Shadowsocks|Client: Failed to transport all UDP request: ", err)
 		}
 
 		responseMutex.Lock()

+ 6 - 2
proxy/shadowsocks/server.go

@@ -198,11 +198,15 @@ func (this *Server) handleConnection(conn internet.Connection) {
 			responseWriter.Write(payload)
 			bufferedWriter.SetCached(false)
 
-			v2io.Pipe(ray.InboundOutput(), responseWriter)
+			if err := v2io.PipeUntilEOF(ray.InboundOutput(), responseWriter); err != nil {
+				log.Info("Shadowsocks|Server: Failed to transport all TCP response: ", err)
+			}
 		}
 	}()
 
-	v2io.Pipe(bodyReader, ray.InboundInput())
+	if err := v2io.PipeUntilEOF(bodyReader, ray.InboundInput()); err != nil {
+		log.Info("Shadowsocks|Server: Failed to transport all TCP request: ", err)
+	}
 	ray.InboundInput().Close()
 
 	writeFinish.Lock()

+ 6 - 2
proxy/socks/server.go

@@ -304,14 +304,18 @@ func (this *Server) transport(reader io.Reader, writer io.Writer, session *proxy
 		v2reader := v2io.NewAdaptiveReader(reader)
 		defer v2reader.Release()
 
-		v2io.Pipe(v2reader, input)
+		if err := v2io.PipeUntilEOF(v2reader, input); err != nil {
+			log.Info("Socks|Server: Failed to transport all TCP request: ", err)
+		}
 		input.Close()
 	}()
 
 	v2writer := v2io.NewAdaptiveWriter(writer)
 	defer v2writer.Release()
 
-	v2io.Pipe(output, v2writer)
+	if err := v2io.PipeUntilEOF(output, v2writer); err != nil {
+		log.Info("Socks|Server: Failed to transport all TCP response: ", err)
+	}
 	output.Release()
 }
 

+ 2 - 4
proxy/vmess/inbound/inbound.go

@@ -192,8 +192,7 @@ func (this *VMessInboundHandler) HandleConnection(connection internet.Connection
 		} else {
 			requestReader = v2io.NewAdaptiveReader(bodyReader)
 		}
-		err := v2io.Pipe(requestReader, input)
-		if err != io.EOF {
+		if err := v2io.PipeUntilEOF(requestReader, input); err != nil {
 			connection.SetReusable(false)
 		}
 
@@ -229,8 +228,7 @@ func (this *VMessInboundHandler) HandleConnection(connection internet.Connection
 
 		writer.SetCached(false)
 
-		err = v2io.Pipe(output, v2writer)
-		if err != io.EOF {
+		if err := v2io.PipeUntilEOF(output, v2writer); err != nil {
 			connection.SetReusable(false)
 		}
 

+ 2 - 5
proxy/vmess/outbound/outbound.go

@@ -1,7 +1,6 @@
 package outbound
 
 import (
-	"io"
 	"sync"
 
 	"v2ray.com/core/app"
@@ -107,8 +106,7 @@ func (this *VMessOutboundHandler) handleRequest(session *encoding.ClientSession,
 	}
 	writer.SetCached(false)
 
-	err := v2io.Pipe(input, streamWriter)
-	if err != io.EOF {
+	if err := v2io.PipeUntilEOF(input, streamWriter); err != nil {
 		conn.SetReusable(false)
 	}
 
@@ -150,8 +148,7 @@ func (this *VMessOutboundHandler) handleResponse(session *encoding.ClientSession
 		bodyReader = v2io.NewAdaptiveReader(decryptReader)
 	}
 
-	err = v2io.Pipe(bodyReader, output)
-	if err != io.EOF {
+	if err := v2io.PipeUntilEOF(bodyReader, output); err != nil {
 		conn.SetReusable(false)
 	}