Ver Fonte

Close outbound output stream properly. fix #1082

Darien Raymond há 7 anos atrás
pai
commit
8eb84a2025

+ 23 - 0
common/functions/functions.go

@@ -0,0 +1,23 @@
+package functions
+
+import "v2ray.com/core/common"
+
+// Task is a function that may return an error.
+type Task func() error
+
+// CloseOnSuccess returns a Task to run a follow task if pre-condition passes, otherwise the error in pre-condition is returned.
+func CloseOnSuccess(pre func() error, followup Task) Task {
+	return func() error {
+		if err := pre(); err != nil {
+			return err
+		}
+		return followup()
+	}
+}
+
+// Close returns a Task to close the object.
+func Close(obj interface{}) Task {
+	return func() error {
+		return common.Close(obj)
+	}
+}

+ 2 - 1
proxy/freedom/freedom.go

@@ -10,6 +10,7 @@ import (
 	"v2ray.com/core/common"
 	"v2ray.com/core/common/buf"
 	"v2ray.com/core/common/dice"
+	"v2ray.com/core/common/functions"
 	"v2ray.com/core/common/net"
 	"v2ray.com/core/common/retry"
 	"v2ray.com/core/common/signal"
@@ -135,7 +136,7 @@ func (h *Handler) Process(ctx context.Context, link *core.Link, dialer proxy.Dia
 		return nil
 	}
 
-	if err := signal.ExecuteParallel(ctx, requestDone, responseDone); err != nil {
+	if err := signal.ExecuteParallel(ctx, requestDone, functions.CloseOnSuccess(responseDone, functions.Close(output))); err != nil {
 		return newError("connection ends").Base(err)
 	}
 

+ 2 - 1
proxy/shadowsocks/client.go

@@ -6,6 +6,7 @@ import (
 	"v2ray.com/core"
 	"v2ray.com/core/common"
 	"v2ray.com/core/common/buf"
+	"v2ray.com/core/common/functions"
 	"v2ray.com/core/common/net"
 	"v2ray.com/core/common/protocol"
 	"v2ray.com/core/common/retry"
@@ -157,7 +158,7 @@ func (c *Client) Process(ctx context.Context, link *core.Link, dialer proxy.Dial
 			return nil
 		}
 
-		if err := signal.ExecuteParallel(ctx, requestDone, responseDone); err != nil {
+		if err := signal.ExecuteParallel(ctx, requestDone, functions.CloseOnSuccess(responseDone, functions.Close(link.Writer))); err != nil {
 			return newError("connection ends").Base(err)
 		}
 

+ 2 - 1
proxy/socks/client.go

@@ -7,6 +7,7 @@ import (
 	"v2ray.com/core"
 	"v2ray.com/core/common"
 	"v2ray.com/core/common/buf"
+	"v2ray.com/core/common/functions"
 	"v2ray.com/core/common/net"
 	"v2ray.com/core/common/protocol"
 	"v2ray.com/core/common/retry"
@@ -129,7 +130,7 @@ func (c *Client) Process(ctx context.Context, link *core.Link, dialer proxy.Dial
 		}
 	}
 
-	if err := signal.ExecuteParallel(ctx, requestFunc, responseFunc); err != nil {
+	if err := signal.ExecuteParallel(ctx, requestFunc, functions.CloseOnSuccess(responseFunc, functions.Close(link.Writer))); err != nil {
 		return newError("connection ends").Base(err)
 	}
 

+ 2 - 1
proxy/vmess/outbound/outbound.go

@@ -11,6 +11,7 @@ import (
 	"v2ray.com/core"
 	"v2ray.com/core/common"
 	"v2ray.com/core/common/buf"
+	"v2ray.com/core/common/functions"
 	"v2ray.com/core/common/net"
 	"v2ray.com/core/common/protocol"
 	"v2ray.com/core/common/retry"
@@ -159,7 +160,7 @@ func (v *Handler) Process(ctx context.Context, link *core.Link, dialer proxy.Dia
 		return buf.Copy(bodyReader, output, buf.UpdateActivity(timer))
 	}
 
-	if err := signal.ExecuteParallel(ctx, requestDone, responseDone); err != nil {
+	if err := signal.ExecuteParallel(ctx, requestDone, functions.CloseOnSuccess(responseDone, functions.Close(output))); err != nil {
 		return newError("connection ends").Base(err)
 	}