소스 검색

able to close ray stream with error

Darien Raymond 8 년 전
부모
커밋
72992c7478

+ 1 - 1
app/dispatcher/impl/default.go

@@ -71,7 +71,7 @@ func (v *DefaultDispatcher) DispatchToOutbound(session *proxy.SessionInfo) ray.I
 func (v *DefaultDispatcher) waitAndDispatch(wait func() error, destination v2net.Destination, link ray.OutboundRay, dispatcher proxy.OutboundHandler) {
 	if err := wait(); err != nil {
 		log.Info("DefaultDispatcher: Failed precondition: ", err)
-		link.OutboundInput().ForceClose()
+		link.OutboundInput().CloseError()
 		link.OutboundOutput().Close()
 		return
 	}

+ 1 - 1
app/proxy/proxy.go

@@ -98,7 +98,7 @@ func (v *Connection) Write(b []byte) (int, error) {
 func (v *Connection) Close() error {
 	v.closed = true
 	v.stream.InboundInput().Close()
-	v.stream.InboundOutput().ForceClose()
+	v.stream.InboundOutput().CloseError()
 	return nil
 }
 

+ 1 - 1
proxy/blackhole/blackhole.go

@@ -31,7 +31,7 @@ func (v *Handler) Dispatch(destination v2net.Destination, ray ray.OutboundRay) {
 	v.response.WriteTo(ray.OutboundOutput())
 	ray.OutboundOutput().Close()
 
-	ray.OutboundInput().ForceClose()
+	ray.OutboundInput().CloseError()
 }
 
 // Factory is an utility for creating blackhole handlers.

+ 3 - 5
proxy/dokodemo/dokodemo.go

@@ -172,8 +172,6 @@ func (v *DokodemoDoor) HandleTCPConnection(conn internet.Connection) {
 		Destination: dest,
 		Inbound:     v.meta,
 	})
-	output := ray.InboundOutput()
-	defer output.ForceClose()
 
 	reader := v2net.NewTimeOutReader(v.config.Timeout, conn)
 
@@ -191,11 +189,9 @@ func (v *DokodemoDoor) HandleTCPConnection(conn internet.Connection) {
 	})
 
 	responseDone := signal.ExecuteAsync(func() error {
-		defer output.ForceClose()
-
 		v2writer := buf.NewWriter(conn)
 
-		if err := buf.PipeUntilEOF(output, v2writer); err != nil {
+		if err := buf.PipeUntilEOF(ray.InboundOutput(), v2writer); err != nil {
 			log.Info("Dokodemo: Failed to transport all TCP response: ", err)
 			return err
 		}
@@ -203,6 +199,8 @@ func (v *DokodemoDoor) HandleTCPConnection(conn internet.Connection) {
 	})
 
 	if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil {
+		ray.InboundInput().CloseError()
+		ray.InboundOutput().CloseError()
 		log.Info("Dokodemo: Connection ends with ", err)
 	}
 }

+ 2 - 4
proxy/freedom/freedom.go

@@ -72,8 +72,6 @@ func (v *Handler) Dispatch(destination v2net.Destination, ray ray.OutboundRay) {
 
 	input := ray.OutboundInput()
 	output := ray.OutboundOutput()
-	defer input.ForceClose()
-	defer output.Close()
 
 	var conn internet.Connection
 	if v.domainStrategy == Config_USE_IP && destination.Address.Family().IsDomain() {
@@ -96,8 +94,6 @@ func (v *Handler) Dispatch(destination v2net.Destination, ray ray.OutboundRay) {
 	conn.SetReusable(false)
 
 	requestDone := signal.ExecuteAsync(func() error {
-		defer input.ForceClose()
-
 		v2writer := buf.NewWriter(conn)
 		if err := buf.PipeUntilEOF(input, v2writer); err != nil {
 			return err
@@ -127,6 +123,8 @@ func (v *Handler) Dispatch(destination v2net.Destination, ray ray.OutboundRay) {
 
 	if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil {
 		log.Info("Freedom: Connection ending with ", err)
+		input.CloseError()
+		output.CloseError()
 	}
 }
 

+ 7 - 8
proxy/http/server.go

@@ -176,8 +176,6 @@ func (v *Server) handleConnect(request *http.Request, session *proxy.SessionInfo
 	})
 
 	responseDone := signal.ExecuteAsync(func() error {
-		defer ray.InboundOutput().ForceClose()
-
 		v2writer := buf.NewWriter(writer)
 		if err := buf.PipeUntilEOF(ray.InboundOutput(), v2writer); err != nil {
 			return err
@@ -185,7 +183,11 @@ func (v *Server) handleConnect(request *http.Request, session *proxy.SessionInfo
 		return nil
 	})
 
-	signal.ErrorOrFinish2(requestDone, responseDone)
+	if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil {
+		log.Info("HTTP|Server: Connection ends with: ", err)
+		ray.InboundInput().CloseError()
+		ray.InboundOutput().CloseError()
+	}
 }
 
 // @VisibleForTesting
@@ -244,9 +246,6 @@ func (v *Server) handlePlainHTTP(request *http.Request, session *proxy.SessionIn
 	input := ray.InboundInput()
 	output := ray.InboundOutput()
 
-	defer input.Close()
-	defer output.ForceClose()
-
 	requestDone := signal.ExecuteAsync(func() error {
 		defer input.Close()
 
@@ -262,8 +261,6 @@ func (v *Server) handlePlainHTTP(request *http.Request, session *proxy.SessionIn
 	})
 
 	responseDone := signal.ExecuteAsync(func() error {
-		defer output.ForceClose()
-
 		responseReader := bufio.OriginalReader(buf.NewBytesReader(ray.InboundOutput()))
 		response, err := http.ReadResponse(responseReader, request)
 		if err != nil {
@@ -283,6 +280,8 @@ func (v *Server) handlePlainHTTP(request *http.Request, session *proxy.SessionIn
 
 	if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil {
 		log.Info("HTTP|Server: Connecton ending with ", err)
+		input.CloseError()
+		output.CloseError()
 	}
 }
 

+ 7 - 5
proxy/shadowsocks/client.go

@@ -96,8 +96,6 @@ func (v *Client) Dispatch(destination v2net.Destination, ray ray.OutboundRay) {
 		bufferedWriter.SetBuffered(false)
 
 		requestDone := signal.ExecuteAsync(func() error {
-			defer ray.OutboundInput().ForceClose()
-
 			if err := buf.PipeUntilEOF(ray.OutboundInput(), bodyWriter); err != nil {
 				return err
 			}
@@ -121,6 +119,8 @@ func (v *Client) Dispatch(destination v2net.Destination, ray ray.OutboundRay) {
 
 		if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil {
 			log.Info("Shadowsocks|Client: Connection ends with ", err)
+			ray.OutboundInput().CloseError()
+			ray.OutboundOutput().CloseError()
 		}
 	}
 
@@ -132,8 +132,6 @@ func (v *Client) Dispatch(destination v2net.Destination, ray ray.OutboundRay) {
 		}
 
 		requestDone := signal.ExecuteAsync(func() error {
-			defer ray.OutboundInput().ForceClose()
-
 			if err := buf.PipeUntilEOF(ray.OutboundInput(), writer); err != nil {
 				log.Info("Shadowsocks|Client: Failed to transport all UDP request: ", err)
 				return err
@@ -158,7 +156,11 @@ func (v *Client) Dispatch(destination v2net.Destination, ray ray.OutboundRay) {
 			return nil
 		})
 
-		signal.ErrorOrFinish2(requestDone, responseDone)
+		if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil {
+			log.Info("Shadowsocks|Client: Connection ends with ", err)
+			ray.OutboundInput().CloseError()
+			ray.OutboundOutput().CloseError()
+		}
 	}
 }
 

+ 2 - 4
proxy/shadowsocks/server.go

@@ -169,12 +169,8 @@ func (v *Server) handleConnection(conn internet.Connection) {
 		User:        request.User,
 		Inbound:     v.meta,
 	})
-	defer ray.InboundOutput().ForceClose()
-	defer ray.InboundInput().Close()
 
 	requestDone := signal.ExecuteAsync(func() error {
-		defer ray.InboundOutput().ForceClose()
-
 		bufferedWriter := bufio.NewWriter(conn)
 		responseWriter, err := WriteTCPResponse(request, bufferedWriter)
 		if err != nil {
@@ -215,6 +211,8 @@ func (v *Server) handleConnection(conn internet.Connection) {
 
 	if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil {
 		log.Info("Shadowsocks|Server: Connection ends with ", err)
+		ray.InboundInput().CloseError()
+		ray.InboundOutput().CloseError()
 	}
 }
 

+ 2 - 2
proxy/socks/client.go

@@ -80,7 +80,6 @@ func (c *Client) Dispatch(destination net.Destination, ray ray.OutboundRay) {
 	var responseFunc func() error
 	if request.Command == protocol.RequestCommandTCP {
 		requestFunc = func() error {
-			defer ray.OutboundInput().ForceClose()
 			return buf.PipeUntilEOF(ray.OutboundInput(), buf.NewWriter(conn))
 		}
 		responseFunc = func() error {
@@ -95,7 +94,6 @@ func (c *Client) Dispatch(destination net.Destination, ray ray.OutboundRay) {
 		}
 		defer udpConn.Close()
 		requestFunc = func() error {
-			defer ray.OutboundInput().ForceClose()
 			return buf.PipeUntilEOF(ray.OutboundInput(), &UDPWriter{request: request, writer: udpConn})
 		}
 		responseFunc = func() error {
@@ -109,6 +107,8 @@ func (c *Client) Dispatch(destination net.Destination, ray ray.OutboundRay) {
 	responseDone := signal.ExecuteAsync(responseFunc)
 	if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil {
 		log.Info("Socks|Client: Connection ends with ", err)
+		ray.OutboundInput().CloseError()
+		ray.OutboundOutput().CloseError()
 	}
 }
 

+ 2 - 2
proxy/socks/server.go

@@ -165,8 +165,6 @@ func (v *Server) transport(reader io.Reader, writer io.Writer, session *proxy.Se
 	})
 
 	responseDone := signal.ExecuteAsync(func() error {
-		defer output.ForceClose()
-
 		v2writer := buf.NewWriter(writer)
 		if err := buf.PipeUntilEOF(output, v2writer); err != nil {
 			log.Info("Socks|Server: Failed to transport all TCP response: ", err)
@@ -178,6 +176,8 @@ func (v *Server) transport(reader io.Reader, writer io.Writer, session *proxy.Se
 
 	if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil {
 		log.Info("Socks|Server: Connection ends with ", err)
+		input.CloseError()
+		output.CloseError()
 	}
 }
 

+ 2 - 3
proxy/vmess/inbound/inbound.go

@@ -141,7 +141,6 @@ func transferRequest(session *encoding.ServerSession, request *protocol.RequestH
 }
 
 func transferResponse(session *encoding.ServerSession, request *protocol.RequestHeader, response *protocol.ResponseHeader, input ray.InputStream, output io.Writer) error {
-	defer input.ForceClose()
 	session.EncodeResponseHeader(response, output)
 
 	bodyWriter := session.EncodeResponseBody(request, output)
@@ -215,8 +214,6 @@ func (v *VMessInboundHandler) HandleConnection(connection internet.Connection) {
 	})
 	input := ray.InboundInput()
 	output := ray.InboundOutput()
-	defer input.Close()
-	defer output.ForceClose()
 
 	userSettings := request.User.GetSettings()
 	connReader.SetTimeOut(userSettings.PayloadReadTimeout)
@@ -242,6 +239,8 @@ func (v *VMessInboundHandler) HandleConnection(connection internet.Connection) {
 	if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil {
 		log.Info("VMess|Inbound: Connection ending with ", err)
 		connection.SetReusable(false)
+		input.CloseError()
+		output.CloseError()
 		return
 	}
 

+ 2 - 5
proxy/vmess/outbound/outbound.go

@@ -30,9 +30,6 @@ type VMessOutboundHandler struct {
 
 // Dispatch implements OutboundHandler.Dispatch().
 func (v *VMessOutboundHandler) Dispatch(target v2net.Destination, outboundRay ray.OutboundRay) {
-	defer outboundRay.OutboundInput().ForceClose()
-	defer outboundRay.OutboundOutput().Close()
-
 	var rec *protocol.ServerSpec
 	var conn internet.Connection
 
@@ -85,8 +82,6 @@ func (v *VMessOutboundHandler) Dispatch(target v2net.Destination, outboundRay ra
 	session := encoding.NewClientSession(protocol.DefaultIDHash)
 
 	requestDone := signal.ExecuteAsync(func() error {
-		defer input.ForceClose()
-
 		writer := bufio.NewWriter(conn)
 		session.EncodeRequestHeader(request, writer)
 
@@ -140,6 +135,8 @@ func (v *VMessOutboundHandler) Dispatch(target v2net.Destination, outboundRay ra
 	if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil {
 		log.Info("VMess|Outbound: Connection ending with ", err)
 		conn.SetReusable(false)
+		input.CloseError()
+		output.CloseError()
 	}
 
 	return

+ 1 - 1
transport/internet/udp/udp_server.go

@@ -88,7 +88,7 @@ func (v *TimedInboundRay) Release() {
 	}
 	v.server = nil
 	v.inboundRay.InboundInput().Close()
-	v.inboundRay.InboundOutput().ForceClose()
+	v.inboundRay.InboundOutput().CloseError()
 	v.inboundRay = nil
 }
 

+ 17 - 17
transport/ray/direct.go

@@ -54,23 +54,23 @@ func (v *directRay) AddInspector(inspector Inspector) {
 
 type Stream struct {
 	buffer    chan *buf.Buffer
-	srcClose  chan bool
-	destClose chan bool
+	close     chan bool
+	err       chan bool
 	inspector *InspectorChain
 }
 
 func NewStream() *Stream {
 	return &Stream{
 		buffer:    make(chan *buf.Buffer, bufferSize),
-		srcClose:  make(chan bool),
-		destClose: make(chan bool),
+		close:     make(chan bool),
+		err:       make(chan bool),
 		inspector: &InspectorChain{},
 	}
 }
 
 func (v *Stream) Read() (*buf.Buffer, error) {
 	select {
-	case <-v.destClose:
+	case <-v.err:
 		return nil, io.ErrClosedPipe
 	case b := <-v.buffer:
 		return b, nil
@@ -78,9 +78,9 @@ func (v *Stream) Read() (*buf.Buffer, error) {
 		select {
 		case b := <-v.buffer:
 			return b, nil
-		case <-v.srcClose:
+		case <-v.close:
 			return nil, io.EOF
-		case <-v.destClose:
+		case <-v.err:
 			return nil, io.ErrClosedPipe
 		}
 	}
@@ -88,7 +88,7 @@ func (v *Stream) Read() (*buf.Buffer, error) {
 
 func (v *Stream) ReadTimeout(timeout time.Duration) (*buf.Buffer, error) {
 	select {
-	case <-v.destClose:
+	case <-v.err:
 		return nil, io.ErrClosedPipe
 	case b := <-v.buffer:
 		return b, nil
@@ -96,9 +96,9 @@ func (v *Stream) ReadTimeout(timeout time.Duration) (*buf.Buffer, error) {
 		select {
 		case b := <-v.buffer:
 			return b, nil
-		case <-v.srcClose:
+		case <-v.close:
 			return nil, io.EOF
-		case <-v.destClose:
+		case <-v.err:
 			return nil, io.ErrClosedPipe
 		case <-time.After(timeout):
 			return nil, ErrReadTimeout
@@ -112,15 +112,15 @@ func (v *Stream) Write(data *buf.Buffer) (err error) {
 	}
 
 	select {
-	case <-v.destClose:
+	case <-v.err:
 		return io.ErrClosedPipe
-	case <-v.srcClose:
+	case <-v.close:
 		return io.ErrClosedPipe
 	default:
 		select {
-		case <-v.destClose:
+		case <-v.err:
 			return io.ErrClosedPipe
-		case <-v.srcClose:
+		case <-v.close:
 			return io.ErrClosedPipe
 		case v.buffer <- data:
 			v.inspector.Input(data)
@@ -132,13 +132,13 @@ func (v *Stream) Write(data *buf.Buffer) (err error) {
 func (v *Stream) Close() {
 	defer swallowPanic()
 
-	close(v.srcClose)
+	close(v.close)
 }
 
-func (v *Stream) ForceClose() {
+func (v *Stream) CloseError() {
 	defer swallowPanic()
 
-	close(v.destClose)
+	close(v.err)
 	v.Close()
 
 	n := len(v.buffer)

+ 7 - 2
transport/ray/ray.go

@@ -35,13 +35,18 @@ type Ray interface {
 	AddInspector(Inspector)
 }
 
+type RayStream interface {
+	Close()
+	CloseError()
+}
+
 type InputStream interface {
 	buf.Reader
+	RayStream
 	ReadTimeout(time.Duration) (*buf.Buffer, error)
-	ForceClose()
 }
 
 type OutputStream interface {
 	buf.Writer
-	Close()
+	RayStream
 }