Browse Source

handle close wait

Darien Raymond 8 years ago
parent
commit
c09ca41161

+ 1 - 1
proxy/dokodemo/dokodemo.go

@@ -65,7 +65,7 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in
 		timedReader := net.NewTimeOutReader(d.config.Timeout, conn)
 		chunkReader := buf.NewReader(timedReader)
 
-		if err := buf.PipeUntilEOF(chunkReader, inboundRay.InboundInput()); err != nil {
+		if err := buf.Pipe(chunkReader, inboundRay.InboundInput()); err != nil {
 			log.Info("Dokodemo: Failed to transport request: ", err)
 			return err
 		}

+ 1 - 1
proxy/freedom/freedom.go

@@ -130,7 +130,7 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay) erro
 		defer output.Close()
 
 		v2reader := buf.NewReader(reader)
-		if err := buf.PipeUntilEOF(v2reader, output); err != nil {
+		if err := buf.Pipe(v2reader, output); err != nil {
 			return err
 		}
 		return nil

+ 1 - 1
proxy/http/server.go

@@ -136,7 +136,7 @@ func (s *Server) handleConnect(ctx context.Context, request *http.Request, reade
 		defer ray.InboundInput().Close()
 
 		v2reader := buf.NewReader(reader)
-		if err := buf.PipeUntilEOF(v2reader, ray.InboundInput()); err != nil {
+		if err := buf.Pipe(v2reader, ray.InboundInput()); err != nil {
 			return err
 		}
 		return nil

+ 2 - 4
proxy/shadowsocks/client.go

@@ -113,7 +113,7 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay) error
 				return err
 			}
 
-			if err := buf.PipeUntilEOF(responseReader, outboundRay.OutboundOutput()); err != nil {
+			if err := buf.Pipe(responseReader, outboundRay.OutboundOutput()); err != nil {
 				return err
 			}
 
@@ -122,8 +122,6 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay) error
 
 		if err := signal.ErrorOrFinish2(ctx, requestDone, responseDone); err != nil {
 			log.Info("Shadowsocks|Client: Connection ends with ", err)
-			outboundRay.OutboundInput().CloseError()
-			outboundRay.OutboundOutput().CloseError()
 			return err
 		}
 
@@ -155,7 +153,7 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay) error
 				User:   user,
 			}
 
-			if err := buf.PipeUntilEOF(reader, outboundRay.OutboundOutput()); err != nil {
+			if err := buf.Pipe(reader, outboundRay.OutboundOutput()); err != nil {
 				log.Info("Shadowsocks|Client: Failed to transport all UDP response: ", err)
 				return err
 			}

+ 1 - 1
proxy/shadowsocks/server.go

@@ -177,7 +177,7 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection)
 			return err
 		}
 
-		if err := buf.PipeUntilEOF(ray.InboundOutput(), responseWriter); err != nil {
+		if err := buf.Pipe(ray.InboundOutput(), responseWriter); err != nil {
 			log.Info("Shadowsocks|Server: Failed to transport all TCP response: ", err)
 			return err
 		}

+ 2 - 2
proxy/socks/client.go

@@ -87,7 +87,7 @@ func (c *Client) Process(ctx context.Context, ray ray.OutboundRay) error {
 		}
 		responseFunc = func() error {
 			defer ray.OutboundOutput().Close()
-			return buf.PipeUntilEOF(buf.NewReader(conn), ray.OutboundOutput())
+			return buf.Pipe(buf.NewReader(conn), ray.OutboundOutput())
 		}
 	} else if request.Command == protocol.RequestCommandUDP {
 		udpConn, err := dialer.Dial(ctx, udpRequest.Destination())
@@ -102,7 +102,7 @@ func (c *Client) Process(ctx context.Context, ray ray.OutboundRay) error {
 		responseFunc = func() error {
 			defer ray.OutboundOutput().Close()
 			reader := &UDPReader{reader: net.NewTimeOutReader(16, udpConn)}
-			return buf.PipeUntilEOF(reader, ray.OutboundOutput())
+			return buf.Pipe(reader, ray.OutboundOutput())
 		}
 	}
 

+ 3 - 3
proxy/socks/server.go

@@ -58,6 +58,8 @@ func (s *Server) Network() net.NetworkList {
 }
 
 func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection) error {
+	conn.SetReusable(false)
+
 	switch network {
 	case net.Network_TCP:
 		return s.processTCP(ctx, conn)
@@ -69,8 +71,6 @@ func (s *Server) Process(ctx context.Context, network net.Network, conn internet
 }
 
 func (s *Server) processTCP(ctx context.Context, conn internet.Connection) error {
-	conn.SetReusable(false)
-
 	timedReader := net.NewTimeOutReader(16 /* seconds, for handshake */, conn)
 	reader := bufio.NewReader(timedReader)
 
@@ -123,7 +123,7 @@ func (v *Server) transport(ctx context.Context, reader io.Reader, writer io.Writ
 		defer input.Close()
 
 		v2reader := buf.NewReader(reader)
-		if err := buf.PipeUntilEOF(v2reader, input); err != nil {
+		if err := buf.Pipe(v2reader, input); err != nil {
 			log.Info("Socks|Server: Failed to transport all TCP request: ", err)
 			return err
 		}

+ 1 - 1
proxy/vmess/inbound/inbound.go

@@ -133,7 +133,7 @@ func transferRequest(session *encoding.ServerSession, request *protocol.RequestH
 	defer output.Close()
 
 	bodyReader := session.DecodeRequestBody(request, input)
-	if err := buf.PipeUntilEOF(bodyReader, output); err != nil {
+	if err := buf.Pipe(bodyReader, output); err != nil {
 		return err
 	}
 	return nil

+ 1 - 1
proxy/vmess/outbound/outbound.go

@@ -145,7 +145,7 @@ func (v *VMessOutboundHandler) Process(ctx context.Context, outboundRay ray.Outb
 
 		reader.SetBuffered(false)
 		bodyReader := session.DecodeResponseBody(request, reader)
-		if err := buf.PipeUntilEOF(bodyReader, output); err != nil {
+		if err := buf.Pipe(bodyReader, output); err != nil {
 			return err
 		}