Browse Source

rename CloseError() to Interrupt()

Darien Raymond 6 years ago
parent
commit
3de8389361

+ 2 - 3
app/commander/outbound.go

@@ -8,7 +8,6 @@ import (
 	"v2ray.com/core/common/net"
 	"v2ray.com/core/common/net"
 	"v2ray.com/core/common/signal/done"
 	"v2ray.com/core/common/signal/done"
 	"v2ray.com/core/transport"
 	"v2ray.com/core/transport"
-	"v2ray.com/core/transport/pipe"
 )
 )
 
 
 // OutboundListener is a net.Listener for listening gRPC connections.
 // OutboundListener is a net.Listener for listening gRPC connections.
@@ -73,8 +72,8 @@ func (co *Outbound) Dispatch(ctx context.Context, link *transport.Link) {
 	co.access.RLock()
 	co.access.RLock()
 
 
 	if co.closed {
 	if co.closed {
-		pipe.CloseError(link.Reader)
-		pipe.CloseError(link.Writer)
+		common.Interrupt(link.Reader)
+		common.Interrupt(link.Writer)
 		co.access.RUnlock()
 		co.access.RUnlock()
 		return
 		return
 	}
 	}

+ 3 - 3
app/dispatcher/default.go

@@ -77,13 +77,13 @@ func (r *cachedReader) ReadMultiBufferTimeout(timeout time.Duration) (buf.MultiB
 	return r.reader.ReadMultiBufferTimeout(timeout)
 	return r.reader.ReadMultiBufferTimeout(timeout)
 }
 }
 
 
-func (r *cachedReader) CloseError() {
+func (r *cachedReader) Interrupt() {
 	r.Lock()
 	r.Lock()
 	if r.cache != nil {
 	if r.cache != nil {
 		r.cache = buf.ReleaseMulti(r.cache)
 		r.cache = buf.ReleaseMulti(r.cache)
 	}
 	}
 	r.Unlock()
 	r.Unlock()
-	r.reader.CloseError()
+	r.reader.Interrupt()
 }
 }
 
 
 // DefaultDispatcher is a default implementation of Dispatcher.
 // DefaultDispatcher is a default implementation of Dispatcher.
@@ -267,7 +267,7 @@ func (d *DefaultDispatcher) routedDispatch(ctx context.Context, link *transport.
 	if dispatcher == nil {
 	if dispatcher == nil {
 		newError("default outbound handler not exist").WriteToLog(session.ExportIDToError(ctx))
 		newError("default outbound handler not exist").WriteToLog(session.ExportIDToError(ctx))
 		common.Close(link.Writer)
 		common.Close(link.Writer)
-		pipe.CloseError(link.Reader)
+		common.Interrupt(link.Reader)
 		return
 		return
 	}
 	}
 
 

+ 2 - 3
app/dispatcher/stats.go

@@ -4,7 +4,6 @@ import (
 	"v2ray.com/core/common"
 	"v2ray.com/core/common"
 	"v2ray.com/core/common/buf"
 	"v2ray.com/core/common/buf"
 	"v2ray.com/core/features/stats"
 	"v2ray.com/core/features/stats"
-	"v2ray.com/core/transport/pipe"
 )
 )
 
 
 type SizeStatWriter struct {
 type SizeStatWriter struct {
@@ -21,6 +20,6 @@ func (w *SizeStatWriter) Close() error {
 	return common.Close(w.Writer)
 	return common.Close(w.Writer)
 }
 }
 
 
-func (w *SizeStatWriter) CloseError() {
-	pipe.CloseError(w.Writer)
+func (w *SizeStatWriter) Interrupt() {
+	common.Interrupt(w.Writer)
 }
 }

+ 3 - 3
app/proxyman/outbound/handler.go

@@ -100,17 +100,17 @@ func (h *Handler) Dispatch(ctx context.Context, link *transport.Link) {
 	if h.mux != nil {
 	if h.mux != nil {
 		if err := h.mux.Dispatch(ctx, link); err != nil {
 		if err := h.mux.Dispatch(ctx, link); err != nil {
 			newError("failed to process mux outbound traffic").Base(err).WriteToLog(session.ExportIDToError(ctx))
 			newError("failed to process mux outbound traffic").Base(err).WriteToLog(session.ExportIDToError(ctx))
-			pipe.CloseError(link.Writer)
+			common.Interrupt(link.Writer)
 		}
 		}
 	} else {
 	} else {
 		if err := h.proxy.Process(ctx, link, h); err != nil {
 		if err := h.proxy.Process(ctx, link, h); err != nil {
 			// Ensure outbound ray is properly closed.
 			// Ensure outbound ray is properly closed.
 			newError("failed to process outbound traffic").Base(err).WriteToLog(session.ExportIDToError(ctx))
 			newError("failed to process outbound traffic").Base(err).WriteToLog(session.ExportIDToError(ctx))
-			pipe.CloseError(link.Writer)
+			common.Interrupt(link.Writer)
 		} else {
 		} else {
 			common.Must(common.Close(link.Writer))
 			common.Must(common.Close(link.Writer))
 		}
 		}
-		pipe.CloseError(link.Reader)
+		common.Interrupt(link.Reader)
 	}
 	}
 }
 }
 
 

