Browse Source

task engine for all proxies

Darien Raymond 8 years ago
parent
commit
609dbc1f13

+ 87 - 0
common/signal/exec_test.go

@@ -0,0 +1,87 @@
+package signal_test
+
+import (
+	"errors"
+	"testing"
+
+	. "v2ray.com/core/common/signal"
+	"v2ray.com/core/testing/assert"
+)
+
+func TestErrorOrFinish2_Error(t *testing.T) {
+	assert := assert.On(t)
+
+	c1 := make(chan error, 1)
+	c2 := make(chan error, 2)
+	c := make(chan error, 1)
+
+	go func() {
+		c <- ErrorOrFinish2(c1, c2)
+	}()
+
+	c1 <- errors.New("test")
+	err := <-c
+	assert.String(err.Error()).Equals("test")
+}
+
+func TestErrorOrFinish2_Error2(t *testing.T) {
+	assert := assert.On(t)
+
+	c1 := make(chan error, 1)
+	c2 := make(chan error, 2)
+	c := make(chan error, 1)
+
+	go func() {
+		c <- ErrorOrFinish2(c1, c2)
+	}()
+
+	c2 <- errors.New("test")
+	err := <-c
+	assert.String(err.Error()).Equals("test")
+}
+
+func TestErrorOrFinish2_NoneError(t *testing.T) {
+	assert := assert.On(t)
+
+	c1 := make(chan error, 1)
+	c2 := make(chan error, 2)
+	c := make(chan error, 1)
+
+	go func() {
+		c <- ErrorOrFinish2(c1, c2)
+	}()
+
+	close(c1)
+	select {
+	case <-c:
+		t.Fail()
+	default:
+	}
+
+	close(c2)
+	err := <-c
+	assert.Error(err).IsNil()
+}
+
+func TestErrorOrFinish2_NoneError2(t *testing.T) {
+	assert := assert.On(t)
+
+	c1 := make(chan error, 1)
+	c2 := make(chan error, 2)
+	c := make(chan error, 1)
+
+	go func() {
+		c <- ErrorOrFinish2(c1, c2)
+	}()
+
+	close(c2)
+	select {
+	case <-c:
+		t.Fail()
+	default:
+	}
+
+	close(c1)
+	err := <-c
+	assert.Error(err).IsNil()
+}

+ 21 - 8
proxy/freedom/freedom.go

@@ -13,6 +13,7 @@ import (
 	v2net "v2ray.com/core/common/net"
 	"v2ray.com/core/common/retry"
 	"v2ray.com/core/common/serial"
+	"v2ray.com/core/common/signal"
 	"v2ray.com/core/proxy"
 	"v2ray.com/core/transport/internet"
 	"v2ray.com/core/transport/ray"
@@ -101,14 +102,17 @@ func (v *Handler) Dispatch(destination v2net.Destination, payload *buf.Buffer, r
 		}
 	}
 
-	go func() {
+	requestDone := signal.ExecuteAsync(func() error {
+		defer input.Release()
+
 		v2writer := buf.NewWriter(conn)
 		defer v2writer.Release()
 
 		if err := buf.PipeUntilEOF(input, v2writer); err != nil {
-			log.Info("Freedom: Failed to transport all TCP request: ", err)
+			return err
 		}
-	}()
+		return nil
+	})
 
 	var reader io.Reader = conn
 
@@ -120,12 +124,21 @@ func (v *Handler) Dispatch(destination v2net.Destination, payload *buf.Buffer, r
 		reader = v2net.NewTimeOutReader(timeout /* seconds */, conn)
 	}
 
