Browse Source

refactor mux

Darien Raymond 7 years ago
parent
commit
04bbdfc426
3 changed files with 506 additions and 491 deletions
  1. 290 0
      common/mux/client.go
  2. 0 491
      common/mux/mux.go
  3. 216 0
      common/mux/server.go

+ 290 - 0
common/mux/client.go

@@ -0,0 +1,290 @@
+package mux
+
+import (
+	"context"
+	"io"
+	"sync"
+	"time"
+
+	"v2ray.com/core/common"
+	"v2ray.com/core/common/buf"
+	"v2ray.com/core/common/errors"
+	"v2ray.com/core/common/net"
+	"v2ray.com/core/common/protocol"
+	"v2ray.com/core/common/session"
+	"v2ray.com/core/common/signal/done"
+	"v2ray.com/core/common/vio"
+	"v2ray.com/core/proxy"
+	"v2ray.com/core/transport/internet"
+	"v2ray.com/core/transport/pipe"
+)
+
+type ClientManager struct {
+	access      sync.Mutex
+	clients     []*Client
+	proxy       proxy.Outbound
+	dialer      internet.Dialer
+	concurrency uint32
+}
+
+func NewClientManager(p proxy.Outbound, d internet.Dialer, c uint32) *ClientManager {
+	return &ClientManager{
+		proxy:       p,
+		dialer:      d,
+		concurrency: c,
+	}
+}
+
+func (m *ClientManager) Dispatch(ctx context.Context, link *vio.Link) error {
+	m.access.Lock()
+	defer m.access.Unlock()
+
+	for _, client := range m.clients {
+		if client.Dispatch(ctx, link) {
+			return nil
+		}
+	}
+
+	client, err := NewClient(ctx, m.proxy, m.dialer, m)
+	if err != nil {
+		return newError("failed to create client").Base(err)
+	}
+	m.clients = append(m.clients, client)
+	client.Dispatch(ctx, link)
+	return nil
+}
+
+func (m *ClientManager) onClientFinish() {
+	m.access.Lock()
+	defer m.access.Unlock()
+
+	activeClients := make([]*Client, 0, len(m.clients))
+
+	for _, client := range m.clients {
+		if !client.Closed() {
+			activeClients = append(activeClients, client)
+		}
+	}
+	m.clients = activeClients
+}
+
+type Client struct {
+	sessionManager *SessionManager
+	link           vio.Link
+	done           *done.Instance
+	manager        *ClientManager
+	concurrency    uint32
+}
+
+var muxCoolAddress = net.DomainAddress("v1.mux.cool")
+var muxCoolPort = net.Port(9527)
+
+// NewClient creates a new mux.Client.
+func NewClient(pctx context.Context, p proxy.Outbound, dialer internet.Dialer, m *ClientManager) (*Client, error) {
+	ctx := session.ContextWithOutbound(context.Background(), &session.Outbound{
+		Target: net.TCPDestination(muxCoolAddress, muxCoolPort),
+	})
+	ctx, cancel := context.WithCancel(ctx)
+
+	opts := pipe.OptionsFromContext(pctx)
+	uplinkReader, upLinkWriter := pipe.New(opts...)
+	downlinkReader, downlinkWriter := pipe.New(opts...)
+
+	c := &Client{
+		sessionManager: NewSessionManager(),
+		link: vio.Link{
+			Reader: downlinkReader,
+			Writer: upLinkWriter,
+		},
+		done:        done.New(),
+		manager:     m,
+		concurrency: m.concurrency,
+	}
+
+	go func() {
+		if err := p.Process(ctx, &vio.Link{Reader: uplinkReader, Writer: downlinkWriter}, dialer); err != nil {
+			errors.New("failed to handler mux client connection").Base(err).WriteToLog()
+		}
+		common.Must(c.done.Close())
+		cancel()
+	}()
+
+	go c.fetchOutput()
+	go c.monitor()
+	return c, nil
+}
+
+// Closed returns true if this Client is closed.
+func (m *Client) Closed() bool {
+	return m.done.Done()
+}
+
+func (m *Client) monitor() {
+	defer m.manager.onClientFinish()
+
+	timer := time.NewTicker(time.Second * 16)
+	defer timer.Stop()
+
+	for {
+		select {
+		case <-m.done.Wait():
+			m.sessionManager.Close()
+			common.Close(m.link.Writer)    // nolint: errcheck
+			pipe.CloseError(m.link.Reader) // nolint: errcheck
+			return
+		case <-timer.C:
+			size := m.sessionManager.Size()
+			if size == 0 && m.sessionManager.CloseIfNoSession() {
+				common.Must(m.done.Close())
+			}
+		}
+	}
+}
+
+func writeFirstPayload(reader buf.Reader, writer *Writer) error {
+	err := buf.CopyOnceTimeout(reader, writer, time.Millisecond*100)
+	if err == buf.ErrNotTimeoutReader || err == buf.ErrReadTimeout {
+		return writer.WriteMultiBuffer(buf.MultiBuffer{})
+	}
+
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func fetchInput(ctx context.Context, s *Session, output buf.Writer) {
+	dest := session.OutboundFromContext(ctx).Target
+	transferType := protocol.TransferTypeStream
+	if dest.Network == net.Network_UDP {
+		transferType = protocol.TransferTypePacket
+	}
+	s.transferType = transferType
+	writer := NewWriter(s.ID, dest, output, transferType)
+	defer s.Close()      // nolint: errcheck
+	defer writer.Close() // nolint: errcheck
+
+	newError("dispatching request to ", dest).WriteToLog(session.ExportIDToError(ctx))
+	if err := writeFirstPayload(s.input, writer); err != nil {
+		newError("failed to write first payload").Base(err).WriteToLog(session.ExportIDToError(ctx))
+		writer.hasError = true
+		pipe.CloseError(s.input)
+		return
+	}
+
+	if err := buf.Copy(s.input, writer); err != nil {
+		newError("failed to fetch all input").Base(err).WriteToLog(session.ExportIDToError(ctx))
+		writer.hasError = true
+		pipe.CloseError(s.input)
+		return
+	}
+}
+
+func (m *Client) Dispatch(ctx context.Context, link *vio.Link) bool {
+	sm := m.sessionManager
+	if sm.Size() >= int(m.concurrency) || sm.Count() >= maxTotal {
+		return false
+	}
+
+	if m.done.Done() {
+		return false
+	}
+
+	s := sm.Allocate()
+	if s == nil {
+		return false
+	}
+	s.input = link.Reader
+	s.output = link.Writer
+	go fetchInput(ctx, s, m.link.Writer)
+	return true
+}
+
+func drain(reader buf.Reader) error {
+	return buf.Copy(reader, buf.Discard)
+}
+
+func (m *Client) handleStatueKeepAlive(meta *FrameMetadata, reader *buf.BufferedReader) error {
+	if meta.Option.Has(OptionData) {
+		return drain(NewStreamReader(reader))
+	}
+	return nil
+}
+
+func (m *Client) handleStatusNew(meta *FrameMetadata, reader *buf.BufferedReader) error {
+	if meta.Option.Has(OptionData) {
+		return drain(NewStreamReader(reader))
+	}
+	return nil
+}
+
+func (m *Client) handleStatusKeep(meta *FrameMetadata, reader *buf.BufferedReader) error {
+	if !meta.Option.Has(OptionData) {
+		return nil
+	}
+
+	if s, found := m.sessionManager.Get(meta.SessionID); found {
+		rr := s.NewReader(reader)
+		if err := buf.Copy(rr, s.output); err != nil {
+			drain(rr)
+			pipe.CloseError(s.input)
+			return s.Close()
+		}
+		return nil
+	}
+	return drain(NewStreamReader(reader))
+}
+
+func (m *Client) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader) error {
+	if s, found := m.sessionManager.Get(meta.SessionID); found {
+		if meta.Option.Has(OptionError) {
+			pipe.CloseError(s.input)
+			pipe.CloseError(s.output)
+		}
+		s.Close()
+	}
+	if meta.Option.Has(OptionData) {
+		return drain(NewStreamReader(reader))
+	}
+	return nil
+}
+
+func (m *Client) fetchOutput() {
+	defer func() {
+		common.Must(m.done.Close())
+	}()
+
+	reader := &buf.BufferedReader{Reader: m.link.Reader}
+
+	var meta FrameMetadata
+	for {
+		err := meta.Unmarshal(reader)
+		if err != nil {
+			if errors.Cause(err) != io.EOF {
+				newError("failed to read metadata").Base(err).WriteToLog()
+			}
+			break
+		}
+
+		switch meta.SessionStatus {
+		case SessionStatusKeepAlive:
+			err = m.handleStatueKeepAlive(&meta, reader)
+		case SessionStatusEnd:
+			err = m.handleStatusEnd(&meta, reader)
+		case SessionStatusNew:
+			err = m.handleStatusNew(&meta, reader)
+		case SessionStatusKeep:
+			err = m.handleStatusKeep(&meta, reader)
+		default:
+			status := meta.SessionStatus
+			newError("unknown status: ", status).AtError().WriteToLog()
+			return
+		}
+
+		if err != nil {
+			newError("failed to process data").Base(err).WriteToLog()
+			return
+		}
+	}
+}

