Darien Raymond 7 лет назад
Родитель
Сommit
5d23604713

+ 7 - 6
app/commander/outbound.go

@@ -2,12 +2,13 @@ package commander
 
 import (
 	"context"
-	"net"
 	"sync"
 
+	"v2ray.com/core"
 	"v2ray.com/core/common"
+	"v2ray.com/core/common/net"
 	"v2ray.com/core/common/signal"
-	"v2ray.com/core/transport/ray"
+	"v2ray.com/core/transport/pipe"
 )
 
 // OutboundListener is a net.Listener for listening gRPC connections.
@@ -68,18 +69,18 @@ type Outbound struct {
 }
 
 // Dispatch implements core.OutboundHandler.
-func (co *Outbound) Dispatch(ctx context.Context, r ray.OutboundRay) {
+func (co *Outbound) Dispatch(ctx context.Context, link *core.Link) {
 	co.access.RLock()
 
 	if co.closed {
-		r.OutboundInput().CloseError()
-		r.OutboundOutput().CloseError()
+		pipe.CloseError(link.Reader)
+		pipe.CloseError(link.Writer)
 		co.access.RUnlock()
 		return
 	}
 
 	closeSignal := signal.NewNotifier()
-	c := ray.NewConnection(r.OutboundInput(), r.OutboundOutput(), ray.ConnCloseSignal(closeSignal))
+	c := net.NewConnection(net.ConnectionInputMulti(link.Writer), net.ConnectionOutputMulti(link.Reader), net.ConnectionOnClose(closeSignal))
 	co.listener.add(c)
 	co.access.RUnlock()
 	<-closeSignal.Wait()

+ 68 - 17
app/dispatcher/default.go

@@ -12,14 +12,45 @@ import (
 	"v2ray.com/core/common/buf"
 	"v2ray.com/core/common/net"
 	"v2ray.com/core/common/protocol"
+	"v2ray.com/core/common/stats"
 	"v2ray.com/core/proxy"
-	"v2ray.com/core/transport/ray"
+	"v2ray.com/core/transport/pipe"
 )
 
 var (
 	errSniffingTimeout = newError("timeout on sniffing")
 )
 
+type cachedReader struct {
+	reader *pipe.Reader
+	cache  buf.MultiBuffer
+}
+
+func (r *cachedReader) Cache(b *buf.Buffer) {
+	mb, _ := r.reader.ReadMultiBufferWithTimeout(time.Millisecond * 100)
+	if !mb.IsEmpty() {
+		r.cache.WriteMultiBuffer(mb)
+	}
+	common.Must(b.Reset(func(x []byte) (int, error) {
+		return r.cache.Copy(x), nil
+	}))
+}
+
+func (r *cachedReader) ReadMultiBuffer() (buf.MultiBuffer, error) {
+	if !r.cache.IsEmpty() {
+		mb := r.cache
+		r.cache = nil
+		return mb, nil
+	}
+
+	return r.reader.ReadMultiBuffer()
+}
+
+func (r *cachedReader) CloseError() {
+	r.cache.Release()
+	r.reader.CloseError()
+}
+
 // DefaultDispatcher is a default implementation of Dispatcher.
 type DefaultDispatcher struct {
 	ohm    core.OutboundHandlerManager
@@ -52,45 +83,64 @@ func (*DefaultDispatcher) Start() error {
 // Close implements common.Closable.
 func (*DefaultDispatcher) Close() error { return nil }
 
-func (d *DefaultDispatcher) getRayOption(user *protocol.User) []ray.Option {
-	var rayOptions []ray.Option
+func (d *DefaultDispatcher) getLink(ctx context.Context) (*core.Link, *core.Link) {
+	uplinkReader, uplinkWriter := pipe.New()
+	downlinkReader, downlinkWriter := pipe.New()
 
+	inboundLink := &core.Link{
+		Reader: downlinkReader,
+		Writer: uplinkWriter,
+	}
+
+	outboundLink := &core.Link{
+		Reader: uplinkReader,
+		Writer: downlinkWriter,
+	}
+
+	user := protocol.UserFromContext(ctx)
 	if user != nil && len(user.Email) > 0 {
 		p := d.policy.ForLevel(user.Level)
 		if p.Stats.UserUplink {
 			name := "user>>>" + user.Email + ">>>traffic>>>uplink"
 			if c, _ := core.GetOrRegisterStatCounter(d.stats, name); c != nil {
-				rayOptions = append(rayOptions, ray.WithUplinkStatCounter(c))
+				inboundLink.Writer = &stats.SizeStatWriter{
+					Counter: c,
+					Writer:  inboundLink.Writer,
+				}
 			}
 		}
 		if p.Stats.UserDownlink {
 			name := "user>>>" + user.Email + ">>>traffic>>>downlink"
 			if c, _ := core.GetOrRegisterStatCounter(d.stats, name); c != nil {
-				rayOptions = append(rayOptions, ray.WithDownlinkStatCounter(c))
+				outboundLink.Writer = &stats.SizeStatWriter{
+					Counter: c,
+					Writer:  outboundLink.Writer,
+				}
 			}
 		}
 	}
 
-	return rayOptions
+	return inboundLink, outboundLink
 }
 
 // Dispatch implements core.Dispatcher.
-func (d *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destination) (ray.InboundRay, error) {
+func (d *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destination) (*core.Link, error) {
 	if !destination.IsValid() {
 		panic("Dispatcher: Invalid destination.")
 	}
 	ctx = proxy.ContextWithTarget(ctx, destination)
 
-	user := protocol.UserFromContext(ctx)
-	rayOptions := d.getRayOption(user)
-
-	outbound := ray.New(ctx, rayOptions...)
+	inbound, outbound := d.getLink(ctx)
 	snifferList := proxyman.ProtocolSniffersFromContext(ctx)
 	if destination.Address.Family().IsDomain() || len(snifferList) == 0 {
 		go d.routedDispatch(ctx, outbound, destination)
 	} else {
 		go func() {
-			domain, err := sniffer(ctx, snifferList, outbound)
+			cReader := &cachedReader{
+				reader: outbound.Reader.(*pipe.Reader),
+			}
+			outbound.Reader = cReader
+			domain, err := sniffer(ctx, snifferList, cReader)
 			if err == nil {
 				newError("sniffed domain: ", domain).WithContext(ctx).WriteToLog()
 				destination.Address = net.ParseAddress(domain)
@@ -99,10 +149,10 @@ func (d *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destin
 			d.routedDispatch(ctx, outbound, destination)
 		}()
 	}
-	return outbound, nil
+	return inbound, nil
 }
 
-func sniffer(ctx context.Context, snifferList []proxyman.KnownProtocols, outbound ray.OutboundRay) (string, error) {
+func sniffer(ctx context.Context, snifferList []proxyman.KnownProtocols, cReader *cachedReader) (string, error) {
 	payload := buf.New()
 	defer payload.Release()
 
@@ -117,7 +167,8 @@ func sniffer(ctx context.Context, snifferList []proxyman.KnownProtocols, outboun
 			if totalAttempt > 5 {
 				return "", errSniffingTimeout
 			}
-			outbound.OutboundInput().Peek(payload)
+
+			cReader.Cache(payload)
 			if !payload.IsEmpty() {
 				domain, err := sniffer.Sniff(payload.Bytes())
 				if err != ErrMoreData {
@@ -132,7 +183,7 @@ func sniffer(ctx context.Context, snifferList []proxyman.KnownProtocols, outboun
 	}
 }
 
-func (d *DefaultDispatcher) routedDispatch(ctx context.Context, outbound ray.OutboundRay, destination net.Destination) {
+func (d *DefaultDispatcher) routedDispatch(ctx context.Context, link *core.Link, destination net.Destination) {
 	dispatcher := d.ohm.GetDefaultHandler()
 	if d.router != nil {
 		if tag, err := d.router.PickRoute(ctx); err == nil {
@@ -146,7 +197,7 @@ func (d *DefaultDispatcher) routedDispatch(ctx context.Context, outbound ray.Out
 			newError("default route for ", destination).WithContext(ctx).WriteToLog()
 		}
 	}
-	dispatcher.Dispatch(ctx, outbound)
+	dispatcher.Dispatch(ctx, link)
 }
 
 func init() {

+ 45 - 36
app/proxyman/mux/mux.go

@@ -18,7 +18,7 @@ import (
 	"v2ray.com/core/common/protocol"
 	"v2ray.com/core/common/signal"
 	"v2ray.com/core/proxy"
-	"v2ray.com/core/transport/ray"
+	"v2ray.com/core/transport/pipe"
 )
 
 const (
@@ -41,12 +41,12 @@ func NewClientManager(p proxy.Outbound, d proxy.Dialer, c *proxyman.Multiplexing
 	}
 }
 
-func (m *ClientManager) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) error {
+func (m *ClientManager) Dispatch(ctx context.Context, link *core.Link) error {
 	m.access.Lock()
 	defer m.access.Unlock()
 
 	for _, client := range m.clients {
-		if client.Dispatch(ctx, outboundRay) {
+		if client.Dispatch(ctx, link) {
 			return nil
 		}
 	}
@@ -56,7 +56,7 @@ func (m *ClientManager) Dispatch(ctx context.Context, outboundRay ray.OutboundRa
 		return newError("failed to create client").Base(err)
 	}
 	m.clients = append(m.clients, client)
-	client.Dispatch(ctx, outboundRay)
+	client.Dispatch(ctx, link)
 	return nil
 }
 
@@ -76,7 +76,7 @@ func (m *ClientManager) onClientFinish() {
 
 type Client struct {
 	sessionManager *SessionManager
-	inboundRay     ray.InboundRay
+	link           core.Link
 	done           *signal.Done
 	manager        *ClientManager
 	concurrency    uint32
@@ -89,18 +89,22 @@ var muxCoolPort = net.Port(9527)
 func NewClient(p proxy.Outbound, dialer proxy.Dialer, m *ClientManager) (*Client, error) {
 	ctx := proxy.ContextWithTarget(context.Background(), net.TCPDestination(muxCoolAddress, muxCoolPort))
 	ctx, cancel := context.WithCancel(ctx)
-	pipe := ray.New(ctx)
+	uplinkReader, upLinkWriter := pipe.New()
+	downlinkReader, downlinkWriter := pipe.New()
 
 	c := &Client{
 		sessionManager: NewSessionManager(),
-		inboundRay:     pipe,
-		done:           signal.NewDone(),
-		manager:        m,
-		concurrency:    m.config.Concurrency,
+		link: core.Link{
+			Reader: downlinkReader,
+			Writer: upLinkWriter,
+		},
+		done:        signal.NewDone(),
+		manager:     m,
+		concurrency: m.config.Concurrency,
 	}
 
 	go func() {
-		if err := p.Process(ctx, pipe, dialer); err != nil {
+		if err := p.Process(ctx, &core.Link{Reader: uplinkReader, Writer: downlinkWriter}, dialer); err != nil {
 			errors.New("failed to handler mux client connection").Base(err).WriteToLog()
 		}
 		c.done.Close()
@@ -127,8 +131,8 @@ func (m *Client) monitor() {
 		select {
 		case <-m.done.Wait():
 			m.sessionManager.Close()
-			m.inboundRay.InboundInput().Close()
-			m.inboundRay.InboundOutput().CloseError()
+			common.Close(m.link.Writer)
+			pipe.CloseError(m.link.Reader)
 			return
 		case <-timer.C:
 			size := m.sessionManager.Size()
@@ -159,7 +163,7 @@ func fetchInput(ctx context.Context, s *Session, output buf.Writer) {
 	writer.Close()
 }
 
-func (m *Client) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) bool {
+func (m *Client) Dispatch(ctx context.Context, link *core.Link) bool {
 	sm := m.sessionManager
 	if sm.Size() >= int(m.concurrency) || sm.Count() >= maxTotal {
 		return false
@@ -173,9 +177,9 @@ func (m *Client) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) bool
 	if s == nil {
 		return false
 	}
-	s.input = outboundRay.OutboundInput()
-	s.output = outboundRay.OutboundOutput()
-	go fetchInput(ctx, s, m.inboundRay.InboundInput())
+	s.input = link.Reader
+	s.output = link.Writer
+	go fetchInput(ctx, s, m.link.Writer)
 	return true
 }
 
@@ -205,7 +209,7 @@ func (m *Client) handleStatusKeep(meta *FrameMetadata, reader *buf.BufferedReade
 	if s, found := m.sessionManager.Get(meta.SessionID); found {
 		if err := buf.Copy(s.NewReader(reader), s.output); err != nil {
 			drain(reader)
-			s.input.CloseError()
+			pipe.CloseError(s.input)
 			return s.Close()
 		}
 		return nil
@@ -216,8 +220,8 @@ func (m *Client) handleStatusKeep(meta *FrameMetadata, reader *buf.BufferedReade
 func (m *Client) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader) error {
 	if s, found := m.sessionManager.Get(meta.SessionID); found {
 		if meta.Option.Has(OptionError) {
-			s.input.CloseError()
-			s.output.CloseError()
+			pipe.CloseError(s.input)
+			pipe.CloseError(s.output)
 		}
 		s.Close()
 	}
@@ -230,7 +234,7 @@ func (m *Client) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader
 func (m *Client) fetchOutput() {
 	defer m.done.Close()
 
-	reader := buf.NewBufferedReader(m.inboundRay.InboundOutput())
+	reader := buf.NewBufferedReader(m.link.Reader)
 
 	for {
 		meta, err := ReadMetadata(reader)
@@ -274,19 +278,24 @@ func NewServer(ctx context.Context) *Server {
 	return s
 }
 
-func (s *Server) Dispatch(ctx context.Context, dest net.Destination) (ray.InboundRay, error) {
+func (s *Server) Dispatch(ctx context.Context, dest net.Destination) (*core.Link, error) {
 	if dest.Address != muxCoolAddress {
 		return s.dispatcher.Dispatch(ctx, dest)
 	}
 
-	ray := ray.New(ctx)
+	uplinkReader, uplinkWriter := pipe.New()
+	downlinkReader, downlinkWriter := pipe.New()
+
 	worker := &ServerWorker{
-		dispatcher:     s.dispatcher,
-		outboundRay:    ray,
+		dispatcher: s.dispatcher,
+		link: &core.Link{
+			Reader: uplinkReader,
+			Writer: downlinkWriter,
+		},
 		sessionManager: NewSessionManager(),
 	}
 	go worker.run(ctx)
-	return ray, nil
+	return &core.Link{Reader: downlinkReader, Writer: uplinkWriter}, nil
 }
 
 func (s *Server) Start() error {
@@ -299,7 +308,7 @@ func (s *Server) Close() error {
 
 type ServerWorker struct {
 	dispatcher     core.Dispatcher
-	outboundRay    ray.OutboundRay
+	link           *core.Link
 	sessionManager *SessionManager
 }
 
@@ -334,7 +343,7 @@ func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata,
 		}
 		log.Record(msg)
 	}
-	inboundRay, err := w.dispatcher.Dispatch(ctx, meta.Target)
+	link, err := w.dispatcher.Dispatch(ctx, meta.Target)
 	if err != nil {
 		if meta.Option.Has(OptionData) {
 			drain(reader)
@@ -342,8 +351,8 @@ func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata,
 		return newError("failed to dispatch request.").Base(err)
 	}
 	s := &Session{
-		input:        inboundRay.InboundOutput(),
-		output:       inboundRay.InboundInput(),
+		input:        link.Reader,
+		output:       link.Writer,
 		parent:       w.sessionManager,
 		ID:           meta.SessionID,
 		transferType: protocol.TransferTypeStream,
@@ -352,7 +361,7 @@ func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata,
 		s.transferType = protocol.TransferTypePacket
 	}
 	w.sessionManager.Add(s)
-	go handle(ctx, s, w.outboundRay.OutboundOutput())
+	go handle(ctx, s, w.link.Writer)
 	if meta.Option.Has(OptionData) {
 		return buf.Copy(s.NewReader(reader), s.output, buf.IgnoreWriterError())
 	}
@@ -366,7 +375,7 @@ func (w *ServerWorker) handleStatusKeep(meta *FrameMetadata, reader *buf.Buffere
 	if s, found := w.sessionManager.Get(meta.SessionID); found {
 		if err := buf.Copy(s.NewReader(reader), s.output); err != nil {
 			drain(reader)
-			s.input.CloseError()
+			pipe.CloseError(s.input)
 			return s.Close()
 		}
 		return nil
@@ -377,8 +386,8 @@ func (w *ServerWorker) handleStatusKeep(meta *FrameMetadata, reader *buf.Buffere
 func (w *ServerWorker) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader) error {
 	if s, found := w.sessionManager.Get(meta.SessionID); found {
 		if meta.Option.Has(OptionError) {
-			s.input.CloseError()
-			s.output.CloseError()
+			pipe.CloseError(s.input)
+			pipe.CloseError(s.output)
 		}
 		s.Close()
 	}
@@ -414,7 +423,7 @@ func (w *ServerWorker) handleFrame(ctx context.Context, reader *buf.BufferedRead
 }
 
 func (w *ServerWorker) run(ctx context.Context) {
-	input := w.outboundRay.OutboundInput()
+	input := w.link.Reader
 	reader := buf.NewBufferedReader(input)
 
 	defer w.sessionManager.Close()
@@ -428,7 +437,7 @@ func (w *ServerWorker) run(ctx context.Context) {
 			if err != nil {
 				if errors.Cause(err) != io.EOF {
 					newError("unexpected EOF").Base(err).WithContext(ctx).WriteToLog()
-					input.CloseError()
+					pipe.CloseError(input)
 				}
 				return
 			}

+ 7 - 7
app/proxyman/mux/session.go

@@ -3,9 +3,9 @@ package mux
 import (
 	"sync"
 
+	"v2ray.com/core/common"
 	"v2ray.com/core/common/buf"
 	"v2ray.com/core/common/protocol"
-	"v2ray.com/core/transport/ray"
 )
 
 type SessionManager struct {
@@ -118,8 +118,8 @@ func (m *SessionManager) Close() error {
 	m.closed = true
 
 	for _, s := range m.sessions {
-		s.input.Close()
-		s.output.Close()
+		common.Close(s.input)
+		common.Close(s.output)
 	}
 
 	m.sessions = nil
@@ -128,8 +128,8 @@ func (m *SessionManager) Close() error {
 
 // Session represents a client connection in a Mux connection.
 type Session struct {
-	input        ray.InputStream
-	output       ray.OutputStream
+	input        buf.Reader
+	output       buf.Writer
 	parent       *SessionManager
 	ID           uint16
 	transferType protocol.TransferType
@@ -137,8 +137,8 @@ type Session struct {
 
 // Close closes all resources associated with this session.
 func (s *Session) Close() error {
-	s.output.Close()
-	s.input.Close()
+	common.Close(s.output)
+	common.Close(s.input)
 	s.parent.Remove(s.ID)
 	return nil
 }

+ 14 - 11
app/proxyman/outbound/handler.go

@@ -10,7 +10,7 @@ import (
 	"v2ray.com/core/common/net"
 	"v2ray.com/core/proxy"
 	"v2ray.com/core/transport/internet"
-	"v2ray.com/core/transport/ray"
+	"v2ray.com/core/transport/pipe"
 )
 
 type Handler struct {
@@ -74,21 +74,21 @@ func (h *Handler) Tag() string {
 }
 
 // Dispatch implements proxy.Outbound.Dispatch.
-func (h *Handler) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) {
+func (h *Handler) Dispatch(ctx context.Context, link *core.Link) {
 	if h.mux != nil {
-		if err := h.mux.Dispatch(ctx, outboundRay); err != nil {
+		if err := h.mux.Dispatch(ctx, link); err != nil {
 			newError("failed to process outbound traffic").Base(err).WithContext(ctx).WriteToLog()
-			outboundRay.OutboundOutput().CloseError()
+			pipe.CloseError(link.Writer)
 		}
 	} else {
-		if err := h.proxy.Process(ctx, outboundRay, h); err != nil {
+		if err := h.proxy.Process(ctx, link, h); err != nil {
 			// Ensure outbound ray is properly closed.
 			newError("failed to process outbound traffic").Base(err).WithContext(ctx).WriteToLog()
-			outboundRay.OutboundOutput().CloseError()
+			pipe.CloseError(link.Writer)
 		} else {
-			outboundRay.OutboundOutput().Close()
+			common.Close(link.Writer)
 		}
-		outboundRay.OutboundInput().CloseError()
+		pipe.CloseError(link.Reader)
 	}
 }
 
@@ -101,9 +101,12 @@ func (h *Handler) Dial(ctx context.Context, dest net.Destination) (internet.Conn
 			if handler != nil {
 				newError("proxying to ", tag, " for dest ", dest).AtDebug().WithContext(ctx).WriteToLog()
 				ctx = proxy.ContextWithTarget(ctx, dest)
-				stream := ray.New(ctx)
-				go handler.Dispatch(ctx, stream)
-				return ray.NewConnection(stream.InboundOutput(), stream.InboundInput()), nil
+
+				uplinkReader, uplinkWriter := pipe.New()
+				downlinkReader, downlinkWriter := pipe.New()
+
+				go handler.Dispatch(ctx, &core.Link{Reader: uplinkReader, Writer: downlinkWriter})
+				return net.NewConnection(net.ConnectionInputMulti(uplinkWriter), net.ConnectionOutputMulti(downlinkReader)), nil
 			}
 
 			newError("failed to get outbound handler with tag: ", tag).AtWarning().WithContext(ctx).WriteToLog()

+ 6 - 0
common/buf/multi_buffer.go

@@ -172,6 +172,12 @@ func (mb *MultiBuffer) Write(b []byte) (int, error) {
 	return totalBytes, nil
 }
 
+// WriteMultiBuffer implements Writer.
+func (mb *MultiBuffer) WriteMultiBuffer(b MultiBuffer) error {
+	*mb = append(*mb, b...)
+	return nil
+}
+
 // Len returns the total number of bytes in the MultiBuffer.
 func (mb MultiBuffer) Len() int32 {
 	size := int32(0)

+ 53 - 40
transport/ray/connection.go → common/net/connection.go

@@ -1,69 +1,81 @@
-package ray
+package net
 
 import (
 	"io"
 	"net"
 	"time"
 
+	"v2ray.com/core/common"
 	"v2ray.com/core/common/buf"
 	"v2ray.com/core/common/signal"
 )
 
 type ConnectionOption func(*connection)
 
-func ConnLocalAddr(addr net.Addr) ConnectionOption {
+func ConnectionLocalAddr(a net.Addr) ConnectionOption {
 	return func(c *connection) {
-		c.localAddr = addr
+		c.local = a
 	}
 }
 
-func ConnRemoteAddr(addr net.Addr) ConnectionOption {
+func ConnectionRemoteAddr(a net.Addr) ConnectionOption {
 	return func(c *connection) {
-		c.remoteAddr = addr
+		c.remote = a
 	}
 }
 
-func ConnCloseSignal(s *signal.Notifier) ConnectionOption {
+func ConnectionInput(writer io.Writer) ConnectionOption {
 	return func(c *connection) {
-		c.closeSignal = s
+		c.writer = buf.NewWriter(writer)
 	}
 }
 
-type connection struct {
-	input       InputStream
-	output      OutputStream
-	closed      bool
-	localAddr   net.Addr
-	remoteAddr  net.Addr
-	closeSignal *signal.Notifier
+func ConnectionInputMulti(writer buf.Writer) ConnectionOption {
+	return func(c *connection) {
+		c.writer = writer
+	}
+}
 
-	reader *buf.BufferedReader
+func ConnectionOutput(reader io.Reader) ConnectionOption {
+	return func(c *connection) {
+		c.reader = buf.NewBufferedReader(buf.NewReader(reader))
+	}
+}
+
+func ConnectionOutputMulti(reader buf.Reader) ConnectionOption {
+	return func(c *connection) {
+		c.reader = buf.NewBufferedReader(reader)
+	}
 }
 
-var zeroAddr net.Addr = &net.TCPAddr{IP: []byte{0, 0, 0, 0}}
+func ConnectionOnClose(s *signal.Notifier) ConnectionOption {
+	return func(c *connection) {
+		c.onClose = s
+	}
+}
 
-// NewConnection wraps a Ray into net.Conn.
-func NewConnection(input InputStream, output OutputStream, options ...ConnectionOption) net.Conn {
+func NewConnection(opts ...ConnectionOption) net.Conn {
 	c := &connection{
-		input:      input,
-		output:     output,
-		localAddr:  zeroAddr,
-		remoteAddr: zeroAddr,
-		reader:     buf.NewBufferedReader(input),
+		done: signal.NewDone(),
 	}
 
-	for _, opt := range options {
+	for _, opt := range opts {
 		opt(c)
 	}
 
 	return c
 }
 
-// Read implements net.Conn.Read().
+type connection struct {
+	reader  *buf.BufferedReader
+	writer  buf.Writer
+	done    *signal.Done
+	onClose *signal.Notifier
+	local   Addr
+	remote  Addr
+}
+
 func (c *connection) Read(b []byte) (int, error) {
-	if c.closed {
-		return 0, io.EOF
-	}
 	return c.reader.Read(b)
 }
 
@@ -74,43 +86,44 @@ func (c *connection) ReadMultiBuffer() (buf.MultiBuffer, error) {
 
 // Write implements net.Conn.Write().
 func (c *connection) Write(b []byte) (int, error) {
-	if c.closed {
+	if c.done.Done() {
 		return 0, io.ErrClosedPipe
 	}
 
 	l := len(b)
 	mb := buf.NewMultiBufferCap(int32(l)/buf.Size + 1)
-	mb.Write(b)
-	return l, c.output.WriteMultiBuffer(mb)
+	common.Must2(mb.Write(b))
+	return l, c.writer.WriteMultiBuffer(mb)
 }
 
 func (c *connection) WriteMultiBuffer(mb buf.MultiBuffer) error {
-	if c.closed {
+	if c.done.Done() {
 		return io.ErrClosedPipe
 	}
 
-	return c.output.WriteMultiBuffer(mb)
+	return c.writer.WriteMultiBuffer(mb)
 }
 
 // Close implements net.Conn.Close().
 func (c *connection) Close() error {
-	c.closed = true
-	c.output.Close()
-	c.input.CloseError()
-	if c.closeSignal != nil {
-		c.closeSignal.Signal()
+	common.Must(c.done.Close())
+	common.Close(c.reader)
+	common.Close(c.writer)
+	if c.onClose != nil {
+		c.onClose.Signal()
 	}
+
 	return nil
 }
 
 // LocalAddr implements net.Conn.LocalAddr().
 func (c *connection) LocalAddr() net.Addr {
-	return c.localAddr
+	return c.local
 }
 
 // RemoteAddr implements net.Conn.RemoteAddr().
 func (c *connection) RemoteAddr() net.Addr {
-	return c.remoteAddr
+	return c.remote
 }
 
 // SetDeadline implements net.Conn.SetDeadline().

+ 26 - 0
common/stats/io.go

@@ -0,0 +1,26 @@
+package stats
+
+import (
+	"v2ray.com/core"
+	"v2ray.com/core/common"
+	"v2ray.com/core/common/buf"
+	"v2ray.com/core/transport/pipe"
+)
+
+type SizeStatWriter struct {
+	Counter core.StatCounter
+	Writer  buf.Writer
+}
+
+func (w *SizeStatWriter) WriteMultiBuffer(mb buf.MultiBuffer) error {
+	w.Counter.Add(int64(mb.Len()))
+	return w.Writer.WriteMultiBuffer(mb)
+}
+
+func (w *SizeStatWriter) Close() error {
+	return common.Close(w.Writer)
+}
+
+func (w *SizeStatWriter) CloseError() {
+	pipe.CloseError(w.Writer)
+}

+ 1 - 2
dial.go

@@ -4,7 +4,6 @@ import (
 	"context"
 
 	"v2ray.com/core/common/net"
-	"v2ray.com/core/transport/ray"
 )
 
 // Dial provides an easy way for upstream caller to create net.Conn through V2Ray.
@@ -16,5 +15,5 @@ func Dial(ctx context.Context, v *Instance, dest net.Destination) (net.Conn, err
 	if err != nil {
 		return nil, err
 	}
-	return ray.NewConnection(r.InboundOutput(), r.InboundInput()), nil
+	return net.NewConnection(net.ConnectionInputMulti(r.Writer), net.ConnectionOutputMulti(r.Reader)), nil
 }

+ 1 - 2
network.go

@@ -6,7 +6,6 @@ import (
 
 	"v2ray.com/core/common"
 	"v2ray.com/core/common/net"
-	"v2ray.com/core/transport/ray"
 )
 
 // InboundHandler is the interface for handlers that process inbound connections.
@@ -23,7 +22,7 @@ type InboundHandler interface {
 type OutboundHandler interface {
 	common.Runnable
 	Tag() string
-	Dispatch(ctx context.Context, outboundRay ray.OutboundRay)
+	Dispatch(ctx context.Context, link *Link)
 }
 
 // InboundHandlerManager is a feature that manages InboundHandlers.

+ 5 - 4
proxy/blackhole/blackhole.go

@@ -7,9 +7,10 @@ import (
 	"context"
 	"time"
 
+	"v2ray.com/core"
 	"v2ray.com/core/common"
 	"v2ray.com/core/proxy"
-	"v2ray.com/core/transport/ray"
+	"v2ray.com/core/transport/pipe"
 )
 
 // Handler is an outbound connection that silently swallow the entire payload.
@@ -29,11 +30,11 @@ func New(ctx context.Context, config *Config) (*Handler, error) {
 }
 
 // Process implements OutboundHandler.Dispatch().
-func (h *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dialer proxy.Dialer) error {
-	h.response.WriteTo(outboundRay.OutboundOutput())
+func (h *Handler) Process(ctx context.Context, link *core.Link, dialer proxy.Dialer) error {
+	h.response.WriteTo(link.Writer)
 	// Sleep a little here to make sure the response is sent to client.
 	time.Sleep(time.Second)
-	outboundRay.OutboundOutput().CloseError()
+	pipe.CloseError(link.Writer)
 	return nil
 }
 

+ 7 - 6
proxy/dokodemo/dokodemo.go

@@ -14,6 +14,7 @@ import (
 	"v2ray.com/core/proxy"
 	"v2ray.com/core/transport/internet"
 	"v2ray.com/core/transport/internet/udp"
+	"v2ray.com/core/transport/pipe"
 )
 
 type DokodemoDoor struct {
@@ -70,18 +71,18 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in
 	ctx, cancel := context.WithCancel(ctx)
 	timer := signal.CancelAfterInactivity(ctx, cancel, d.policy().Timeouts.ConnectionIdle)
 
-	inboundRay, err := dispatcher.Dispatch(ctx, dest)
+	link, err := dispatcher.Dispatch(ctx, dest)
 	if err != nil {
 		return newError("failed to dispatch request").Base(err)
 	}
 
 	requestDone := func() error {
-		defer inboundRay.InboundInput().Close()
+		defer common.Close(link.Writer)
 		defer timer.SetTimeout(d.policy().Timeouts.DownlinkOnly)
 
 		chunkReader := buf.NewReader(conn)
 
-		if err := buf.Copy(chunkReader, inboundRay.InboundInput(), buf.UpdateActivity(timer)); err != nil {
+		if err := buf.Copy(chunkReader, link.Writer, buf.UpdateActivity(timer)); err != nil {
 			return newError("failed to transport request").Base(err)
 		}
 
@@ -108,7 +109,7 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in
 			}
 		}
 
-		if err := buf.Copy(inboundRay.InboundOutput(), writer, buf.UpdateActivity(timer)); err != nil {
+		if err := buf.Copy(link.Reader, writer, buf.UpdateActivity(timer)); err != nil {
 			return newError("failed to transport response").Base(err)
 		}
 
@@ -116,8 +117,8 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in
 	}
 
 	if err := signal.ExecuteParallel(ctx, requestDone, responseDone); err != nil {
-		inboundRay.InboundInput().CloseError()
-		inboundRay.InboundOutput().CloseError()
+		pipe.CloseError(link.Reader)
+		pipe.CloseError(link.Writer)
 		return newError("connection ends").Base(err)
 	}
 

+ 3 - 6
proxy/freedom/freedom.go

@@ -15,7 +15,6 @@ import (
 	"v2ray.com/core/common/signal"
 	"v2ray.com/core/proxy"
 	"v2ray.com/core/transport/internet"
-	"v2ray.com/core/transport/ray"
 )
 
 // Handler handles Freedom connections.
@@ -65,7 +64,7 @@ func (h *Handler) resolveIP(ctx context.Context, domain string) net.Address {
 }
 
 // Process implements proxy.Outbound.
-func (h *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dialer proxy.Dialer) error {
+func (h *Handler) Process(ctx context.Context, link *core.Link, dialer proxy.Dialer) error {
 	destination, _ := proxy.TargetFromContext(ctx)
 	if h.config.DestinationOverride != nil {
 		server := h.config.DestinationOverride.Server
@@ -77,8 +76,8 @@ func (h *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial
 	}
 	newError("opening connection to ", destination).WithContext(ctx).WriteToLog()
 
-	input := outboundRay.OutboundInput()
-	output := outboundRay.OutboundOutput()
+	input := link.Reader
+	output := link.Writer
 
 	if h.config.DomainStrategy == Config_USE_IP && destination.Address.Family().IsDomain() {
 		ip := h.resolveIP(ctx, destination.Address.Domain())
@@ -137,8 +136,6 @@ func (h *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial
 	}
 
 	if err := signal.ExecuteParallel(ctx, requestDone, responseDone); err != nil {
-		input.CloseError()
-		output.CloseError()
 		return newError("connection ends").Base(err)
 	}
 

+ 15 - 15
proxy/http/server.go

@@ -10,6 +10,8 @@ import (
 	"strings"
 	"time"
 
+	"v2ray.com/core/transport/pipe"
+
 	"v2ray.com/core"
 	"v2ray.com/core/common"
 	"v2ray.com/core/common/buf"
@@ -166,7 +168,7 @@ func (s *Server) handleConnect(ctx context.Context, request *http.Request, reade
 
 	ctx, cancel := context.WithCancel(ctx)
 	timer := signal.CancelAfterInactivity(ctx, cancel, s.policy().Timeouts.ConnectionIdle)
-	ray, err := dispatcher.Dispatch(ctx, dest)
+	link, err := dispatcher.Dispatch(ctx, dest)
 	if err != nil {
 		return err
 	}
@@ -176,25 +178,25 @@ func (s *Server) handleConnect(ctx context.Context, request *http.Request, reade
 		if err != nil {
 			return err
 		}
-		if err := ray.InboundInput().WriteMultiBuffer(payload); err != nil {
+		if err := link.Writer.WriteMultiBuffer(payload); err != nil {
 			return err
 		}
 		reader = nil
 	}
 
 	requestDone := func() error {
-		defer ray.InboundInput().Close()
+		defer common.Close(link.Writer)
 		defer timer.SetTimeout(s.policy().Timeouts.DownlinkOnly)
 
 		v2reader := buf.NewReader(conn)
-		return buf.Copy(v2reader, ray.InboundInput(), buf.UpdateActivity(timer))
+		return buf.Copy(v2reader, link.Writer, buf.UpdateActivity(timer))
 	}
 
 	responseDone := func() error {
 		defer timer.SetTimeout(s.policy().Timeouts.UplinkOnly)
 
 		v2writer := buf.NewWriter(conn)
-		if err := buf.Copy(ray.InboundOutput(), v2writer, buf.UpdateActivity(timer)); err != nil {
+		if err := buf.Copy(link.Reader, v2writer, buf.UpdateActivity(timer)); err != nil {
 			return err
 		}
 
@@ -202,8 +204,8 @@ func (s *Server) handleConnect(ctx context.Context, request *http.Request, reade
 	}
 
 	if err := signal.ExecuteParallel(ctx, requestDone, responseDone); err != nil {
-		ray.InboundInput().CloseError()
-		ray.InboundOutput().CloseError()
+		pipe.CloseError(link.Reader)
+		pipe.CloseError(link.Writer)
 		return newError("connection ends").Base(err)
 	}
 
@@ -241,20 +243,18 @@ func (s *Server) handlePlainHTTP(ctx context.Context, request *http.Request, wri
 		request.Header.Set("User-Agent", "")
 	}
 
-	ray, err := dispatcher.Dispatch(ctx, dest)
+	link, err := dispatcher.Dispatch(ctx, dest)
 	if err != nil {
 		return err
 	}
-	input := ray.InboundInput()
-	output := ray.InboundOutput()
-	defer input.Close()
 
 	var result error = errWaitAnother
 
 	requestDone := func() error {
+		defer common.Close(link.Writer)
 		request.Header.Set("Connection", "close")
 
-		requestWriter := buf.NewBufferedWriter(ray.InboundInput())
+		requestWriter := buf.NewBufferedWriter(link.Writer)
 		common.Must(requestWriter.SetBuffered(false))
 		if err := request.Write(requestWriter); err != nil {
 			return newError("failed to write whole request").Base(err).AtWarning()
@@ -263,7 +263,7 @@ func (s *Server) handlePlainHTTP(ctx context.Context, request *http.Request, wri
 	}
 
 	responseDone := func() error {
-		responseReader := bufio.NewReaderSize(buf.NewBufferedReader(ray.InboundOutput()), buf.Size)
+		responseReader := bufio.NewReaderSize(buf.NewBufferedReader(link.Reader), buf.Size)
 		response, err := http.ReadResponse(responseReader, request)
 		if err == nil {
 			http_proto.RemoveHopByHopHeaders(response.Header)
@@ -299,8 +299,8 @@ func (s *Server) handlePlainHTTP(ctx context.Context, request *http.Request, wri
 	}
 
 	if err := signal.ExecuteParallel(ctx, requestDone, responseDone); err != nil {
-		input.CloseError()
-		output.CloseError()
+		pipe.CloseError(link.Reader)
+		pipe.CloseError(link.Writer)
 		return newError("connection ends").Base(err)
 	}
 

+ 1 - 2
proxy/proxy.go

@@ -12,7 +12,6 @@ import (
 	"v2ray.com/core/common/net"
 	"v2ray.com/core/common/protocol"
 	"v2ray.com/core/transport/internet"
-	"v2ray.com/core/transport/ray"
 )
 
 // An Inbound processes inbound connections.
@@ -27,7 +26,7 @@ type Inbound interface {
 // An Outbound process outbound connections.
 type Outbound interface {
 	// Process processes the given connection. The given dialer may be used to dial a system outbound connection.
-	Process(context.Context, ray.OutboundRay, Dialer) error
+	Process(context.Context, *core.Link, Dialer) error
 }
 
 // Dialer is used by OutboundHandler for creating outbound connections.

+ 5 - 6
proxy/shadowsocks/client.go

@@ -12,7 +12,6 @@ import (
 	"v2ray.com/core/common/signal"
 	"v2ray.com/core/proxy"
 	"v2ray.com/core/transport/internet"
-	"v2ray.com/core/transport/ray"
 )
 
 // Client is a inbound handler for Shadowsocks protocol
@@ -38,7 +37,7 @@ func NewClient(ctx context.Context, config *ClientConfig) (*Client, error) {
 }
 
 // Process implements OutboundHandler.Process().
-func (c *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, dialer proxy.Dialer) error {
+func (c *Client) Process(ctx context.Context, link *core.Link, dialer proxy.Dialer) error {
 	destination, ok := proxy.TargetFromContext(ctx)
 	if !ok {
 		return newError("target not specified")
@@ -107,7 +106,7 @@ func (c *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale
 
 		requestDone := func() error {
 			defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly)
-			return buf.Copy(outboundRay.OutboundInput(), bodyWriter, buf.UpdateActivity(timer))
+			return buf.Copy(link.Reader, bodyWriter, buf.UpdateActivity(timer))
 		}
 
 		responseDone := func() error {
@@ -118,7 +117,7 @@ func (c *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale
 				return err
 			}
 
-			return buf.Copy(responseReader, outboundRay.OutboundOutput(), buf.UpdateActivity(timer))
+			return buf.Copy(responseReader, link.Writer, buf.UpdateActivity(timer))
 		}
 
 		if err := signal.ExecuteParallel(ctx, requestDone, responseDone); err != nil {
@@ -138,7 +137,7 @@ func (c *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale
 		requestDone := func() error {
 			defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly)
 
-			if err := buf.Copy(outboundRay.OutboundInput(), writer, buf.UpdateActivity(timer)); err != nil {
+			if err := buf.Copy(link.Reader, writer, buf.UpdateActivity(timer)); err != nil {
 				return newError("failed to transport all UDP request").Base(err)
 			}
 			return nil
@@ -152,7 +151,7 @@ func (c *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale
 				User:   user,
 			}
 
-			if err := buf.Copy(reader, outboundRay.OutboundOutput(), buf.UpdateActivity(timer)); err != nil {
+			if err := buf.Copy(reader, link.Writer, buf.UpdateActivity(timer)); err != nil {
 				return newError("failed to transport all UDP response").Base(err)
 			}
 			return nil

+ 8 - 7
proxy/shadowsocks/server.go

@@ -14,6 +14,7 @@ import (
 	"v2ray.com/core/proxy"
 	"v2ray.com/core/transport/internet"
 	"v2ray.com/core/transport/internet/udp"
+	"v2ray.com/core/transport/pipe"
 )
 
 type Server struct {
@@ -167,7 +168,7 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection,
 
 	ctx, cancel := context.WithCancel(ctx)
 	timer := signal.CancelAfterInactivity(ctx, cancel, sessionPolicy.Timeouts.ConnectionIdle)
-	ray, err := dispatcher.Dispatch(ctx, dest)
+	link, err := dispatcher.Dispatch(ctx, dest)
 	if err != nil {
 		return err
 	}
@@ -182,7 +183,7 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection,
 		}
 
 		{
-			payload, err := ray.InboundOutput().ReadMultiBuffer()
+			payload, err := link.Reader.ReadMultiBuffer()
 			if err != nil {
 				return err
 			}
@@ -195,7 +196,7 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection,
 			return err
 		}
 
-		if err := buf.Copy(ray.InboundOutput(), responseWriter, buf.UpdateActivity(timer)); err != nil {
+		if err := buf.Copy(link.Reader, responseWriter, buf.UpdateActivity(timer)); err != nil {
 			return newError("failed to transport all TCP response").Base(err)
 		}
 
@@ -204,9 +205,9 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection,
 
 	requestDone := func() error {
 		defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly)
-		defer ray.InboundInput().Close()
+		defer common.Close(link.Writer)
 
-		if err := buf.Copy(bodyReader, ray.InboundInput(), buf.UpdateActivity(timer)); err != nil {
+		if err := buf.Copy(bodyReader, link.Writer, buf.UpdateActivity(timer)); err != nil {
 			return newError("failed to transport all TCP request").Base(err)
 		}
 
@@ -214,8 +215,8 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection,
 	}
 
 	if err := signal.ExecuteParallel(ctx, requestDone, responseDone); err != nil {
-		ray.InboundInput().CloseError()
-		ray.InboundOutput().CloseError()
+		pipe.CloseError(link.Reader)
+		pipe.CloseError(link.Writer)
 		return newError("connection ends").Base(err)
 	}
 

+ 5 - 6
proxy/socks/client.go

@@ -13,7 +13,6 @@ import (
 	"v2ray.com/core/common/signal"
 	"v2ray.com/core/proxy"
 	"v2ray.com/core/transport/internet"
-	"v2ray.com/core/transport/ray"
 )
 
 // Client is a Socks5 client.
@@ -40,7 +39,7 @@ func NewClient(ctx context.Context, config *ClientConfig) (*Client, error) {
 }
 
 // Process implements proxy.Outbound.Process.
-func (c *Client) Process(ctx context.Context, ray ray.OutboundRay, dialer proxy.Dialer) error {
+func (c *Client) Process(ctx context.Context, link *core.Link, dialer proxy.Dialer) error {
 	destination, ok := proxy.TargetFromContext(ctx)
 	if !ok {
 		return newError("target not specified.")
@@ -107,11 +106,11 @@ func (c *Client) Process(ctx context.Context, ray ray.OutboundRay, dialer proxy.
 	if request.Command == protocol.RequestCommandTCP {
 		requestFunc = func() error {
 			defer timer.SetTimeout(p.Timeouts.DownlinkOnly)
-			return buf.Copy(ray.OutboundInput(), buf.NewWriter(conn), buf.UpdateActivity(timer))
+			return buf.Copy(link.Reader, buf.NewWriter(conn), buf.UpdateActivity(timer))
 		}
 		responseFunc = func() error {
 			defer timer.SetTimeout(p.Timeouts.UplinkOnly)
-			return buf.Copy(buf.NewReader(conn), ray.OutboundOutput(), buf.UpdateActivity(timer))
+			return buf.Copy(buf.NewReader(conn), link.Writer, buf.UpdateActivity(timer))
 		}
 	} else if request.Command == protocol.RequestCommandUDP {
 		udpConn, err := dialer.Dial(ctx, udpRequest.Destination())
@@ -121,12 +120,12 @@ func (c *Client) Process(ctx context.Context, ray ray.OutboundRay, dialer proxy.
 		defer udpConn.Close()
 		requestFunc = func() error {
 			defer timer.SetTimeout(p.Timeouts.DownlinkOnly)
-			return buf.Copy(ray.OutboundInput(), buf.NewSequentialWriter(NewUDPWriter(request, udpConn)), buf.UpdateActivity(timer))
+			return buf.Copy(link.Reader, buf.NewSequentialWriter(NewUDPWriter(request, udpConn)), buf.UpdateActivity(timer))
 		}
 		responseFunc = func() error {
 			defer timer.SetTimeout(p.Timeouts.UplinkOnly)
 			reader := &UDPReader{reader: udpConn}
-			return buf.Copy(reader, ray.OutboundOutput(), buf.UpdateActivity(timer))
+			return buf.Copy(reader, link.Writer, buf.UpdateActivity(timer))
 		}
 	}
 

+ 7 - 9
proxy/socks/server.go

@@ -15,6 +15,7 @@ import (
 	"v2ray.com/core/proxy"
 	"v2ray.com/core/transport/internet"
 	"v2ray.com/core/transport/internet/udp"
+	"v2ray.com/core/transport/pipe"
 )
 
 // Server is a SOCKS 5 proxy server
@@ -129,20 +130,17 @@ func (s *Server) transport(ctx context.Context, reader io.Reader, writer io.Writ
 	ctx, cancel := context.WithCancel(ctx)
 	timer := signal.CancelAfterInactivity(ctx, cancel, s.policy().Timeouts.ConnectionIdle)
 
-	ray, err := dispatcher.Dispatch(ctx, dest)
+	link, err := dispatcher.Dispatch(ctx, dest)
 	if err != nil {
 		return err
 	}
 
-	input := ray.InboundInput()
-	output := ray.InboundOutput()
-
 	requestDone := func() error {
 		defer timer.SetTimeout(s.policy().Timeouts.DownlinkOnly)
-		defer input.Close()
+		defer common.Close(link.Writer)
 
 		v2reader := buf.NewReader(reader)
-		if err := buf.Copy(v2reader, input, buf.UpdateActivity(timer)); err != nil {
+		if err := buf.Copy(v2reader, link.Writer, buf.UpdateActivity(timer)); err != nil {
 			return newError("failed to transport all TCP request").Base(err)
 		}
 
@@ -153,7 +151,7 @@ func (s *Server) transport(ctx context.Context, reader io.Reader, writer io.Writ
 		defer timer.SetTimeout(s.policy().Timeouts.UplinkOnly)
 
 		v2writer := buf.NewWriter(writer)
-		if err := buf.Copy(output, v2writer, buf.UpdateActivity(timer)); err != nil {
+		if err := buf.Copy(link.Reader, v2writer, buf.UpdateActivity(timer)); err != nil {
 			return newError("failed to transport all TCP response").Base(err)
 		}
 
@@ -161,8 +159,8 @@ func (s *Server) transport(ctx context.Context, reader io.Reader, writer io.Writ
 	}
 
 	if err := signal.ExecuteParallel(ctx, requestDone, responseDone); err != nil {
-		input.CloseError()
-		output.CloseError()
+		pipe.CloseError(link.Reader)
+		pipe.CloseError(link.Writer)
 		return newError("connection ends").Base(err)
 	}
 

+ 8 - 11
proxy/vmess/inbound/inbound.go

@@ -22,7 +22,7 @@ import (
 	"v2ray.com/core/proxy/vmess"
 	"v2ray.com/core/proxy/vmess/encoding"
 	"v2ray.com/core/transport/internet"
-	"v2ray.com/core/transport/ray"
+	"v2ray.com/core/transport/pipe"
 )
 
 type userByEmail struct {
@@ -167,8 +167,8 @@ func (h *Handler) RemoveUser(ctx context.Context, email string) error {
 	return nil
 }
 
-func transferRequest(timer signal.ActivityUpdater, session *encoding.ServerSession, request *protocol.RequestHeader, input io.Reader, output ray.OutputStream) error {
-	defer output.Close()
+func transferRequest(timer signal.ActivityUpdater, session *encoding.ServerSession, request *protocol.RequestHeader, input io.Reader, output buf.Writer) error {
+	defer common.Close(output)
 
 	bodyReader := session.DecodeRequestBody(request, input)
 	if err := buf.Copy(bodyReader, output, buf.UpdateActivity(timer)); err != nil {
@@ -272,17 +272,14 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection i
 
 	ctx, cancel := context.WithCancel(ctx)
 	timer := signal.CancelAfterInactivity(ctx, cancel, sessionPolicy.Timeouts.ConnectionIdle)
-	ray, err := dispatcher.Dispatch(ctx, request.Destination())
+	link, err := dispatcher.Dispatch(ctx, request.Destination())
 	if err != nil {
 		return newError("failed to dispatch request to ", request.Destination()).Base(err)
 	}
 
-	input := ray.InboundInput()
-	output := ray.InboundOutput()
-
 	requestDone := func() error {
 		defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly)
-		return transferRequest(timer, session, request, reader, input)
+		return transferRequest(timer, session, request, reader, link.Writer)
 	}
 
 	responseDone := func() error {
@@ -293,12 +290,12 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection i
 		response := &protocol.ResponseHeader{
 			Command: h.generateCommand(ctx, request),
 		}
-		return transferResponse(timer, session, request, response, output, writer)
+		return transferResponse(timer, session, request, response, link.Reader, writer)
 	}
 
 	if err := signal.ExecuteParallel(ctx, requestDone, responseDone); err != nil {
-		input.CloseError()
-		output.CloseError()
+		pipe.CloseError(link.Reader)
+		pipe.CloseError(link.Writer)
 		return newError("connection ends").Base(err)
 	}
 

+ 7 - 6
proxy/vmess/outbound/outbound.go

@@ -6,6 +6,8 @@ import (
 	"context"
 	"time"
 
+	"v2ray.com/core/transport/pipe"
+
 	"v2ray.com/core"
 	"v2ray.com/core/common"
 	"v2ray.com/core/common/buf"
@@ -17,7 +19,6 @@ import (
 	"v2ray.com/core/proxy/vmess"
 	"v2ray.com/core/proxy/vmess/encoding"
 	"v2ray.com/core/transport/internet"
-	"v2ray.com/core/transport/ray"
 )
 
 // Handler is an outbound connection handler for VMess protocol.
@@ -42,7 +43,7 @@ func New(ctx context.Context, config *Config) (*Handler, error) {
 }
 
 // Process implements proxy.Outbound.Process().
-func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dialer proxy.Dialer) error {
+func (v *Handler) Process(ctx context.Context, link *core.Link, dialer proxy.Dialer) error {
 	var rec *protocol.ServerSpec
 	var conn internet.Connection
 
@@ -95,8 +96,8 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial
 		request.Option.Set(protocol.RequestOptionChunkMasking)
 	}
 
-	input := outboundRay.OutboundInput()
-	output := outboundRay.OutboundOutput()
+	input := link.Reader
+	output := link.Writer
 
 	session := encoding.NewClientSession(protocol.DefaultIDHash)
 	sessionPolicy := v.v.PolicyManager().ForLevel(request.User.Level)
@@ -113,8 +114,8 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial
 		}
 
 		bodyWriter := session.EncodeRequestBody(request, writer)
-		{
-			firstPayload, err := input.ReadTimeout(time.Millisecond * 500)
+		if tReader, ok := input.(*pipe.Reader); ok {
+			firstPayload, err := tReader.ReadMultiBufferWithTimeout(time.Millisecond * 500)
 			if err != nil && err != buf.ErrReadTimeout {
 				return newError("failed to get first payload").Base(err)
 			}

+ 9 - 3
router.go

@@ -5,18 +5,24 @@ import (
 	"sync"
 
 	"v2ray.com/core/common"
+	"v2ray.com/core/common/buf"
 	"v2ray.com/core/common/errors"
 	"v2ray.com/core/common/net"
-	"v2ray.com/core/transport/ray"
 )
 
+// Link is a utility for connecting between an inbound and an outbound proxy handler.
+type Link struct {
+	Reader buf.Reader
+	Writer buf.Writer
+}
+
 // Dispatcher is a feature that dispatches inbound requests to outbound handlers based on rules.
 // Dispatcher is required to be registered in a V2Ray instance to make V2Ray function properly.
 type Dispatcher interface {
 	Feature
 
 	// Dispatch returns a Ray for transporting data for the given request.
-	Dispatch(ctx context.Context, dest net.Destination) (ray.InboundRay, error)
+	Dispatch(ctx context.Context, dest net.Destination) (*Link, error)
 }
 
 type syncDispatcher struct {
@@ -24,7 +30,7 @@ type syncDispatcher struct {
 	Dispatcher
 }
 
-func (d *syncDispatcher) Dispatch(ctx context.Context, dest net.Destination) (ray.InboundRay, error) {
+func (d *syncDispatcher) Dispatch(ctx context.Context, dest net.Destination) (*Link, error) {
 	d.RLock()
 	defer d.RUnlock()
 

+ 12 - 12
transport/internet/udp/dispatcher.go

@@ -6,18 +6,18 @@ import (
 	"time"
 
 	"v2ray.com/core"
+	"v2ray.com/core/common"
 	"v2ray.com/core/common/buf"
 	"v2ray.com/core/common/net"
 	"v2ray.com/core/common/signal"
-	"v2ray.com/core/transport/ray"
 )
 
 type ResponseCallback func(payload *buf.Buffer)
 
 type connEntry struct {
-	inbound ray.InboundRay
-	timer   signal.ActivityUpdater
-	cancel  context.CancelFunc
+	link   *core.Link
+	timer  signal.ActivityUpdater
+	cancel context.CancelFunc
 }
 
 type Dispatcher struct {
@@ -37,8 +37,8 @@ func (v *Dispatcher) RemoveRay(dest net.Destination) {
 	v.Lock()
 	defer v.Unlock()
 	if conn, found := v.conns[dest]; found {
-		conn.inbound.InboundInput().Close()
-		conn.inbound.InboundOutput().Close()
+		common.Close(conn.link.Reader)
+		common.Close(conn.link.Writer)
 		delete(v.conns, dest)
 	}
 }
@@ -59,11 +59,11 @@ func (v *Dispatcher) getInboundRay(dest net.Destination, callback ResponseCallba
 		v.RemoveRay(dest)
 	}
 	timer := signal.CancelAfterInactivity(ctx, removeRay, time.Second*4)
-	inboundRay, _ := v.dispatcher.Dispatch(ctx, dest)
+	link, _ := v.dispatcher.Dispatch(ctx, dest)
 	entry := &connEntry{
-		inbound: inboundRay,
-		timer:   timer,
-		cancel:  removeRay,
+		link:   link,
+		timer:  timer,
+		cancel: removeRay,
 	}
 	v.conns[dest] = entry
 	go handleInput(ctx, entry, callback)
@@ -75,7 +75,7 @@ func (v *Dispatcher) Dispatch(ctx context.Context, destination net.Destination,
 	newError("dispatch request to: ", destination).AtDebug().WithContext(ctx).WriteToLog()
 
 	conn := v.getInboundRay(destination, callback)
-	outputStream := conn.inbound.InboundInput()
+	outputStream := conn.link.Writer
 	if outputStream != nil {
 		if err := outputStream.WriteMultiBuffer(buf.NewMultiBufferValue(payload)); err != nil {
 			newError("failed to write first UDP payload").Base(err).WithContext(ctx).WriteToLog()
@@ -86,7 +86,7 @@ func (v *Dispatcher) Dispatch(ctx context.Context, destination net.Destination,
 }
 
 func handleInput(ctx context.Context, conn *connEntry, callback ResponseCallback) {
-	input := conn.inbound.InboundOutput()
+	input := conn.link.Reader
 	timer := conn.timer
 
 	for {

+ 11 - 8
transport/internet/udp/dispatcher_test.go

@@ -6,18 +6,19 @@ import (
 	"testing"
 	"time"
 
+	"v2ray.com/core"
 	"v2ray.com/core/common/buf"
 	"v2ray.com/core/common/net"
 	. "v2ray.com/core/transport/internet/udp"
-	"v2ray.com/core/transport/ray"
+	"v2ray.com/core/transport/pipe"
 	. "v2ray.com/ext/assert"
 )
 
 type TestDispatcher struct {
-	OnDispatch func(ctx context.Context, dest net.Destination) (ray.InboundRay, error)
+	OnDispatch func(ctx context.Context, dest net.Destination) (*core.Link, error)
 }
 
-func (d *TestDispatcher) Dispatch(ctx context.Context, dest net.Destination) (ray.InboundRay, error) {
+func (d *TestDispatcher) Dispatch(ctx context.Context, dest net.Destination) (*core.Link, error) {
 	return d.OnDispatch(ctx, dest)
 }
 
@@ -33,23 +34,25 @@ func TestSameDestinationDispatching(t *testing.T) {
 	assert := With(t)
 
 	ctx, cancel := context.WithCancel(context.Background())
-	link := ray.New(ctx)
+	uplinkReader, uplinkWriter := pipe.New()
+	downlinkReader, downlinkWriter := pipe.New()
+
 	go func() {
 		for {
-			data, err := link.OutboundInput().ReadMultiBuffer()
+			data, err := uplinkReader.ReadMultiBuffer()
 			if err != nil {
 				break
 			}
-			err = link.OutboundOutput().WriteMultiBuffer(data)
+			err = downlinkWriter.WriteMultiBuffer(data)
 			assert(err, IsNil)
 		}
 	}()
 
 	var count uint32
 	td := &TestDispatcher{
-		OnDispatch: func(ctx context.Context, dest net.Destination) (ray.InboundRay, error) {
+		OnDispatch: func(ctx context.Context, dest net.Destination) (*core.Link, error) {
 			atomic.AddUint32(&count, 1)
-			return link, nil
+			return &core.Link{Reader: downlinkReader, Writer: uplinkWriter}, nil
 		},
 	}
 	dest := net.UDPDestination(net.LocalHostIP, 53)

+ 5 - 4
transport/pipe/impl.go

@@ -6,7 +6,6 @@ import (
 	"time"
 
 	"v2ray.com/core/common/buf"
-	"v2ray.com/core/common/errors"
 	"v2ray.com/core/common/signal"
 )
 
@@ -70,8 +69,6 @@ func (p *pipe) ReadMultiBuffer() (buf.MultiBuffer, error) {
 	}
 }
 
-var ErrTimeout = errors.New("Timeout on reading pipeline.")
-
 func (p *pipe) ReadMultiBufferWithTimeout(d time.Duration) (buf.MultiBuffer, error) {
 	timer := time.After(d)
 	for {
@@ -84,7 +81,7 @@ func (p *pipe) ReadMultiBufferWithTimeout(d time.Duration) (buf.MultiBuffer, err
 		select {
 		case <-p.readSignal.Wait():
 		case <-timer:
-			return nil, ErrTimeout
+			return nil, buf.ErrReadTimeout
 		}
 	}
 }
@@ -120,6 +117,10 @@ func (p *pipe) Close() error {
 	p.Lock()
 	defer p.Unlock()
 
+	if p.state == closed || p.state == errord {
+		return nil
+	}
+
 	p.state = closed
 	p.readSignal.Signal()
 	p.writeSignal.Signal()

+ 10 - 0
transport/pipe/pipe.go

@@ -37,6 +37,16 @@ func New(opts ...Option) (*Reader, *Writer) {
 		}
 }
 
+type closeError interface {
+	CloseError()
+}
+
+func CloseError(v interface{}) {
+	if c, ok := v.(closeError); ok {
+		c.CloseError()
+	}
+}
+
 var defaultLimit int32 = 10 * 1024 * 1024
 
 func init() {

+ 0 - 268
transport/ray/direct.go

@@ -1,268 +0,0 @@
-package ray
-
-import (
-	"context"
-	"io"
-	"sync"
-	"time"
-
-	"v2ray.com/core/common"
-	"v2ray.com/core/common/buf"
-	"v2ray.com/core/common/platform"
-	"v2ray.com/core/common/signal"
-)
-
-type Option func(*directRay)
-
-type addInt64 interface {
-	Add(int64) int64
-}
-
-func WithUplinkStatCounter(c addInt64) Option {
-	return func(s *directRay) {
-		s.Input.onDataSize = append(s.Input.onDataSize, func(delta uint64) {
-			c.Add(int64(delta))
-		})
-	}
-}
-
-func WithDownlinkStatCounter(c addInt64) Option {
-	return func(s *directRay) {
-		s.Output.onDataSize = append(s.Output.onDataSize, func(delta uint64) {
-			c.Add(int64(delta))
-		})
-	}
-}
-
-// New creates a new Ray for direct traffic transport.
-func New(ctx context.Context, opts ...Option) Ray {
-	r := &directRay{
-		Input:  NewStream(ctx),
-		Output: NewStream(ctx),
-	}
-	for _, opt := range opts {
-		opt(r)
-	}
-	return r
-}
-
-type directRay struct {
-	Input  *Stream
-	Output *Stream
-}
-
-func (v *directRay) OutboundInput() InputStream {
-	return v.Input
-}
-
-func (v *directRay) OutboundOutput() OutputStream {
-	return v.Output
-}
-
-func (v *directRay) InboundInput() OutputStream {
-	return v.Input
-}
-
-func (v *directRay) InboundOutput() InputStream {
-	return v.Output
-}
-
-var streamSizeLimit uint64 = 10 * 1024 * 1024
-
-func init() {
-	const raySizeEnvKey = "v2ray.ray.buffer.size"
-	size := platform.EnvFlag{
-		Name:    raySizeEnvKey,
-		AltName: platform.NormalizeEnvName(raySizeEnvKey),
-	}.GetValueAsInt(10)
-	streamSizeLimit = uint64(size) * 1024 * 1024
-}
-
-// Stream is a sequential container for data in bytes.
-type Stream struct {
-	access      sync.RWMutex
-	data        buf.MultiBuffer
-	size        uint64
-	ctx         context.Context
-	readSignal  *signal.Notifier
-	writeSignal *signal.Notifier
-	onDataSize  []func(uint64)
-	close       bool
-	err         bool
-}
-
-// NewStream creates a new Stream.
-func NewStream(ctx context.Context) *Stream {
-	s := &Stream{
-		ctx:         ctx,
-		readSignal:  signal.NewNotifier(),
-		writeSignal: signal.NewNotifier(),
-		size:        0,
-	}
-	return s
-}
-
-func (s *Stream) getData() (buf.MultiBuffer, error) {
-	s.access.Lock()
-	defer s.access.Unlock()
-
-	if s.data != nil {
-		mb := s.data
-		s.data = nil
-		s.size = 0
-		return mb, nil
-	}
-
-	if s.err {
-		return nil, io.ErrClosedPipe
-	}
-
-	if s.close {
-		return nil, io.EOF
-	}
-
-	return nil, nil
-}
-
-// Peek fills in the given buffer with data from head of the Stream.
-func (s *Stream) Peek(b *buf.Buffer) {
-	s.access.RLock()
-	defer s.access.RUnlock()
-
-	common.Must(b.Reset(func(data []byte) (int, error) {
-		return s.data.Copy(data), nil
-	}))
-}
-
-// ReadMultiBuffer reads data from the Stream.
-func (s *Stream) ReadMultiBuffer() (buf.MultiBuffer, error) {
-	for {
-		mb, err := s.getData()
-		if err != nil {
-			return nil, err
-		}
-
-		if mb != nil {
-			s.readSignal.Signal()
-			return mb, nil
-		}
-
-		select {
-		case <-s.ctx.Done():
-			return nil, s.ctx.Err()
-		case <-s.writeSignal.Wait():
-		}
-	}
-}
-
-// ReadTimeout reads from the Stream with a specified timeout.
-func (s *Stream) ReadTimeout(timeout time.Duration) (buf.MultiBuffer, error) {
-	for {
-		mb, err := s.getData()
-		if err != nil {
-			return nil, err
-		}
-
-		if mb != nil {
-			s.readSignal.Signal()
-			return mb, nil
-		}
-
-		select {
-		case <-s.ctx.Done():
-			return nil, s.ctx.Err()
-		case <-time.After(timeout):
-			return nil, buf.ErrReadTimeout
-		case <-s.writeSignal.Wait():
-		}
-	}
-}
-
-// Size returns the number of bytes hold in the Stream.
-func (s *Stream) Size() uint64 {
-	s.access.RLock()
-	defer s.access.RUnlock()
-
-	return s.size
-}
-
-// waitForStreamSize waits until the Stream has room for more data, or any error happens.
-func (s *Stream) waitForStreamSize() error {
-	if streamSizeLimit == 0 {
-		return nil
-	}
-
-	for s.Size() >= streamSizeLimit {
-		select {
-		case <-s.ctx.Done():
-			return s.ctx.Err()
-		case <-s.readSignal.Wait():
-			if s.err || s.close {
-				return io.ErrClosedPipe
-			}
-		}
-	}
-
-	return nil
-}
-
-// WriteMultiBuffer writes more data into the Stream.
-func (s *Stream) WriteMultiBuffer(data buf.MultiBuffer) error {
-	if data.IsEmpty() {
-		return nil
-	}
-
-	if err := s.waitForStreamSize(); err != nil {
-		data.Release()
-		return err
-	}
-
-	s.access.Lock()
-	defer s.access.Unlock()
-
-	if s.err || s.close {
-		data.Release()
-		return io.ErrClosedPipe
-	}
-
-	if s.data == nil {
-		s.data = buf.NewMultiBufferCap(128)
-	}
-
-	dataSize := uint64(data.Len())
-	for _, f := range s.onDataSize {
-		f(dataSize)
-	}
-
-	s.data.AppendMulti(data)
-	s.size += dataSize
-	s.writeSignal.Signal()
-
-	return nil
-}
-
-// Close closes the stream for writing. Read() still works until EOF.
-func (s *Stream) Close() error {
-	s.access.Lock()
-	s.close = true
-	s.readSignal.Signal()
-	s.writeSignal.Signal()
-	s.access.Unlock()
-	return nil
-}
-
-// CloseError closes the Stream with error. Read() will return an error afterwards.
-func (s *Stream) CloseError() {
-	s.access.Lock()
-	s.err = true
-	if s.data != nil {
-		s.data.Release()
-		s.data = nil
-		s.size = 0
-	}
-	s.access.Unlock()
-
-	s.readSignal.Signal()
-	s.writeSignal.Signal()
-
-}

+ 0 - 49
transport/ray/direct_test.go

@@ -1,49 +0,0 @@
-package ray_test
-
-import (
-	"context"
-	"io"
-	"testing"
-
-	"v2ray.com/core/common/buf"
-	. "v2ray.com/core/transport/ray"
-	. "v2ray.com/ext/assert"
-)
-
-func TestStreamIO(t *testing.T) {
-	assert := With(t)
-
-	stream := NewStream(context.Background())
-	b1 := buf.New()
-	b1.AppendBytes('a')
-	assert(stream.WriteMultiBuffer(buf.NewMultiBufferValue(b1)), IsNil)
-
-	_, err := stream.ReadMultiBuffer()
-	assert(err, IsNil)
-
-	stream.Close()
-	_, err = stream.ReadMultiBuffer()
-	assert(err, Equals, io.EOF)
-
-	b2 := buf.New()
-	b2.AppendBytes('b')
-	err = stream.WriteMultiBuffer(buf.NewMultiBufferValue(b2))
-	assert(err, Equals, io.ErrClosedPipe)
-}
-
-func TestStreamClose(t *testing.T) {
-	assert := With(t)
-
-	stream := NewStream(context.Background())
-	b1 := buf.New()
-	b1.AppendBytes('a')
-	assert(stream.WriteMultiBuffer(buf.NewMultiBufferValue(b1)), IsNil)
-
-	stream.Close()
-
-	_, err := stream.ReadMultiBuffer()
-	assert(err, IsNil)
-
-	_, err = stream.ReadMultiBuffer()
-	assert(err, Equals, io.EOF)
-}

+ 0 - 54
transport/ray/ray.go

@@ -1,54 +0,0 @@
-package ray
-
-import (
-	"v2ray.com/core/common"
-	"v2ray.com/core/common/buf"
-)
-
-// OutboundRay is a transport interface for outbound connections.
-type OutboundRay interface {
-	// OutboundInput provides a stream for the input of the outbound connection.
-	// The outbound connection shall write all the input until it is closed.
-	OutboundInput() InputStream
-
-	// OutboundOutput provides a stream to retrieve the response from the
-	// outbound connection. The outbound connection shall close the channel
-	// after all responses are received and put into the channel.
-	OutboundOutput() OutputStream
-}
-
-// InboundRay is a transport interface for inbound connections.
-type InboundRay interface {
-	// InboundInput provides a stream to retrieve the request from client.
-	// The inbound connection shall close the channel after the entire request
-	// is received and put into the channel.
-	InboundInput() OutputStream
-
-	// InboundOutput provides a stream of data for the inbound connection to write
-	// as response. The inbound connection shall write all the data from the
-	// channel until it is closed.
-	InboundOutput() InputStream
-}
-
-// Ray is an internal transport channel between inbound and outbound connection.
-type Ray interface {
-	InboundRay
-	OutboundRay
-}
-
-type RayStream interface {
-	common.Closable
-	CloseError()
-}
-
-type InputStream interface {
-	buf.Reader
-	buf.TimeoutReader
-	RayStream
-	Peek(*buf.Buffer)
-}
-
-type OutputStream interface {
-	buf.Writer
-	RayStream
-}