-	v2reader := buf.NewReader(reader)
-	if err := buf.PipeUntilEOF(v2reader, output); err != nil {
-		log.Info("Freedom: Failed to transport all TCP response: ", err)
+	responseDone := signal.ExecuteAsync(func() error {
+		defer output.Close()
+
+		v2reader := buf.NewReader(reader)
+		defer v2reader.Release()
+
+		if err := buf.PipeUntilEOF(v2reader, output); err != nil {
+			return err
+		}
+		return nil
+	})
+
+	if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil {
+		log.Info("Freedom: Connection ending with ", err)
 	}
-	v2reader.Release()
-	ray.OutboundOutput().Close()
 }
 
 type Factory struct{}

+ 40 - 39
proxy/http/server.go

@@ -17,9 +17,9 @@ import (
 	"v2ray.com/core/common/log"
 	v2net "v2ray.com/core/common/net"
 	"v2ray.com/core/common/serial"
+	"v2ray.com/core/common/signal"
 	"v2ray.com/core/proxy"
 	"v2ray.com/core/transport/internet"
-	"v2ray.com/core/transport/ray"
 )
 
 // Server is a HTTP proxy server.
@@ -155,35 +155,32 @@ func (v *Server) handleConnect(request *http.Request, session *proxy.SessionInfo
 	}
 
 	ray := v.packetDispatcher.DispatchToOutbound(session)
-	v.transport(reader, writer, ray)
-}
 
-func (v *Server) transport(input io.Reader, output io.Writer, ray ray.InboundRay) {
-	var wg sync.WaitGroup
-	wg.Add(2)
-	defer wg.Wait()
+	requestDone := signal.ExecuteAsync(func() error {
+		defer ray.InboundInput().Close()
 
-	go func() {
-		v2reader := buf.NewReader(input)
+		v2reader := buf.NewReader(reader)
 		defer v2reader.Release()
 
 		if err := buf.PipeUntilEOF(v2reader, ray.InboundInput()); err != nil {
-			log.Info("HTTP: Failed to transport all TCP request: ", err)
+			return err
 		}
-		ray.InboundInput().Close()
-		wg.Done()
-	}()
+		return nil
+	})
 
-	go func() {
-		v2writer := buf.NewWriter(output)
+	responseDone := signal.ExecuteAsync(func() error {
+		defer ray.InboundOutput().Release()
+
+		v2writer := buf.NewWriter(writer)
 		defer v2writer.Release()
 
 		if err := buf.PipeUntilEOF(ray.InboundOutput(), v2writer); err != nil {
-			log.Info("HTTP: Failed to transport all TCP response: ", err)
+			return err
 		}
-		ray.InboundOutput().Release()
-		wg.Done()
-	}()
+		return nil
+	})
+
+	signal.ErrorOrFinish2(requestDone, responseDone)
 }
 
 // @VisibleForTesting