+ 0 - 491
common/mux/mux.go

@@ -2,497 +2,6 @@ package mux
 
 //go:generate errorgen
 
-import (
-	"context"
-	"io"
-	"sync"
-	"time"
-
-	"v2ray.com/core"
-	"v2ray.com/core/common"
-	"v2ray.com/core/common/buf"
-	"v2ray.com/core/common/errors"
-	"v2ray.com/core/common/log"
-	"v2ray.com/core/common/net"
-	"v2ray.com/core/common/protocol"
-	"v2ray.com/core/common/session"
-	"v2ray.com/core/common/signal/done"
-	"v2ray.com/core/common/vio"
-	"v2ray.com/core/features/routing"
-	"v2ray.com/core/proxy"
-	"v2ray.com/core/transport/internet"
-	"v2ray.com/core/transport/pipe"
-)
-
 const (
 	maxTotal = 128
 )
-
-type ClientManager struct {
-	access      sync.Mutex
-	clients     []*Client
-	proxy       proxy.Outbound
-	dialer      internet.Dialer
-	concurrency uint32
-}
-
-func NewClientManager(p proxy.Outbound, d internet.Dialer, c uint32) *ClientManager {
-	return &ClientManager{
-		proxy:       p,
-		dialer:      d,
-		concurrency: c,
-	}
-}
-
-func (m *ClientManager) Dispatch(ctx context.Context, link *vio.Link) error {
-	m.access.Lock()
-	defer m.access.Unlock()
-
-	for _, client := range m.clients {
-		if client.Dispatch(ctx, link) {
-			return nil
-		}
-	}
-
-	client, err := NewClient(ctx, m.proxy, m.dialer, m)
-	if err != nil {
-		return newError("failed to create client").Base(err)
-	}
-	m.clients = append(m.clients, client)
-	client.Dispatch(ctx, link)
-	return nil
-}
-
-func (m *ClientManager) onClientFinish() {
-	m.access.Lock()
-	defer m.access.Unlock()
-
-	activeClients := make([]*Client, 0, len(m.clients))
-
-	for _, client := range m.clients {
-		if !client.Closed() {
-			activeClients = append(activeClients, client)
-		}
-	}
-	m.clients = activeClients
-}
-
-type Client struct {
-	sessionManager *SessionManager
-	link           vio.Link
-	done           *done.Instance
-	manager        *ClientManager
-	concurrency    uint32
-}
-
-var muxCoolAddress = net.DomainAddress("v1.mux.cool")
-var muxCoolPort = net.Port(9527)
-
-// NewClient creates a new mux.Client.
-func NewClient(pctx context.Context, p proxy.Outbound, dialer internet.Dialer, m *ClientManager) (*Client, error) {
-	ctx := session.ContextWithOutbound(context.Background(), &session.Outbound{
-		Target: net.TCPDestination(muxCoolAddress, muxCoolPort),
-	})
-	ctx, cancel := context.WithCancel(ctx)
-
-	opts := pipe.OptionsFromContext(pctx)
-	uplinkReader, upLinkWriter := pipe.New(opts...)
-	downlinkReader, downlinkWriter := pipe.New(opts...)
-
-	c := &Client{
-		sessionManager: NewSessionManager(),
-		link: vio.Link{
-			Reader: downlinkReader,
-			Writer: upLinkWriter,
-		},
-		done:        done.New(),
-		manager:     m,
-		concurrency: m.concurrency,
-	}
-
-	go func() {
-		if err := p.Process(ctx, &vio.Link{Reader: uplinkReader, Writer: downlinkWriter}, dialer); err != nil {
-			errors.New("failed to handler mux client connection").Base(err).WriteToLog()
-		}
-		common.Must(c.done.Close())
-		cancel()
-	}()
-
-	go c.fetchOutput()
-	go c.monitor()
-	return c, nil
-}
-
-// Closed returns true if this Client is closed.
-func (m *Client) Closed() bool {
-	return m.done.Done()
-}
-
-func (m *Client) monitor() {
-	defer m.manager.onClientFinish()
-
-	timer := time.NewTicker(time.Second * 16)
-	defer timer.Stop()
-
-	for {
-		select {
-		case <-m.done.Wait():
-			m.sessionManager.Close()
-			common.Close(m.link.Writer)    // nolint: errcheck
-			pipe.CloseError(m.link.Reader) // nolint: errcheck
-			return
-		case <-timer.C:
-			size := m.sessionManager.Size()
-			if size == 0 && m.sessionManager.CloseIfNoSession() {
-				common.Must(m.done.Close())
-			}
-		}
-	}
-}
-
-func writeFirstPayload(reader buf.Reader, writer *Writer) error {
-	err := buf.CopyOnceTimeout(reader, writer, time.Millisecond*100)
-	if err == buf.ErrNotTimeoutReader || err == buf.ErrReadTimeout {
-		return writer.WriteMultiBuffer(buf.MultiBuffer{})
-	}
-
-	if err != nil {
-		return err
-	}
-
-	return nil
-}
-
-func fetchInput(ctx context.Context, s *Session, output buf.Writer) {
-	dest := session.OutboundFromContext(ctx).Target
-	transferType := protocol.TransferTypeStream
-	if dest.Network == net.Network_UDP {
-		transferType = protocol.TransferTypePacket
-	}
-	s.transferType = transferType
-	writer := NewWriter(s.ID, dest, output, transferType)
-	defer s.Close()      // nolint: errcheck
-	defer writer.Close() // nolint: errcheck
-
-	newError("dispatching request to ", dest).WriteToLog(session.ExportIDToError(ctx))
-	if err := writeFirstPayload(s.input, writer); err != nil {
-		newError("failed to write first payload").Base(err).WriteToLog(session.ExportIDToError(ctx))
-		writer.hasError = true
-		pipe.CloseError(s.input)
-		return
-	}
-
-	if err := buf.Copy(s.input, writer); err != nil {
-		newError("failed to fetch all input").Base(err).WriteToLog(session.ExportIDToError(ctx))
-		writer.hasError = true
-		pipe.CloseError(s.input)
-		return
-	}
-}
-
-func (m *Client) Dispatch(ctx context.Context, link *vio.Link) bool {
-	sm := m.sessionManager
-	if sm.Size() >= int(m.concurrency) || sm.Count() >= maxTotal {
-		return false
-	}
-
-	if m.done.Done() {
-		return false
-	}
-
-	s := sm.Allocate()
-	if s == nil {
-		return false
-	}
-	s.input = link.Reader
-	s.output = link.Writer
-	go fetchInput(ctx, s, m.link.Writer)
-	return true
-}
-
-func drain(reader buf.Reader) error {
-	return buf.Copy(reader, buf.Discard)
-}
-
-func (m *Client) handleStatueKeepAlive(meta *FrameMetadata, reader *buf.BufferedReader) error {
-	if meta.Option.Has(OptionData) {
-		return drain(NewStreamReader(reader))
-	}
-	return nil
-}
-
-func (m *Client) handleStatusNew(meta *FrameMetadata, reader *buf.BufferedReader) error {
-	if meta.Option.Has(OptionData) {
-		return drain(NewStreamReader(reader))
-	}
-	return nil
-}
-
-func (m *Client) handleStatusKeep(meta *FrameMetadata, reader *buf.BufferedReader) error {
-	if !meta.Option.Has(OptionData) {
-		return nil
-	}
-
-	if s, found := m.sessionManager.Get(meta.SessionID); found {
-		rr := s.NewReader(reader)
-		if err := buf.Copy(rr, s.output); err != nil {
-			drain(rr)
-			pipe.CloseError(s.input)
-			return s.Close()
-		}
-		return nil
-	}
-	return drain(NewStreamReader(reader))
-}
-
-func (m *Client) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader) error {
-	if s, found := m.sessionManager.Get(meta.SessionID); found {
-		if meta.Option.Has(OptionError) {
-			pipe.CloseError(s.input)
-			pipe.CloseError(s.output)
-		}
-		s.Close()
-	}
-	if meta.Option.Has(OptionData) {
-		return drain(NewStreamReader(reader))
-	}
-	return nil
-}
-
-func (m *Client) fetchOutput() {
-	defer func() {
-		common.Must(m.done.Close())
-	}()
-
-	reader := &buf.BufferedReader{Reader: m.link.Reader}
-
-	var meta FrameMetadata
-	for {
-		err := meta.Unmarshal(reader)
-		if err != nil {
-			if errors.Cause(err) != io.EOF {
-				newError("failed to read metadata").Base(err).WriteToLog()
-			}
-			break
-		}
-
-		switch meta.SessionStatus {
-		case SessionStatusKeepAlive:
-			err = m.handleStatueKeepAlive(&meta, reader)
-		case SessionStatusEnd:
-			err = m.handleStatusEnd(&meta, reader)
-		case SessionStatusNew:
-			err = m.handleStatusNew(&meta, reader)
-		case SessionStatusKeep:
-			err = m.handleStatusKeep(&meta, reader)
-		default:
-			status := meta.SessionStatus
-			newError("unknown status: ", status).AtError().WriteToLog()
-			return
-		}
-
-		if err != nil {
-			newError("failed to process data").Base(err).WriteToLog()
-			return
-		}
-	}
-}
-
-type Server struct {
-	dispatcher routing.Dispatcher
-}
-
-// NewServer creates a new mux.Server.
-func NewServer(ctx context.Context) *Server {
-	s := &Server{}
-	core.RequireFeatures(ctx, func(d routing.Dispatcher) {
-		s.dispatcher = d
-	})
-	return s
-}
-
-// Type implements common.HasType.
-func (s *Server) Type() interface{} {
-	return s.dispatcher.Type()
-}
-
-// Dispatch impliments routing.Dispatcher
-func (s *Server) Dispatch(ctx context.Context, dest net.Destination) (*vio.Link, error) {
-	if dest.Address != muxCoolAddress {
-		return s.dispatcher.Dispatch(ctx, dest)
-	}
-
-	opts := pipe.OptionsFromContext(ctx)
-	uplinkReader, uplinkWriter := pipe.New(opts...)
-	downlinkReader, downlinkWriter := pipe.New(opts...)
-
-	worker := &ServerWorker{
-		dispatcher: s.dispatcher,
-		link: &vio.Link{
-			Reader: uplinkReader,
-			Writer: downlinkWriter,
-		},
-		sessionManager: NewSessionManager(),
-	}
-	go worker.run(ctx)
-	return &vio.Link{Reader: downlinkReader, Writer: uplinkWriter}, nil
-}
-
-// Start implements common.Runnable.
-func (s *Server) Start() error {
-	return nil
-}
-
-// Close implements common.Closable.
-func (s *Server) Close() error {
-	return nil
-}
-
-type ServerWorker struct {
-	dispatcher     routing.Dispatcher
-	link           *vio.Link
-	sessionManager *SessionManager
-}
-
-func handle(ctx context.Context, s *Session, output buf.Writer) {
-	writer := NewResponseWriter(s.ID, output, s.transferType)
-	if err := buf.Copy(s.input, writer); err != nil {
-		newError("session ", s.ID, " ends.").Base(err).WriteToLog(session.ExportIDToError(ctx))
-		writer.hasError = true
-	}
-
-	writer.Close()
-	s.Close()
-}
-
-func (w *ServerWorker) handleStatusKeepAlive(meta *FrameMetadata, reader *buf.BufferedReader) error {
-	if meta.Option.Has(OptionData) {
-		return drain(NewStreamReader(reader))
-	}
-	return nil
-}
-
-func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata, reader *buf.BufferedReader) error {
-	newError("received request for ", meta.Target).WriteToLog(session.ExportIDToError(ctx))
-	{
-		msg := &log.AccessMessage{
-			To:     meta.Target,
-			Status: log.AccessAccepted,
-			Reason: "",
-		}
-		if inbound := session.InboundFromContext(ctx); inbound != nil && inbound.Source.IsValid() {
-			msg.From = inbound.Source
-		}
-		log.Record(msg)
-	}
-	link, err := w.dispatcher.Dispatch(ctx, meta.Target)
-	if err != nil {
-		if meta.Option.Has(OptionData) {
-			drain(NewStreamReader(reader))
-		}
-		return newError("failed to dispatch request.").Base(err)
-	}
-	s := &Session{
-		input:        link.Reader,
-		output:       link.Writer,
-		parent:       w.sessionManager,
-		ID:           meta.SessionID,
-		transferType: protocol.TransferTypeStream,
-	}
-	if meta.Target.Network == net.Network_UDP {
-		s.transferType = protocol.TransferTypePacket
-	}
-	w.sessionManager.Add(s)
-	go handle(ctx, s, w.link.Writer)
-	if !meta.Option.Has(OptionData) {
-		return nil
-	}
-
-	rr := s.NewReader(reader)
-	if err := buf.Copy(rr, s.output); err != nil {
-		drain(rr)
-		pipe.CloseError(s.input)
-		return s.Close()
-	}
-	return nil
-}
-
-func (w *ServerWorker) handleStatusKeep(meta *FrameMetadata, reader *buf.BufferedReader) error {
-	if !meta.Option.Has(OptionData) {
-		return nil
-	}
-	if s, found := w.sessionManager.Get(meta.SessionID); found {
-		rr := s.NewReader(reader)
-		if err := buf.Copy(rr, s.output); err != nil {
-			drain(rr)
-			pipe.CloseError(s.input)
-			return s.Close()
-		}
-		return nil
-	}
-	return drain(NewStreamReader(reader))
-}
-
-func (w *ServerWorker) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader) error {
-	if s, found := w.sessionManager.Get(meta.SessionID); found {
-		if meta.Option.Has(OptionError) {
-			pipe.CloseError(s.input)
-			pipe.CloseError(s.output)
-		}
-		s.Close()
-	}
-	if meta.Option.Has(OptionData) {
-		return drain(NewStreamReader(reader))
-	}
-	return nil
-}
-
-func (w *ServerWorker) handleFrame(ctx context.Context, reader *buf.BufferedReader) error {
-	var meta FrameMetadata
-	err := meta.Unmarshal(reader)
-	if err != nil {
-		return newError("failed to read metadata").Base(err)
-	}
-
-	switch meta.SessionStatus {
-	case SessionStatusKeepAlive:
-		err = w.handleStatusKeepAlive(&meta, reader)
-	case SessionStatusEnd:
-		err = w.handleStatusEnd(&meta, reader)
-	case SessionStatusNew:
-		err = w.handleStatusNew(ctx, &meta, reader)
-	case SessionStatusKeep:
-		err = w.handleStatusKeep(&meta, reader)
-	default:
-		status := meta.SessionStatus
-		return newError("unknown status: ", status).AtError()
-	}
-
-	if err != nil {
-		return newError("failed to process data").Base(err)
-	}
-	return nil
-}
-
-func (w *ServerWorker) run(ctx context.Context) {
-	input := w.link.Reader
-	reader := &buf.BufferedReader{Reader: input}
-
-	defer w.sessionManager.Close() // nolint: errcheck
-
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		default:
-			err := w.handleFrame(ctx, reader)
-			if err != nil {
-				if errors.Cause(err) != io.EOF {
-					newError("unexpected EOF").Base(err).WriteToLog(session.ExportIDToError(ctx))
-					pipe.CloseError(input)
-				}
-				return
-			}
-		}
-	}
-}