+ 2 - 2
app/reverse/portal.go

@@ -97,7 +97,7 @@ func (o *Outbound) Tag() string {
 func (o *Outbound) Dispatch(ctx context.Context, link *transport.Link) {
 func (o *Outbound) Dispatch(ctx context.Context, link *transport.Link) {
 	if err := o.portal.HandleConnection(ctx, link); err != nil {
 	if err := o.portal.HandleConnection(ctx, link); err != nil {
 		newError("failed to process reverse connection").Base(err).WriteToLog(session.ExportIDToError(ctx))
 		newError("failed to process reverse connection").Base(err).WriteToLog(session.ExportIDToError(ctx))
-		pipe.CloseError(link.Writer)
+		common.Interrupt(link.Writer)
 	}
 	}
 }
 }
 
 
@@ -244,7 +244,7 @@ func (w *PortalWorker) heartbeat() error {
 
 
 		defer func() {
 		defer func() {
 			common.Close(w.writer)
 			common.Close(w.writer)
-			pipe.CloseError(w.reader)
+			common.Interrupt(w.reader)
 			w.writer = nil
 			w.writer = nil
 		}()
 		}()
 	}
 	}

+ 14 - 0
common/interfaces.go

@@ -6,6 +6,11 @@ type Closable interface {
 	Close() error
 	Close() error
 }
 }
 
 