@@ -239,27 +236,26 @@ func (v *Server) handlePlainHTTP(request *http.Request, session *proxy.SessionIn
 	StripHopByHopHeaders(request)
 
 	ray := v.packetDispatcher.DispatchToOutbound(session)
-	defer ray.InboundInput().Close()
-	defer ray.InboundOutput().Release()
 
-	var finish sync.WaitGroup
-	finish.Add(1)
-	go func() {
-		defer finish.Done()
+	requestDone := signal.ExecuteAsync(func() error {
+		defer ray.InboundInput().Close()
+
 		requestWriter := bufio.NewWriter(buf.NewBytesWriter(ray.InboundInput()))
 		defer requestWriter.Release()
 
 		err := request.Write(requestWriter)
 		if err != nil {
-			log.Warning("HTTP: Failed to write request: ", err)
-			return
+			return err
 		}
-		requestWriter.Flush()
-	}()
+		if err := requestWriter.Flush(); err != nil {
+			return err
+		}
+		return nil
+	})
+
+	responseDone := signal.ExecuteAsync(func() error {
+		defer ray.InboundOutput().Release()
 
-	finish.Add(1)
-	go func() {
-		defer finish.Done()
 		responseReader := bufio.OriginalReader(buf.NewBytesReader(ray.InboundOutput()))
 		response, err := http.ReadResponse(responseReader, request)
 		if err != nil {
@@ -267,14 +263,19 @@ func (v *Server) handlePlainHTTP(request *http.Request, session *proxy.SessionIn
 			response = v.GenerateResponse(503, "Service Unavailable")
 		}
 		responseWriter := bufio.NewWriter(writer)
-		err = response.Write(responseWriter)
-		if err != nil {
-			log.Warning("HTTP: Failed to write response: ", err)
-			return
+		if err := response.Write(responseWriter); err != nil {
+			return err
+		}
+
+		if err := responseWriter.Flush(); err != nil {
+			return err
 		}
-		responseWriter.Flush()
-	}()
-	finish.Wait()
+		return nil
+	})
+
+	if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil {
+		log.Info("HTTP|Server: Connecton ending with ", err)
+	}
 }
 
 // ServerFactory is a InboundHandlerFactory.

+ 48 - 37
proxy/shadowsocks/client.go

@@ -1,8 +1,6 @@
 package shadowsocks
 
 import (
-	"sync"
-
 	"v2ray.com/core/app"
 	"v2ray.com/core/common/buf"
 	"v2ray.com/core/common/bufio"
@@ -10,6 +8,7 @@ import (
 	v2net "v2ray.com/core/common/net"
 	"v2ray.com/core/common/protocol"
 	"v2ray.com/core/common/retry"
+	"v2ray.com/core/common/signal"
 	"v2ray.com/core/proxy"
 	"v2ray.com/core/transport/internet"
 	"v2ray.com/core/transport/ray"
@@ -38,8 +37,6 @@ func NewClient(config *ClientConfig, space app.Space, meta *proxy.OutboundHandle
 // Dispatch implements OutboundHandler.Dispatch().
 func (v *Client) Dispatch(destination v2net.Destination, payload *buf.Buffer, ray ray.OutboundRay) {
 	defer payload.Release()
-	defer ray.OutboundInput().Release()
-	defer ray.OutboundOutput().Close()
 
 	network := destination.Network
 
@@ -109,48 +106,38 @@ func (v *Client) Dispatch(destination v2net.Destination, payload *buf.Buffer, ra
 			}
 		}
 
-		var responseMutex sync.Mutex
-		responseMutex.Lock()
+		bufferedWriter.SetBuffered(false)
+
+		requestDone := signal.ExecuteAsync(func() error {
+			defer ray.OutboundInput().Release()
+
+			if err := buf.PipeUntilEOF(ray.OutboundInput(), bodyWriter); err != nil {
+				return err
+			}
+			return nil
+		})
 
-		go func() {
-			defer responseMutex.Unlock()
+		responseDone := signal.ExecuteAsync(func() error {
+			defer ray.OutboundOutput().Close()
 
 			responseReader, err := ReadTCPResponse(user, conn)
 			if err != nil {
-				log.Warning("Shadowsocks|Client: Failed to read response: ", err)
-				return
+				return err
 			}
 
 			if err := buf.PipeUntilEOF(responseReader, ray.OutboundOutput()); err != nil {
-				log.Info("Shadowsocks|Client: Failed to transport all TCP response: ", err)
+				return err
 			}
-		}()
 
-		bufferedWriter.SetBuffered(false)
-		if err := buf.PipeUntilEOF(ray.OutboundInput(), bodyWriter); err != nil {
-			log.Info("Shadowsocks|Client: Failed to trasnport all TCP request: ", err)
-		}
+			return nil
+		})
 
-		responseMutex.Lock()
+		if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil {
+			log.Info("Shadowsocks|Client: Connection ends with ", err)
+		}
 	}
 
 	if request.Command == protocol.RequestCommandUDP {
-		timedReader := v2net.NewTimeOutReader(16, conn)
-		var responseMutex sync.Mutex
-		responseMutex.Lock()
-
-		go func() {
-			defer responseMutex.Unlock()
-
-			reader := &UDPReader{
-				Reader: timedReader,
-				User:   user,
-			}
-
-			if err := buf.PipeUntilEOF(reader, ray.OutboundOutput()); err != nil {
-				log.Info("Shadowsocks|Client: Failed to transport all UDP response: ", err)
-			}
-		}()
 
 		writer := &UDPWriter{
 			Writer:  conn,
@@ -162,11 +149,35 @@ func (v *Client) Dispatch(destination v2net.Destination, payload *buf.Buffer, ra
 				return
 			}
 		}
-		if err := buf.PipeUntilEOF(ray.OutboundInput(), writer); err != nil {
-			log.Info("Shadowsocks|Client: Failed to transport all UDP request: ", err)
-		}
 
-		responseMutex.Lock()
+		requestDone := signal.ExecuteAsync(func() error {
+			defer ray.OutboundInput().Release()
+
+			if err := buf.PipeUntilEOF(ray.OutboundInput(), writer); err != nil {
+				log.Info("Shadowsocks|Client: Failed to transport all UDP request: ", err)
+				return err
+			}
+			return nil
+		})
+
+		timedReader := v2net.NewTimeOutReader(16, conn)
+
+		responseDone := signal.ExecuteAsync(func() error {
+			defer ray.OutboundOutput().Close()
+
+			reader := &UDPReader{
+				Reader: timedReader,
+				User:   user,
+			}
+
+			if err := buf.PipeUntilEOF(reader, ray.OutboundOutput()); err != nil {
+				log.Info("Shadowsocks|Client: Failed to transport all UDP response: ", err)
+				return err
+			}
+			return nil
+		})
+
+		signal.ErrorOrFinish2(requestDone, responseDone)
 	}
 }
 

+ 29 - 19
proxy/shadowsocks/server.go

@@ -1,8 +1,6 @@
 package shadowsocks
 
 import (
-	"sync"
-
 	"v2ray.com/core/app"
 	"v2ray.com/core/app/dispatcher"
 	"v2ray.com/core/common"
@@ -12,6 +10,7 @@ import (
 	"v2ray.com/core/common/log"
 	v2net "v2ray.com/core/common/net"
 	"v2ray.com/core/common/protocol"
+	"v2ray.com/core/common/signal"
 	"v2ray.com/core/proxy"
 	"v2ray.com/core/transport/internet"
 	"v2ray.com/core/transport/internet/udp"
@@ -177,11 +176,10 @@ func (v *Server) handleConnection(conn internet.Connection) {
 		Inbound:     v.meta,
 	})
 	defer ray.InboundOutput().Release()
+	defer ray.InboundInput().Close()
 
-	var writeFinish sync.Mutex
-	writeFinish.Lock()
-	go func() {
-		defer writeFinish.Unlock()
+	requestDone := signal.ExecuteAsync(func() error {
+		defer ray.InboundOutput().Release()
 
 		bufferedWriter := bufio.NewWriter(conn)
 		defer bufferedWriter.Release()
@@ -189,26 +187,38 @@ func (v *Server) handleConnection(conn internet.Connection) {
 		responseWriter, err := WriteTCPResponse(request, bufferedWriter)
 		if err != nil {
 			log.Warning("Shadowsocks|Server: Failed to write response: ", err)
-			return
+			return err
 		}
 		defer responseWriter.Release()
 
-		if payload, err := ray.InboundOutput().Read(); err == nil {
-			responseWriter.Write(payload)
-			bufferedWriter.SetBuffered(false)
+		payload, err := ray.InboundOutput().Read()
+		if err != nil {
+			return err
+		}
+		responseWriter.Write(payload)
+		bufferedWriter.SetBuffered(false)
 
-			if err := buf.PipeUntilEOF(ray.InboundOutput(), responseWriter); err != nil {
-				log.Info("Shadowsocks|Server: Failed to transport all TCP response: ", err)
-			}
+		if err := buf.PipeUntilEOF(ray.InboundOutput(), responseWriter); err != nil {
+			log.Info("Shadowsocks|Server: Failed to transport all TCP response: ", err)
+			return err
 		}
-	}()
 
-	if err := buf.PipeUntilEOF(bodyReader, ray.InboundInput()); err != nil {
-		log.Info("Shadowsocks|Server: Failed to transport all TCP request: ", err)
-	}
-	ray.InboundInput().Close()
+		return nil
+	})
+
+	responseDone := signal.ExecuteAsync(func() error {
+		defer ray.InboundInput().Close()
+
+		if err := buf.PipeUntilEOF(bodyReader, ray.InboundInput()); err != nil {
+			log.Info("Shadowsocks|Server: Failed to transport all TCP request: ", err)
+			return err
+		}
+		return nil
+	})
 
-	writeFinish.Lock()
+	if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil {
+		log.Info("Shadowsocks|Server: Connection ends with ", err)
+	}
 }
 
 type ServerFactory struct{}

+ 21 - 10
proxy/socks/server.go

@@ -15,6 +15,7 @@ import (
 	"v2ray.com/core/common/log"
 	v2net "v2ray.com/core/common/net"
 	"v2ray.com/core/common/serial"
+	"v2ray.com/core/common/signal"
 	"v2ray.com/core/proxy"
 	"v2ray.com/core/proxy/socks/protocol"
 	"v2ray.com/core/transport/internet"
@@ -299,26 +300,36 @@ func (v *Server) transport(reader io.Reader, writer io.Writer, session *proxy.Se
 	input := ray.InboundInput()
 	output := ray.InboundOutput()
 
-	defer input.Close()
-	defer output.Release()
+	requestDone := signal.ExecuteAsync(func() error {
+		defer input.Close()
 
-	go func() {
 		v2reader := buf.NewReader(reader)
 		defer v2reader.Release()
 
 		if err := buf.PipeUntilEOF(v2reader, input); err != nil {
 			log.Info("Socks|Server: Failed to transport all TCP request: ", err)
+			return err
+		}
+		return nil
+	})
+
+	responseDone := signal.ExecuteAsync(func() error {
+		defer output.Release()
+
+		v2writer := buf.NewWriter(writer)
+		defer v2writer.Release()
+
+		if err := buf.PipeUntilEOF(output, v2writer); err != nil {
+			log.Info("Socks|Server: Failed to transport all TCP response: ", err)
+			return err
 		}
-		input.Close()
-	}()
+		return nil
 
-	v2writer := buf.NewWriter(writer)
-	defer v2writer.Release()
+	})
 
-	if err := buf.PipeUntilEOF(output, v2writer); err != nil {
-		log.Info("Socks|Server: Failed to transport all TCP response: ", err)
+	if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil {
+		log.Info("Socks|Server: Connection ends with ", err)
 	}
-	output.Release()
 }
 
 type ServerFactory struct{}

+ 5 - 6
proxy/vmess/inbound/inbound.go

@@ -137,7 +137,6 @@ func transferRequest(session *encoding.ServerSession, request *protocol.RequestH
 	defer bodyReader.Release()
 
 	if err := buf.PipeUntilEOF(bodyReader, output); err != nil {
-		log.Debug("VMess|Inbound: Error when sending data to outbound: ", err)
 		return err
 	}
 	return nil
@@ -160,7 +159,6 @@ func transferResponse(session *encoding.ServerSession, request *protocol.Request
 		}
 
 		if err := buf.PipeUntilEOF(input, bodyWriter); err != nil {
-			log.Debug("VMess|Inbound: Error when sending data to downstream: ", err)
 			return err
 		}
 	}
@@ -201,13 +199,13 @@ func (v *VMessInboundHandler) HandleConnection(connection internet.Connection) {
 	if err != nil {
 		if errors.Cause(err) != io.EOF {
 			log.Access(connection.RemoteAddr(), "", log.AccessRejected, err)
-			log.Info("VMessIn: Invalid request from ", connection.RemoteAddr(), ": ", err)
+			log.Info("VMess|Inbound: Invalid request from ", connection.RemoteAddr(), ": ", err)
 		}
 		connection.SetReusable(false)
 		return
 	}
 	log.Access(connection.RemoteAddr(), request.Destination(), log.AccessAccepted, "")
-	log.Info("VMessIn: Received request for ", request.Destination())
+	log.Info("VMess|Inbound: Received request for ", request.Destination())
 
 	connection.SetReusable(request.Option.Has(protocol.RequestOptionConnectionReuse))
 
@@ -245,13 +243,14 @@ func (v *VMessInboundHandler) HandleConnection(connection internet.Connection) {
 		return transferResponse(session, request, response, output, writer)
 	})
 
-	err = signal.ErrorOrFinish2(requestDone, responseDone)
-	if err != nil {
+	if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil {
+		log.Info("VMess|Inbound: Connection ending with ", err)
 		connection.SetReusable(false)
 		return
 	}
 
 	if err := writer.Flush(); err != nil {
+		log.Info("VMess|Inbound: Failed to flush remain data: ", err)
 		connection.SetReusable(false)
 		return
 	}

+ 44 - 52
proxy/vmess/outbound/outbound.go

@@ -1,8 +1,6 @@
 package outbound
 
 import (
-	"sync"
-
 	"v2ray.com/core/app"
 	"v2ray.com/core/common"
 	"v2ray.com/core/common/buf"
@@ -12,6 +10,7 @@ import (
 	"v2ray.com/core/common/protocol"
 	"v2ray.com/core/common/retry"
 	"v2ray.com/core/common/serial"
+	"v2ray.com/core/common/signal"
 	"v2ray.com/core/proxy"
 	"v2ray.com/core/proxy/vmess"
 	"v2ray.com/core/proxy/vmess/encoding"
@@ -28,6 +27,7 @@ type VMessOutboundHandler struct {
 
 // Dispatch implements OutboundHandler.Dispatch().
 func (v *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *buf.Buffer, ray ray.OutboundRay) {
+	defer payload.Release()
 	defer ray.OutboundInput().Release()
 	defer ray.OutboundOutput().Close()
 
@@ -80,73 +80,65 @@ func (v *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *buf.B
 	input := ray.OutboundInput()
 	output := ray.OutboundOutput()
 
-	var requestFinish, responseFinish sync.Mutex
-	requestFinish.Lock()
-	responseFinish.Lock()
-
 	session := encoding.NewClientSession(protocol.DefaultIDHash)
 
-	go v.handleRequest(session, conn, request, payload, input, &requestFinish)
-	go v.handleResponse(session, conn, request, rec.Destination(), output, &responseFinish)
+	requestDone := signal.ExecuteAsync(func() error {
+		defer input.Release()
 
-	requestFinish.Lock()
-	responseFinish.Lock()
-	return
-}
+		writer := bufio.NewWriter(conn)
+		defer writer.Release()
 
-func (v *VMessOutboundHandler) handleRequest(session *encoding.ClientSession, conn internet.Connection, request *protocol.RequestHeader, payload *buf.Buffer, input buf.Reader, finish *sync.Mutex) {
-	defer finish.Unlock()
+		session.EncodeRequestHeader(request, writer)
 
-	writer := bufio.NewWriter(conn)
-	defer writer.Release()
-	session.EncodeRequestHeader(request, writer)
+		bodyWriter := session.EncodeRequestBody(request, writer)
+		defer bodyWriter.Release()
 
-	bodyWriter := session.EncodeRequestBody(request, writer)
-	defer bodyWriter.Release()
+		if !payload.IsEmpty() {
+			if err := bodyWriter.Write(payload); err != nil {
+				return err
+			}
+		}
+		writer.SetBuffered(false)
 
-	if !payload.IsEmpty() {
-		if err := bodyWriter.Write(payload); err != nil {
-			log.Info("VMess|Outbound: Failed to write payload. Disabling connection reuse.", err)
-			conn.SetReusable(false)
+		if err := buf.PipeUntilEOF(input, bodyWriter); err != nil {
+			return err
 		}
-		payload.Release()
-	}
-	writer.SetBuffered(false)
 
-	if err := buf.PipeUntilEOF(input, bodyWriter); err != nil {
-		conn.SetReusable(false)
-	}
+		if request.Option.Has(protocol.RequestOptionChunkStream) {
+			if err := bodyWriter.Write(buf.NewLocal(8)); err != nil {
+				return err
+			}
+		}
+		return nil
+	})
+
+	responseDone := signal.ExecuteAsync(func() error {
+		defer output.Close()
+
+		reader := bufio.NewReader(conn)
+		defer reader.Release()
 
-	if request.Option.Has(protocol.RequestOptionChunkStream) {
-		err := bodyWriter.Write(buf.NewLocal(8))
+		header, err := session.DecodeResponseHeader(reader)
 		if err != nil {
-			conn.SetReusable(false)
+			return err
 		}
-	}
-	return
-}
+		v.handleCommand(rec.Destination(), header.Command)
 
-func (v *VMessOutboundHandler) handleResponse(session *encoding.ClientSession, conn internet.Connection, request *protocol.RequestHeader, dest v2net.Destination, output buf.Writer, finish *sync.Mutex) {
-	defer finish.Unlock()
+		conn.SetReusable(header.Option.Has(protocol.ResponseOptionConnectionReuse))
 
-	reader := bufio.NewReader(conn)
-	defer reader.Release()
+		reader.SetBuffered(false)
+		bodyReader := session.DecodeResponseBody(request, reader)
+		defer bodyReader.Release()
 
-	header, err := session.DecodeResponseHeader(reader)
-	if err != nil {
-		conn.SetReusable(false)
-		log.Warning("VMess|Outbound: Failed to read response from ", request.Destination(), ": ", err)
-		return
-	}
-	v.handleCommand(dest, header.Command)
-
-	conn.SetReusable(header.Option.Has(protocol.ResponseOptionConnectionReuse))
+		if err := buf.PipeUntilEOF(bodyReader, output); err != nil {
+			return err
+		}
 
-	reader.SetBuffered(false)
-	bodyReader := session.DecodeResponseBody(request, reader)
-	defer bodyReader.Release()
+		return nil
+	})
 
-	if err := buf.PipeUntilEOF(bodyReader, output); err != nil {
+	if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil {
+		log.Info("VMess|Outbound: Connection ending with ", err)
 		conn.SetReusable(false)
 	}
 

+ 5 - 1
transport/ray/direct.go

@@ -65,6 +65,8 @@ func (v *Stream) Read() (*buf.Buffer, error) {
 			return b, nil
 		case <-v.srcClose:
 			return nil, io.EOF
+		case <-v.destClose:
+			return nil, io.ErrClosedPipe
 		}
 	}
 }
@@ -97,7 +99,7 @@ func (v *Stream) Close() {
 	close(v.srcClose)
 }
 
-func (v *Stream) Release() {
+func (v *Stream) ForceClose() {
 	defer swallowPanic()
 
 	close(v.destClose)
@@ -114,6 +116,8 @@ func (v *Stream) Release() {
 	}
 }
 
+func (v *Stream) Release() {}
+
 func swallowPanic() {
 	recover()
 }

+ 2 - 0
transport/ray/ray.go

@@ -36,9 +36,11 @@ type Ray interface {
 type InputStream interface {
 	buf.Reader
 	Close()
+	ForceClose()
 }
 
 type OutputStream interface {
 	buf.Writer
 	Close()
+	ForceClose()
 }