+ 216 - 0
common/mux/server.go

@@ -0,0 +1,216 @@
+package mux
+
+import (
+	"context"
+	"io"
+
+	"v2ray.com/core"
+	"v2ray.com/core/common/buf"
+	"v2ray.com/core/common/errors"
+	"v2ray.com/core/common/log"
+	"v2ray.com/core/common/net"
+	"v2ray.com/core/common/protocol"
+	"v2ray.com/core/common/session"
+	"v2ray.com/core/common/vio"
+	"v2ray.com/core/features/routing"
+	"v2ray.com/core/transport/pipe"
+)
+
+type Server struct {
+	dispatcher routing.Dispatcher
+}
+
+// NewServer creates a new mux.Server.
+func NewServer(ctx context.Context) *Server {
+	s := &Server{}
+	core.RequireFeatures(ctx, func(d routing.Dispatcher) {
+		s.dispatcher = d
+	})
+	return s
+}
+
+// Type implements common.HasType.
+func (s *Server) Type() interface{} {
+	return s.dispatcher.Type()
+}
+
+// Dispatch impliments routing.Dispatcher
+func (s *Server) Dispatch(ctx context.Context, dest net.Destination) (*vio.Link, error) {
+	if dest.Address != muxCoolAddress {
+		return s.dispatcher.Dispatch(ctx, dest)
+	}
+
+	opts := pipe.OptionsFromContext(ctx)
+	uplinkReader, uplinkWriter := pipe.New(opts...)
+	downlinkReader, downlinkWriter := pipe.New(opts...)
+
+	worker := &ServerWorker{
+		dispatcher: s.dispatcher,
+		link: &vio.Link{
+			Reader: uplinkReader,
+			Writer: downlinkWriter,
+		},
+		sessionManager: NewSessionManager(),
+	}
+	go worker.run(ctx)
+	return &vio.Link{Reader: downlinkReader, Writer: uplinkWriter}, nil
+}
+
+// Start implements common.Runnable.
+func (s *Server) Start() error {
+	return nil
+}
+
+// Close implements common.Closable.
+func (s *Server) Close() error {
+	return nil
+}
+
+type ServerWorker struct {
+	dispatcher     routing.Dispatcher
+	link           *vio.Link
+	sessionManager *SessionManager
+}
+
+func handle(ctx context.Context, s *Session, output buf.Writer) {
+	writer := NewResponseWriter(s.ID, output, s.transferType)
+	if err := buf.Copy(s.input, writer); err != nil {
+		newError("session ", s.ID, " ends.").Base(err).WriteToLog(session.ExportIDToError(ctx))
+		writer.hasError = true
+	}
+
+	writer.Close()
+	s.Close()
+}
+
+func (w *ServerWorker) handleStatusKeepAlive(meta *FrameMetadata, reader *buf.BufferedReader) error {
+	if meta.Option.Has(OptionData) {
+		return drain(NewStreamReader(reader))
+	}
+	return nil
+}
+
+func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata, reader *buf.BufferedReader) error {
+	newError("received request for ", meta.Target).WriteToLog(session.ExportIDToError(ctx))
+	{
+		msg := &log.AccessMessage{
+			To:     meta.Target,
+			Status: log.AccessAccepted,
+			Reason: "",
+		}
+		if inbound := session.InboundFromContext(ctx); inbound != nil && inbound.Source.IsValid() {
+			msg.From = inbound.Source
+		}
+		log.Record(msg)
+	}
+	link, err := w.dispatcher.Dispatch(ctx, meta.Target)
+	if err != nil {
+		if meta.Option.Has(OptionData) {
+			drain(NewStreamReader(reader))
+		}
+		return newError("failed to dispatch request.").Base(err)
+	}
+	s := &Session{
+		input:        link.Reader,
+		output:       link.Writer,
+		parent:       w.sessionManager,
+		ID:           meta.SessionID,
+		transferType: protocol.TransferTypeStream,
+	}
+	if meta.Target.Network == net.Network_UDP {
+		s.transferType = protocol.TransferTypePacket
+	}
+	w.sessionManager.Add(s)
+	go handle(ctx, s, w.link.Writer)
+	if !meta.Option.Has(OptionData) {
+		return nil
+	}
+
+	rr := s.NewReader(reader)
+	if err := buf.Copy(rr, s.output); err != nil {
+		drain(rr)
+		pipe.CloseError(s.input)
+		return s.Close()
+	}
+	return nil
+}
+
+func (w *ServerWorker) handleStatusKeep(meta *FrameMetadata, reader *buf.BufferedReader) error {
+	if !meta.Option.Has(OptionData) {
+		return nil
+	}
+	if s, found := w.sessionManager.Get(meta.SessionID); found {
+		rr := s.NewReader(reader)
+		if err := buf.Copy(rr, s.output); err != nil {
+			drain(rr)
+			pipe.CloseError(s.input)
+			return s.Close()
+		}
+		return nil
+	}
+	return drain(NewStreamReader(reader))
+}
+
+func (w *ServerWorker) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader) error {
+	if s, found := w.sessionManager.Get(meta.SessionID); found {
+		if meta.Option.Has(OptionError) {
+			pipe.CloseError(s.input)
+			pipe.CloseError(s.output)
+		}
+		s.Close()
+	}
+	if meta.Option.Has(OptionData) {
+		return drain(NewStreamReader(reader))
+	}
+	return nil
+}
+
+func (w *ServerWorker) handleFrame(ctx context.Context, reader *buf.BufferedReader) error {
+	var meta FrameMetadata
+	err := meta.Unmarshal(reader)
+	if err != nil {
+		return newError("failed to read metadata").Base(err)
+	}
+
+	switch meta.SessionStatus {
+	case SessionStatusKeepAlive:
+		err = w.handleStatusKeepAlive(&meta, reader)
+	case SessionStatusEnd:
+		err = w.handleStatusEnd(&meta, reader)
+	case SessionStatusNew:
+		err = w.handleStatusNew(ctx, &meta, reader)
+	case SessionStatusKeep:
+		err = w.handleStatusKeep(&meta, reader)
+	default:
+		status := meta.SessionStatus
+		return newError("unknown status: ", status).AtError()
+	}
+
+	if err != nil {
+		return newError("failed to process data").Base(err)
+	}
+	return nil
+}
+
+func (w *ServerWorker) run(ctx context.Context) {
+	input := w.link.Reader
+	reader := &buf.BufferedReader{Reader: input}
+
+	defer w.sessionManager.Close() // nolint: errcheck
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			err := w.handleFrame(ctx, reader)
+			if err != nil {
+				if errors.Cause(err) != io.EOF {
+					newError("unexpected EOF").Base(err).WriteToLog(session.ExportIDToError(ctx))
+					pipe.CloseError(input)
+				}
+				return
+			}
+		}
+	}
+}