+// Interruptible is an interface for objects that can be stopped before its completion.
+type Interruptible interface {
+	Interrupt()
+}
+
 // Close closes the obj if it is a Closable.
 // Close closes the obj if it is a Closable.
 func Close(obj interface{}) error {
 func Close(obj interface{}) error {
 	if c, ok := obj.(Closable); ok {
 	if c, ok := obj.(Closable); ok {
@@ -14,6 +19,15 @@ func Close(obj interface{}) error {
 	return nil
 	return nil
 }
 }
 
 
+// Interrupt calls Interrupt() if object implements Interruptible interface, or Close() if the object implements Closable interface.
+func Interrupt(obj interface{}) error {
+	if c, ok := obj.(Interruptible); ok {
+		c.Interrupt()
+		return nil
+	}
+	return Close(obj)
+}
+
 // Runnable is the interface for objects that can start to work and stop on demand.
 // Runnable is the interface for objects that can start to work and stop on demand.
 type Runnable interface {
 type Runnable interface {
 	// Start starts the runnable object. Upon the method returning nil, the object begins to function properly.
 	// Start starts the runnable object. Upon the method returning nil, the object begins to function properly.

+ 7 - 7
common/mux/client.go

@@ -213,8 +213,8 @@ func (m *ClientWorker) monitor() {
 		select {
 		select {
 		case <-m.done.Wait():
 		case <-m.done.Wait():
 			m.sessionManager.Close()
 			m.sessionManager.Close()
-			common.Close(m.link.Writer)    // nolint: errcheck
-			pipe.CloseError(m.link.Reader) // nolint: errcheck
+			common.Close(m.link.Writer)     // nolint: errcheck
+			common.Interrupt(m.link.Reader) // nolint: errcheck
 			return
 			return
 		case <-timer.C:
 		case <-timer.C:
 			size := m.sessionManager.Size()
 			size := m.sessionManager.Size()
@@ -253,14 +253,14 @@ func fetchInput(ctx context.Context, s *Session, output buf.Writer) {
 	if err := writeFirstPayload(s.input, writer); err != nil {
 	if err := writeFirstPayload(s.input, writer); err != nil {
 		newError("failed to write first payload").Base(err).WriteToLog(session.ExportIDToError(ctx))
 		newError("failed to write first payload").Base(err).WriteToLog(session.ExportIDToError(ctx))
 		writer.hasError = true
 		writer.hasError = true
-		pipe.CloseError(s.input)
+		common.Interrupt(s.input)
 		return
 		return
 	}
 	}
 
 
 	if err := buf.Copy(s.input, writer); err != nil {
 	if err := buf.Copy(s.input, writer); err != nil {
 		newError("failed to fetch all input").Base(err).WriteToLog(session.ExportIDToError(ctx))
 		newError("failed to fetch all input").Base(err).WriteToLog(session.ExportIDToError(ctx))
 		writer.hasError = true
 		writer.hasError = true
-		pipe.CloseError(s.input)
+		common.Interrupt(s.input)
 		return
 		return
 	}
 	}
 }
 }
@@ -339,7 +339,7 @@ func (m *ClientWorker) handleStatusKeep(meta *FrameMetadata, reader *buf.Buffere
 		closingWriter.Close()
 		closingWriter.Close()
 
 
 		drainErr := buf.Copy(rr, buf.Discard)
 		drainErr := buf.Copy(rr, buf.Discard)
-		pipe.CloseError(s.input)
+		common.Interrupt(s.input)
 		s.Close()
 		s.Close()
 		return drainErr
 		return drainErr
 	}
 	}
@@ -350,8 +350,8 @@ func (m *ClientWorker) handleStatusKeep(meta *FrameMetadata, reader *buf.Buffere
 func (m *ClientWorker) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader) error {
 func (m *ClientWorker) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader) error {
 	if s, found := m.sessionManager.Get(meta.SessionID); found {
 	if s, found := m.sessionManager.Get(meta.SessionID); found {
 		if meta.Option.Has(OptionError) {
 		if meta.Option.Has(OptionError) {
-			pipe.CloseError(s.input)
-			pipe.CloseError(s.output)
+			common.Interrupt(s.input)
+			common.Interrupt(s.output)
 		}
 		}
 		s.Close()
 		s.Close()
 	}
 	}

+ 6 - 5
common/mux/server.go

@@ -5,6 +5,7 @@ import (
 	"io"
 	"io"
 
 
 	"v2ray.com/core"
 	"v2ray.com/core"
+	"v2ray.com/core/common"
 	"v2ray.com/core/common/buf"
 	"v2ray.com/core/common/buf"
 	"v2ray.com/core/common/errors"
 	"v2ray.com/core/common/errors"
 	"v2ray.com/core/common/log"
 	"v2ray.com/core/common/log"
@@ -146,7 +147,7 @@ func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata,
 	rr := s.NewReader(reader)
 	rr := s.NewReader(reader)
 	if err := buf.Copy(rr, s.output); err != nil {
 	if err := buf.Copy(rr, s.output); err != nil {
 		buf.Copy(rr, buf.Discard)
 		buf.Copy(rr, buf.Discard)
-		pipe.CloseError(s.input)
+		common.Interrupt(s.input)
 		return s.Close()
 		return s.Close()
 	}
 	}
 	return nil
 	return nil
@@ -177,7 +178,7 @@ func (w *ServerWorker) handleStatusKeep(meta *FrameMetadata, reader *buf.Buffere
 		closingWriter.Close()
 		closingWriter.Close()
 
 
 		drainErr := buf.Copy(rr, buf.Discard)
 		drainErr := buf.Copy(rr, buf.Discard)
-		pipe.CloseError(s.input)
+		common.Interrupt(s.input)
 		s.Close()
 		s.Close()
 		return drainErr
 		return drainErr
 	}
 	}
@@ -188,8 +189,8 @@ func (w *ServerWorker) handleStatusKeep(meta *FrameMetadata, reader *buf.Buffere
 func (w *ServerWorker) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader) error {
 func (w *ServerWorker) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader) error {
 	if s, found := w.sessionManager.Get(meta.SessionID); found {
 	if s, found := w.sessionManager.Get(meta.SessionID); found {
 		if meta.Option.Has(OptionError) {
 		if meta.Option.Has(OptionError) {
-			pipe.CloseError(s.input)
-			pipe.CloseError(s.output)
+			common.Interrupt(s.input)
+			common.Interrupt(s.output)
 		}
 		}
 		s.Close()
 		s.Close()
 	}
 	}
@@ -241,7 +242,7 @@ func (w *ServerWorker) run(ctx context.Context) {
 			if err != nil {
 			if err != nil {
 				if errors.Cause(err) != io.EOF {
 				if errors.Cause(err) != io.EOF {
 					newError("unexpected EOF").Base(err).WriteToLog(session.ExportIDToError(ctx))
 					newError("unexpected EOF").Base(err).WriteToLog(session.ExportIDToError(ctx))
-					pipe.CloseError(input)
+					common.Interrupt(input)
 				}
 				}
 				return
 				return
 			}
 			}

+ 1 - 2
proxy/blackhole/blackhole.go

@@ -10,7 +10,6 @@ import (
 	"v2ray.com/core/common"
 	"v2ray.com/core/common"
 	"v2ray.com/core/transport"
 	"v2ray.com/core/transport"
 	"v2ray.com/core/transport/internet"
 	"v2ray.com/core/transport/internet"
-	"v2ray.com/core/transport/pipe"
 )
 )
 
 
 // Handler is an outbound connection that silently swallow the entire payload.
 // Handler is an outbound connection that silently swallow the entire payload.
@@ -36,7 +35,7 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte
 		// Sleep a little here to make sure the response is sent to client.
 		// Sleep a little here to make sure the response is sent to client.
 		time.Sleep(time.Second)
 		time.Sleep(time.Second)
 	}
 	}
-	pipe.CloseError(link.Writer)
+	common.Interrupt(link.Writer)
 	return nil
 	return nil
 }
 }
 
 

+ 2 - 3
proxy/dokodemo/dokodemo.go

@@ -16,7 +16,6 @@ import (
 	"v2ray.com/core/features/policy"
 	"v2ray.com/core/features/policy"
 	"v2ray.com/core/features/routing"
 	"v2ray.com/core/features/routing"
 	"v2ray.com/core/transport/internet"
 	"v2ray.com/core/transport/internet"
-	"v2ray.com/core/transport/pipe"
 )
 )
 
 
 func init() {
 func init() {
@@ -148,8 +147,8 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in
 	}
 	}
 
 
 	if err := task.Run(ctx, task.OnSuccess(requestDone, task.Close(link.Writer)), responseDone); err != nil {
 	if err := task.Run(ctx, task.OnSuccess(requestDone, task.Close(link.Writer)), responseDone); err != nil {
-		pipe.CloseError(link.Reader)
-		pipe.CloseError(link.Writer)
+		common.Interrupt(link.Reader)
+		common.Interrupt(link.Writer)
 		return newError("connection ends").Base(err)
 		return newError("connection ends").Base(err)
 	}
 	}
 
 

+ 5 - 5
proxy/http/server.go

@@ -22,7 +22,6 @@ import (
 	"v2ray.com/core/features/policy"
 	"v2ray.com/core/features/policy"
 	"v2ray.com/core/features/routing"
 	"v2ray.com/core/features/routing"
 	"v2ray.com/core/transport/internet"
 	"v2ray.com/core/transport/internet"
-	"v2ray.com/core/transport/pipe"
 )
 )
 
 
 // Server is an HTTP proxy server.
 // Server is an HTTP proxy server.
@@ -51,6 +50,7 @@ func (s *Server) policy() policy.Session {
 	return p
 	return p
 }
 }
 
 
+// Network implements proxy.Inbound.
 func (*Server) Network() []net.Network {
 func (*Server) Network() []net.Network {
 	return []net.Network{net.Network_TCP}
 	return []net.Network{net.Network_TCP}
 }
 }
@@ -191,8 +191,8 @@ func (s *Server) handleConnect(ctx context.Context, request *http.Request, reade
 
 
 	var closeWriter = task.OnSuccess(requestDone, task.Close(link.Writer))
 	var closeWriter = task.OnSuccess(requestDone, task.Close(link.Writer))
 	if err := task.Run(ctx, closeWriter, responseDone); err != nil {
 	if err := task.Run(ctx, closeWriter, responseDone); err != nil {
-		pipe.CloseError(link.Reader)
-		pipe.CloseError(link.Writer)
+		common.Interrupt(link.Reader)
+		common.Interrupt(link.Writer)
 		return newError("connection ends").Base(err)
 		return newError("connection ends").Base(err)
 	}
 	}
 
 
@@ -287,8 +287,8 @@ func (s *Server) handlePlainHTTP(ctx context.Context, request *http.Request, wri
 	}
 	}
 
 
 	if err := task.Run(ctx, requestDone, responseDone); err != nil {
 	if err := task.Run(ctx, requestDone, responseDone); err != nil {
-		pipe.CloseError(link.Reader)
-		pipe.CloseError(link.Writer)
+		common.Interrupt(link.Reader)
+		common.Interrupt(link.Writer)
 		return newError("connection ends").Base(err)
 		return newError("connection ends").Base(err)
 	}
 	}
 
 

+ 2 - 3
proxy/mtproto/server.go

@@ -17,7 +17,6 @@ import (
 	"v2ray.com/core/features/policy"
 	"v2ray.com/core/features/policy"
 	"v2ray.com/core/features/routing"
 	"v2ray.com/core/features/routing"
 	"v2ray.com/core/transport/internet"
 	"v2ray.com/core/transport/internet"
-	"v2ray.com/core/transport/pipe"
 )
 )
 
 
 var (
 var (
@@ -143,8 +142,8 @@ func (s *Server) Process(ctx context.Context, network net.Network, conn internet
 
 
 	var responseDoneAndCloseWriter = task.OnSuccess(response, task.Close(link.Writer))
 	var responseDoneAndCloseWriter = task.OnSuccess(response, task.Close(link.Writer))
 	if err := task.Run(ctx, request, responseDoneAndCloseWriter); err != nil {
 	if err := task.Run(ctx, request, responseDoneAndCloseWriter); err != nil {
-		pipe.CloseError(link.Reader)
-		pipe.CloseError(link.Writer)
+		common.Interrupt(link.Reader)
+		common.Interrupt(link.Writer)
 		return newError("connection ends").Base(err)
 		return newError("connection ends").Base(err)
 	}
 	}
 
 

+ 2 - 3
proxy/shadowsocks/server.go

@@ -17,7 +17,6 @@ import (
 	"v2ray.com/core/features/routing"
 	"v2ray.com/core/features/routing"
 	"v2ray.com/core/transport/internet"
 	"v2ray.com/core/transport/internet"
 	"v2ray.com/core/transport/internet/udp"
 	"v2ray.com/core/transport/internet/udp"
-	"v2ray.com/core/transport/pipe"
 )
 )
 
 
 type Server struct {
 type Server struct {
@@ -231,8 +230,8 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection,
 
 
 	var requestDoneAndCloseWriter = task.OnSuccess(requestDone, task.Close(link.Writer))
 	var requestDoneAndCloseWriter = task.OnSuccess(requestDone, task.Close(link.Writer))
 	if err := task.Run(ctx, requestDoneAndCloseWriter, responseDone); err != nil {
 	if err := task.Run(ctx, requestDoneAndCloseWriter, responseDone); err != nil {
-		pipe.CloseError(link.Reader)
-		pipe.CloseError(link.Writer)
+		common.Interrupt(link.Reader)
+		common.Interrupt(link.Writer)
 		return newError("connection ends").Base(err)
 		return newError("connection ends").Base(err)
 	}
 	}
 
 

+ 2 - 3
proxy/socks/server.go

@@ -19,7 +19,6 @@ import (
 	"v2ray.com/core/features/routing"
 	"v2ray.com/core/features/routing"
 	"v2ray.com/core/transport/internet"
 	"v2ray.com/core/transport/internet"
 	"v2ray.com/core/transport/internet/udp"
 	"v2ray.com/core/transport/internet/udp"
-	"v2ray.com/core/transport/pipe"
 )
 )
 
 
 // Server is a SOCKS 5 proxy server
 // Server is a SOCKS 5 proxy server
@@ -166,8 +165,8 @@ func (s *Server) transport(ctx context.Context, reader io.Reader, writer io.Writ
 
 
 	var requestDonePost = task.OnSuccess(requestDone, task.Close(link.Writer))
 	var requestDonePost = task.OnSuccess(requestDone, task.Close(link.Writer))
 	if err := task.Run(ctx, requestDonePost, responseDone); err != nil {
 	if err := task.Run(ctx, requestDonePost, responseDone); err != nil {
-		pipe.CloseError(link.Reader)
-		pipe.CloseError(link.Writer)
+		common.Interrupt(link.Reader)
+		common.Interrupt(link.Writer)
 		return newError("connection ends").Base(err)
 		return newError("connection ends").Base(err)
 	}
 	}
 
 

+ 2 - 3
proxy/vmess/inbound/inbound.go

@@ -26,7 +26,6 @@ import (
 	"v2ray.com/core/proxy/vmess"
 	"v2ray.com/core/proxy/vmess"
 	"v2ray.com/core/proxy/vmess/encoding"
 	"v2ray.com/core/proxy/vmess/encoding"
 	"v2ray.com/core/transport/internet"
 	"v2ray.com/core/transport/internet"
-	"v2ray.com/core/transport/pipe"
 )
 )
 
 
 type userByEmail struct {
 type userByEmail struct {
@@ -304,8 +303,8 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection i
 
 
 	var requestDonePost = task.OnSuccess(requestDone, task.Close(link.Writer))
 	var requestDonePost = task.OnSuccess(requestDone, task.Close(link.Writer))
 	if err := task.Run(ctx, requestDonePost, responseDone); err != nil {
 	if err := task.Run(ctx, requestDonePost, responseDone); err != nil {
-		pipe.CloseError(link.Reader)
-		pipe.CloseError(link.Writer)
+		common.Interrupt(link.Reader)
+		common.Interrupt(link.Writer)
 		return newError("connection ends").Base(err)
 		return newError("connection ends").Base(err)
 	}
 	}
 
 

+ 1 - 1
testing/servers/tcp/tcp.go

@@ -81,7 +81,7 @@ func (server *Server) handleConnection(conn net.Conn) {
 			}
 			}
 		}
 		}
 	}, func() error {
 	}, func() error {
-		defer pReader.CloseError()
+		defer pReader.Interrupt()
 
 
 		w := buf.NewWriter(conn)
 		w := buf.NewWriter(conn)
 		for {
 		for {

+ 2 - 1
transport/pipe/impl.go

@@ -182,7 +182,8 @@ func (p *pipe) Close() error {
 	return nil
 	return nil
 }
 }
 
 
-func (p *pipe) CloseError() {
+// Interrupt implements common.Interruptible.
+func (p *pipe) Interrupt() {
 	p.Lock()
 	p.Lock()
 	defer p.Unlock()
 	defer p.Unlock()
 
 

+ 0 - 11
transport/pipe/pipe.go

@@ -67,14 +67,3 @@ func New(opts ...Option) (*Reader, *Writer) {
 			pipe: p,
 			pipe: p,
 		}
 		}
 }
 }
-
-type closeError interface {
-	CloseError()
-}
-
-// CloseError invokes CloseError() method if the object is either Reader or Writer.
-func CloseError(v interface{}) {
-	if c, ok := v.(closeError); ok {
-		c.CloseError()
-	}
-}

+ 2 - 2
transport/pipe/pipe_test.go

@@ -32,7 +32,7 @@ func TestPipeReadWrite(t *testing.T) {
 	assert(rb.String(), Equals, "abcdefg")
 	assert(rb.String(), Equals, "abcdefg")
 }
 }
 
 
-func TestPipeCloseError(t *testing.T) {
+func TestPipeInterrupt(t *testing.T) {
 	assert := With(t)
 	assert := With(t)
 
 
 	pReader, pWriter := New(WithSizeLimit(1024))
 	pReader, pWriter := New(WithSizeLimit(1024))
@@ -40,7 +40,7 @@ func TestPipeCloseError(t *testing.T) {
 	b := buf.New()
 	b := buf.New()
 	b.Write(payload)
 	b.Write(payload)
 	assert(pWriter.WriteMultiBuffer(buf.MultiBuffer{b}), IsNil)
 	assert(pWriter.WriteMultiBuffer(buf.MultiBuffer{b}), IsNil)
-	pWriter.CloseError()
+	pWriter.Interrupt()
 
 
 	rb, err := pReader.ReadMultiBuffer()
 	rb, err := pReader.ReadMultiBuffer()
 	assert(err, Equals, io.ErrClosedPipe)
 	assert(err, Equals, io.ErrClosedPipe)

+ 3 - 3
transport/pipe/reader.go

@@ -21,7 +21,7 @@ func (r *Reader) ReadMultiBufferTimeout(d time.Duration) (buf.MultiBuffer, error
 	return r.pipe.ReadMultiBufferTimeout(d)
 	return r.pipe.ReadMultiBufferTimeout(d)
 }
 }
 
 
-// CloseError sets the pipe to error state. Both reading and writing from/to the pipe will return io.ErrClosedPipe.
-func (r *Reader) CloseError() {
-	r.pipe.CloseError()
+// Interrupt implements common.Interruptible.
+func (r *Reader) Interrupt() {
+	r.pipe.Interrupt()
 }
 }

+ 3 - 3
transport/pipe/writer.go

@@ -19,7 +19,7 @@ func (w *Writer) Close() error {
 	return w.pipe.Close()
 	return w.pipe.Close()
 }
 }
 
 
-// CloseError sets the pipe to error state. Both reading and writing from/to the pipe will return io.ErrClosedPipe.
-func (w *Writer) CloseError() {
-	w.pipe.CloseError()
+// Interrupt implements common.Interruptible.
+func (w *Writer) Interrupt() {
+	w.pipe.Interrupt()
 }
 }