浏览代码

simplify dialer and dispatcher parameters

Darien Raymond 8 年之前
父节点
当前提交
c4d0227977

+ 2 - 1
app/dispatcher/dispatcher.go

@@ -4,12 +4,13 @@ import (
 	"context"
 
 	"v2ray.com/core/app"
+	"v2ray.com/core/common/net"
 	"v2ray.com/core/transport/ray"
 )
 
 // Interface dispatch a packet and possibly further network payload to its destination.
 type Interface interface {
-	DispatchToOutbound(ctx context.Context) ray.InboundRay
+	Dispatch(ctx context.Context, dest net.Destination) (ray.InboundRay, error)
 	Start() error
 	Close()
 }

+ 5 - 3
app/dispatcher/impl/default.go

@@ -12,6 +12,7 @@ import (
 	"v2ray.com/core/common"
 	"v2ray.com/core/common/buf"
 	"v2ray.com/core/common/errors"
+	"v2ray.com/core/common/net"
 	"v2ray.com/core/proxy"
 	"v2ray.com/core/transport/ray"
 )
@@ -48,13 +49,14 @@ func (DefaultDispatcher) Interface() interface{} {
 	return (*dispatcher.Interface)(nil)
 }
 
-func (v *DefaultDispatcher) DispatchToOutbound(ctx context.Context) ray.InboundRay {
+func (v *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destination) (ray.InboundRay, error) {
 	dispatcher := v.ohm.GetDefaultHandler()
-	destination := proxy.DestinationFromContext(ctx)
 	if !destination.IsValid() {
 		panic("Dispatcher: Invalid destination.")
 	}
 
+	ctx = proxy.ContextWithDestination(ctx, destination)
+
 	if v.router != nil {
 		if tag, err := v.router.TakeDetour(ctx); err == nil {
 			if handler := v.ohm.GetHandler(tag); handler != nil {
@@ -82,7 +84,7 @@ func (v *DefaultDispatcher) DispatchToOutbound(ctx context.Context) ray.InboundR
 
 	go v.waitAndDispatch(ctx, waitFunc, direct, dispatcher)
 
-	return direct
+	return direct, nil
 }
 
 func (v *DefaultDispatcher) waitAndDispatch(ctx context.Context, wait func() error, link ray.OutboundRay, dispatcher proxyman.OutboundHandler) {

+ 24 - 3
app/proxyman/inbound/always.go

@@ -3,16 +3,21 @@ package inbound
 import (
 	"context"
 
+	"v2ray.com/core/app"
+	"v2ray.com/core/app/dispatcher"
+	"v2ray.com/core/app/log"
 	"v2ray.com/core/app/proxyman"
 	"v2ray.com/core/common/dice"
-	"v2ray.com/core/app/log"
+	"v2ray.com/core/common/errors"
 	"v2ray.com/core/common/net"
 	"v2ray.com/core/proxy"
+	"v2ray.com/core/transport/ray"
 )
 
 type AlwaysOnInboundHandler struct {
-	proxy   proxy.Inbound
-	workers []worker
+	proxy      proxy.Inbound
+	workers    []worker
+	dispatcher dispatcher.Interface
 }
 
 func NewAlwaysOnInboundHandler(ctx context.Context, tag string, receiverConfig *proxyman.ReceiverConfig, proxyConfig interface{}) (*AlwaysOnInboundHandler, error) {
@@ -25,6 +30,16 @@ func NewAlwaysOnInboundHandler(ctx context.Context, tag string, receiverConfig *
 		proxy: p,
 	}
 
+	space := app.SpaceFromContext(ctx)
+	space.OnInitialize(func() error {
+		d := dispatcher.FromSpace(space)
+		if d == nil {
+			return errors.New("Proxyman|DefaultInboundHandler: No dispatcher in space.")
+		}
+		h.dispatcher = d
+		return nil
+	})
+
 	nl := p.Network()
 	pr := receiverConfig.PortRange
 	address := receiverConfig.Listen.AsAddress()
@@ -42,6 +57,7 @@ func NewAlwaysOnInboundHandler(ctx context.Context, tag string, receiverConfig *
 				recvOrigDest:     receiverConfig.ReceiveOriginalDestination,
 				tag:              tag,
 				allowPassiveConn: receiverConfig.AllowPassiveConnection,
+				dispatcher:       h,
 			}
 			h.workers = append(h.workers, worker)
 		}
@@ -53,6 +69,7 @@ func NewAlwaysOnInboundHandler(ctx context.Context, tag string, receiverConfig *
 				address:      address,
 				port:         net.Port(port),
 				recvOrigDest: receiverConfig.ReceiveOriginalDestination,
+				dispatcher:   h,
 			}
 			h.workers = append(h.workers, worker)
 		}
@@ -80,3 +97,7 @@ func (h *AlwaysOnInboundHandler) GetRandomInboundProxy() (proxy.Inbound, net.Por
 	w := h.workers[dice.Roll(len(h.workers))]
 	return w.Proxy(), w.Port(), 9999
 }
+
+func (h *AlwaysOnInboundHandler) Dispatch(ctx context.Context, dest net.Destination) (ray.InboundRay, error) {
+	return h.dispatcher.Dispatch(ctx, dest)
+}

+ 22 - 1
app/proxyman/inbound/dynamic.go

@@ -2,14 +2,18 @@ package inbound
 
 import (
 	"context"
+	"errors"
 	"sync"
 	"time"
 
+	"v2ray.com/core/app"
+	"v2ray.com/core/app/dispatcher"
+	"v2ray.com/core/app/log"
 	"v2ray.com/core/app/proxyman"
 	"v2ray.com/core/common/dice"
-	"v2ray.com/core/app/log"
 	v2net "v2ray.com/core/common/net"
 	"v2ray.com/core/proxy"
+	"v2ray.com/core/transport/ray"
 )
 
 type DynamicInboundHandler struct {
@@ -23,6 +27,7 @@ type DynamicInboundHandler struct {
 	workerMutex    sync.RWMutex
 	worker         []worker
 	lastRefresh    time.Time
+	dispatcher     dispatcher.Interface
 }
 
 func NewDynamicInboundHandler(ctx context.Context, tag string, receiverConfig *proxyman.ReceiverConfig, proxyConfig interface{}) (*DynamicInboundHandler, error) {
@@ -36,6 +41,16 @@ func NewDynamicInboundHandler(ctx context.Context, tag string, receiverConfig *p
 		portsInUse:     make(map[v2net.Port]bool),
 	}
 
+	space := app.SpaceFromContext(ctx)
+	space.OnInitialize(func() error {
+		d := dispatcher.FromSpace(space)
+		if d == nil {
+			return errors.New("Proxyman|DefaultInboundHandler: No dispatcher in space.")
+		}
+		h.dispatcher = d
+		return nil
+	})
+
 	return h, nil
 }
 
@@ -102,6 +117,7 @@ func (h *DynamicInboundHandler) refresh() error {
 				stream:           h.receiverConfig.StreamSettings,
 				recvOrigDest:     h.receiverConfig.ReceiveOriginalDestination,
 				allowPassiveConn: h.receiverConfig.AllowPassiveConnection,
+				dispatcher:       h,
 			}
 			if err := worker.Start(); err != nil {
 				log.Warning("Proxyman:InboundHandler: Failed to create TCP worker: ", err)
@@ -117,6 +133,7 @@ func (h *DynamicInboundHandler) refresh() error {
 				address:      address,
 				port:         port,
 				recvOrigDest: h.receiverConfig.ReceiveOriginalDestination,
+				dispatcher:   h,
 			}
 			if err := worker.Start(); err != nil {
 				log.Warning("Proxyman:InboundHandler: Failed to create UDP worker: ", err)
@@ -164,3 +181,7 @@ func (h *DynamicInboundHandler) GetRandomInboundProxy() (proxy.Inbound, v2net.Po
 	expire := h.receiverConfig.AllocationStrategy.GetRefreshValue() - uint32(time.Since(h.lastRefresh)/time.Minute)
 	return w.Proxy(), w.Port(), int(expire)
 }
+
+func (h *DynamicInboundHandler) Dispatch(ctx context.Context, dest v2net.Destination) (ray.InboundRay, error) {
+	return h.dispatcher.Dispatch(ctx, dest)
+}

+ 10 - 2
app/proxyman/inbound/worker.go

@@ -8,6 +8,8 @@ import (
 	"sync/atomic"
 	"time"
 
+	"v2ray.com/core/app/dispatcher"
+	"v2ray.com/core/app/log"
 	"v2ray.com/core/common/buf"
 	v2net "v2ray.com/core/common/net"
 	"v2ray.com/core/proxy"
@@ -31,6 +33,7 @@ type tcpWorker struct {
 	recvOrigDest     bool
 	tag              string
 	allowPassiveConn bool
+	dispatcher       dispatcher.Interface
 
 	ctx    context.Context
 	cancel context.CancelFunc
@@ -51,7 +54,9 @@ func (w *tcpWorker) callback(conn internet.Connection) {
 	ctx = proxy.ContextWithAllowPassiveConnection(ctx, w.allowPassiveConn)
 	ctx = proxy.ContextWithInboundDestination(ctx, v2net.TCPDestination(w.address, w.port))
 	ctx = proxy.ContextWithSource(ctx, v2net.DestinationFromAddr(conn.RemoteAddr()))
-	w.proxy.Process(ctx, v2net.Network_TCP, conn)
+	if err := w.proxy.Process(ctx, v2net.Network_TCP, conn, w.dispatcher); err != nil {
+		log.Info("Proxyman|TCPWorker: Connection ends with ", err)
+	}
 	cancel()
 	conn.Close()
 }
@@ -151,6 +156,7 @@ type udpWorker struct {
 	port         v2net.Port
 	recvOrigDest bool
 	tag          string
+	dispatcher   dispatcher.Interface
 
 	ctx        context.Context
 	cancel     context.CancelFunc
@@ -206,7 +212,9 @@ func (w *udpWorker) callback(b *buf.Buffer, source v2net.Destination, originalDe
 			}
 			ctx = proxy.ContextWithSource(ctx, source)
 			ctx = proxy.ContextWithInboundDestination(ctx, v2net.UDPDestination(w.address, w.port))
-			w.proxy.Process(ctx, v2net.Network_UDP, conn)
+			if err := w.proxy.Process(ctx, v2net.Network_UDP, conn, w.dispatcher); err != nil {
+				log.Info("Proxyman|UDPWorker: Connection ends with ", err)
+			}
 			w.removeConn(source)
 			cancel()
 		}()

+ 2 - 3
app/proxyman/outbound/handler.go

@@ -7,10 +7,10 @@ import (
 	"time"
 
 	"v2ray.com/core/app"
+	"v2ray.com/core/app/log"
 	"v2ray.com/core/app/proxyman"
 	"v2ray.com/core/common/buf"
 	"v2ray.com/core/common/errors"
-	"v2ray.com/core/app/log"
 	v2net "v2ray.com/core/common/net"
 	"v2ray.com/core/proxy"
 	"v2ray.com/core/transport/internet"
@@ -64,8 +64,7 @@ func NewHandler(ctx context.Context, config *proxyman.OutboundHandlerConfig) (*H
 }
 
 func (h *Handler) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) {
-	ctx = proxy.ContextWithDialer(ctx, h)
-	err := h.proxy.Process(ctx, outboundRay)
+	err := h.proxy.Process(ctx, outboundRay, h)
 	// Ensure outbound ray is properly closed.
 	if err != nil && errors.Cause(err) != io.EOF {
 		outboundRay.OutboundOutput().CloseError()

+ 2 - 1
proxy/blackhole/blackhole.go

@@ -7,6 +7,7 @@ import (
 	"time"
 
 	"v2ray.com/core/common"
+	"v2ray.com/core/proxy"
 	"v2ray.com/core/transport/ray"
 )
 
@@ -31,7 +32,7 @@ func New(ctx context.Context, config *Config) (*Handler, error) {
 }
 
 // Dispatch implements OutboundHandler.Dispatch().
-func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay) error {
+func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dialer proxy.Dialer) error {
 	v.response.WriteTo(outboundRay.OutboundOutput())
 	// Sleep a little here to make sure the response is sent to client.
 	time.Sleep(time.Second)

+ 2 - 14
proxy/context.go

@@ -9,8 +9,7 @@ import (
 type key int
 
 const (
-	dialerKey key = iota
-	sourceKey
+	sourceKey key = iota
 	destinationKey
 	originalDestinationKey
 	inboundDestinationKey
@@ -18,20 +17,9 @@ const (
 	outboundTagKey
 	resolvedIPsKey
 	allowPassiveConnKey
+	dispatcherKey
 )
 
-func ContextWithDialer(ctx context.Context, dialer Dialer) context.Context {
-	return context.WithValue(ctx, dialerKey, dialer)
-}
-
-func DialerFromContext(ctx context.Context) Dialer {
-	v := ctx.Value(dialerKey)
-	if v == nil {
-		return nil
-	}
-	return v.(Dialer)
-}
-
 func ContextWithSource(ctx context.Context, src net.Destination) context.Context {
 	return context.WithValue(ctx, sourceKey, src)
 }

+ 9 - 15
proxy/dokodemo/dokodemo.go

@@ -7,10 +7,10 @@ import (
 
 	"v2ray.com/core/app"
 	"v2ray.com/core/app/dispatcher"
+	"v2ray.com/core/app/log"
 	"v2ray.com/core/common"
 	"v2ray.com/core/common/buf"
 	"v2ray.com/core/common/errors"
-	"v2ray.com/core/app/log"
 	"v2ray.com/core/common/net"
 	"v2ray.com/core/common/signal"
 	"v2ray.com/core/proxy"
@@ -18,10 +18,9 @@ import (
 )
 
 type DokodemoDoor struct {
-	config           *Config
-	address          net.Address
-	port             net.Port
-	packetDispatcher dispatcher.Interface
+	config  *Config
+	address net.Address
+	port    net.Port
 }
 
 func New(ctx context.Context, config *Config) (*DokodemoDoor, error) {
@@ -37,13 +36,6 @@ func New(ctx context.Context, config *Config) (*DokodemoDoor, error) {
 		address: config.GetPredefinedAddress(),
 		port:    net.Port(config.Port),
 	}
-	space.OnInitialize(func() error {
-		d.packetDispatcher = dispatcher.FromSpace(space)
-		if d.packetDispatcher == nil {
-			return errors.New("Dokodemo: Dispatcher is not found in the space.")
-		}
-		return nil
-	})
 	return d, nil
 }
 
@@ -51,7 +43,7 @@ func (d *DokodemoDoor) Network() net.NetworkList {
 	return *(d.config.NetworkList)
 }
 
-func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn internet.Connection) error {
+func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher dispatcher.Interface) error {
 	log.Debug("Dokodemo: processing connection from: ", conn.RemoteAddr())
 	conn.SetReusable(false)
 	dest := net.Destination{
@@ -68,7 +60,6 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in
 		log.Info("Dokodemo: Invalid destination. Discarding...")
 		return errors.New("Dokodemo: Unable to get destination.")
 	}
-	ctx = proxy.ContextWithDestination(ctx, dest)
 	ctx, cancel := context.WithCancel(ctx)
 	timeout := time.Second * time.Duration(d.config.Timeout)
 	if timeout == 0 {
@@ -76,7 +67,10 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in
 	}
 	timer := signal.CancelAfterInactivity(ctx, cancel, timeout)
 
-	inboundRay := d.packetDispatcher.DispatchToOutbound(ctx)
+	inboundRay, err := dispatcher.Dispatch(ctx, dest)
+	if err != nil {
+		return err
+	}
 
 	requestDone := signal.ExecuteAsync(func() error {
 		defer inboundRay.InboundInput().Close()

+ 2 - 3
proxy/freedom/freedom.go

@@ -8,11 +8,11 @@ import (
 
 	"v2ray.com/core/app"
 	"v2ray.com/core/app/dns"
+	"v2ray.com/core/app/log"
 	"v2ray.com/core/common"
 	"v2ray.com/core/common/buf"
 	"v2ray.com/core/common/dice"
 	"v2ray.com/core/common/errors"
-	"v2ray.com/core/app/log"
 	"v2ray.com/core/common/net"
 	"v2ray.com/core/common/retry"
 	"v2ray.com/core/common/signal"
@@ -73,7 +73,7 @@ func (v *Handler) ResolveIP(destination net.Destination) net.Destination {
 	return newDest
 }
 
-func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay) error {
+func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dialer proxy.Dialer) error {
 	destination := proxy.DestinationFromContext(ctx)
 	if v.destOverride != nil {
 		server := v.destOverride.Server
@@ -93,7 +93,6 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay) erro
 		destination = v.ResolveIP(destination)
 	}
 
-	dialer := proxy.DialerFromContext(ctx)
 	err := retry.ExponentialBackoff(5, 100).On(func() error {
 		rawConn, err := dialer.Dial(ctx, destination)
 		if err != nil {

+ 16 - 21
proxy/http/server.go

@@ -8,27 +8,23 @@ import (
 	"runtime"
 	"strconv"
 	"strings"
-	"sync"
 	"time"
 
 	"v2ray.com/core/app"
 	"v2ray.com/core/app/dispatcher"
+	"v2ray.com/core/app/log"
 	"v2ray.com/core/common"
 	"v2ray.com/core/common/buf"
 	"v2ray.com/core/common/bufio"
 	"v2ray.com/core/common/errors"
-	"v2ray.com/core/app/log"
 	v2net "v2ray.com/core/common/net"
 	"v2ray.com/core/common/signal"
-	"v2ray.com/core/proxy"
 	"v2ray.com/core/transport/internet"
 )
 
 // Server is a HTTP proxy server.
 type Server struct {
-	sync.Mutex
-	packetDispatcher dispatcher.Interface
-	config           *ServerConfig
+	config *ServerConfig
 }
 
 // NewServer creates a new HTTP inbound handler.
@@ -40,13 +36,6 @@ func NewServer(ctx context.Context, config *ServerConfig) (*Server, error) {
 	s := &Server{
 		config: config,
 	}
-	space.OnInitialize(func() error {
-		s.packetDispatcher = dispatcher.FromSpace(space)
-		if s.packetDispatcher == nil {
-			return errors.New("HTTP|Server: Dispatcher not found in space.")
-		}
-		return nil
-	})
 	return s, nil
 }
 
@@ -79,7 +68,7 @@ func parseHost(rawHost string, defaultPort v2net.Port) (v2net.Destination, error
 	return v2net.TCPDestination(v2net.DomainAddress(host), port), nil
 }
 
-func (s *Server) Process(ctx context.Context, network v2net.Network, conn internet.Connection) error {
+func (s *Server) Process(ctx context.Context, network v2net.Network, conn internet.Connection, dispatcher dispatcher.Interface) error {
 	conn.SetReusable(false)
 
 	conn.SetReadDeadline(time.Now().Add(time.Second * 8))
@@ -109,15 +98,15 @@ func (s *Server) Process(ctx context.Context, network v2net.Network, conn intern
 		return err
 	}
 	log.Access(conn.RemoteAddr(), request.URL, log.AccessAccepted, "")
-	ctx = proxy.ContextWithDestination(ctx, dest)
+
 	if strings.ToUpper(request.Method) == "CONNECT" {
-		return s.handleConnect(ctx, request, reader, conn)
+		return s.handleConnect(ctx, request, reader, conn, dest, dispatcher)
 	} else {
-		return s.handlePlainHTTP(ctx, request, reader, conn)
+		return s.handlePlainHTTP(ctx, request, reader, conn, dest, dispatcher)
 	}
 }
 
-func (s *Server) handleConnect(ctx context.Context, request *http.Request, reader io.Reader, writer io.Writer) error {
+func (s *Server) handleConnect(ctx context.Context, request *http.Request, reader io.Reader, writer io.Writer, dest v2net.Destination, dispatcher dispatcher.Interface) error {
 	response := &http.Response{
 		Status:        "200 OK",
 		StatusCode:    200,
@@ -140,7 +129,10 @@ func (s *Server) handleConnect(ctx context.Context, request *http.Request, reade
 		timeout = time.Minute * 2
 	}
 	timer := signal.CancelAfterInactivity(ctx, cancel, timeout)
-	ray := s.packetDispatcher.DispatchToOutbound(ctx)
+	ray, err := dispatcher.Dispatch(ctx, dest)
+	if err != nil {
+		return err
+	}
 
 	requestDone := signal.ExecuteAsync(func() error {
 		defer ray.InboundInput().Close()
@@ -213,7 +205,7 @@ func generateResponse(statusCode int, status string) *http.Response {
 	}
 }
 
-func (s *Server) handlePlainHTTP(ctx context.Context, request *http.Request, reader io.Reader, writer io.Writer) error {
+func (s *Server) handlePlainHTTP(ctx context.Context, request *http.Request, reader io.Reader, writer io.Writer, dest v2net.Destination, dispatcher dispatcher.Interface) error {
 	if len(request.URL.Host) <= 0 {
 		response := generateResponse(400, "Bad Request")
 		return response.Write(writer)
@@ -222,7 +214,10 @@ func (s *Server) handlePlainHTTP(ctx context.Context, request *http.Request, rea
 	request.Host = request.URL.Host
 	StripHopByHopHeaders(request)
 
-	ray := s.packetDispatcher.DispatchToOutbound(ctx)
+	ray, err := dispatcher.Dispatch(ctx, dest)
+	if err != nil {
+		return err
+	}
 	input := ray.InboundInput()
 	output := ray.InboundOutput()
 	defer input.Close()

+ 3 - 2
proxy/proxy.go

@@ -4,6 +4,7 @@ package proxy
 import (
 	"context"
 
+	"v2ray.com/core/app/dispatcher"
 	"v2ray.com/core/common/net"
 	"v2ray.com/core/transport/internet"
 	"v2ray.com/core/transport/ray"
@@ -13,12 +14,12 @@ import (
 type Inbound interface {
 	Network() net.NetworkList
 
-	Process(context.Context, net.Network, internet.Connection) error
+	Process(context.Context, net.Network, internet.Connection, dispatcher.Interface) error
 }
 
 // An Outbound process outbound connections.
 type Outbound interface {
-	Process(context.Context, ray.OutboundRay) error
+	Process(context.Context, ray.OutboundRay, Dialer) error
 }
 
 // Dialer is used by OutboundHandler for creating outbound connections.

+ 2 - 3
proxy/shadowsocks/client.go

@@ -7,10 +7,10 @@ import (
 
 	"runtime"
 
+	"v2ray.com/core/app/log"
 	"v2ray.com/core/common"
 	"v2ray.com/core/common/buf"
 	"v2ray.com/core/common/bufio"
-	"v2ray.com/core/app/log"
 	"v2ray.com/core/common/net"
 	"v2ray.com/core/common/protocol"
 	"v2ray.com/core/common/retry"
@@ -39,14 +39,13 @@ func NewClient(ctx context.Context, config *ClientConfig) (*Client, error) {
 }
 
 // Process implements OutboundHandler.Process().
-func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay) error {
+func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, dialer proxy.Dialer) error {
 	destination := proxy.DestinationFromContext(ctx)
 	network := destination.Network
 
 	var server *protocol.ServerSpec
 	var conn internet.Connection
 
-	dialer := proxy.DialerFromContext(ctx)
 	err := retry.ExponentialBackoff(5, 100).On(func() error {
 		server = v.serverPicker.PickServer()
 		dest := server.Destination()

+ 16 - 24
proxy/shadowsocks/server.go

@@ -2,17 +2,16 @@ package shadowsocks
 
 import (
 	"context"
-	"time"
-
 	"runtime"
+	"time"
 
 	"v2ray.com/core/app"
 	"v2ray.com/core/app/dispatcher"
+	"v2ray.com/core/app/log"
 	"v2ray.com/core/common"
 	"v2ray.com/core/common/buf"
 	"v2ray.com/core/common/bufio"
 	"v2ray.com/core/common/errors"
-	"v2ray.com/core/app/log"
 	"v2ray.com/core/common/net"
 	"v2ray.com/core/common/protocol"
 	"v2ray.com/core/common/signal"
@@ -22,11 +21,9 @@ import (
 )
 
 type Server struct {
-	packetDispatcher dispatcher.Interface
-	config           *ServerConfig
-	user             *protocol.User
-	account          *ShadowsocksAccount
-	udpServer        *udp.Dispatcher
+	config  *ServerConfig
+	user    *protocol.User
+	account *ShadowsocksAccount
 }
 
 func NewServer(ctx context.Context, config *ServerConfig) (*Server, error) {
@@ -50,14 +47,6 @@ func NewServer(ctx context.Context, config *ServerConfig) (*Server, error) {
 		account: account,
 	}
 
-	space.OnInitialize(func() error {
-		s.packetDispatcher = dispatcher.FromSpace(space)
-		if s.packetDispatcher == nil {
-			return errors.New("Shadowsocks|Server: Dispatcher is not found in space.")
-		}
-		return nil
-	})
-
 	return s, nil
 }
 
@@ -71,20 +60,21 @@ func (s *Server) Network() net.NetworkList {
 	return list
 }
 
-func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection) error {
+func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher dispatcher.Interface) error {
 	conn.SetReusable(false)
 
 	switch network {
 	case net.Network_TCP:
-		return s.handleConnection(ctx, conn)
+		return s.handleConnection(ctx, conn, dispatcher)
 	case net.Network_UDP:
-		return s.handlerUDPPayload(ctx, conn)
+		return s.handlerUDPPayload(ctx, conn, dispatcher)
 	default:
 		return errors.New("Shadowsocks|Server: Unknown network: ", network)
 	}
 }
 
-func (v *Server) handlerUDPPayload(ctx context.Context, conn internet.Connection) error {
+func (v *Server) handlerUDPPayload(ctx context.Context, conn internet.Connection, dispatcher dispatcher.Interface) error {
+	udpServer := udp.NewDispatcher(dispatcher)
 	source := proxy.SourceFromContext(ctx)
 
 	reader := buf.NewReader(conn)
@@ -119,7 +109,7 @@ func (v *Server) handlerUDPPayload(ctx context.Context, conn internet.Connection
 		log.Info("Shadowsocks|Server: Tunnelling request to ", dest)
 
 		ctx = protocol.ContextWithUser(ctx, request.User)
-		v.udpServer.Dispatch(ctx, dest, data, func(payload *buf.Buffer) {
+		udpServer.Dispatch(ctx, dest, data, func(payload *buf.Buffer) {
 			defer payload.Release()
 
 			data, err := EncodeUDPPacket(request, payload)
@@ -136,7 +126,7 @@ func (v *Server) handlerUDPPayload(ctx context.Context, conn internet.Connection
 	return nil
 }
 
-func (s *Server) handleConnection(ctx context.Context, conn internet.Connection) error {
+func (s *Server) handleConnection(ctx context.Context, conn internet.Connection, dispatcher dispatcher.Interface) error {
 	conn.SetReadDeadline(time.Now().Add(time.Second * 8))
 	bufferedReader := bufio.NewReader(conn)
 	request, bodyReader, err := ReadTCPSession(s.user, bufferedReader)
@@ -153,13 +143,15 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection)
 	log.Access(conn.RemoteAddr(), dest, log.AccessAccepted, "")
 	log.Info("Shadowsocks|Server: Tunnelling request to ", dest)
 
-	ctx = proxy.ContextWithDestination(ctx, dest)
 	ctx = protocol.ContextWithUser(ctx, request.User)
 
 	ctx, cancel := context.WithCancel(ctx)
 	userSettings := s.user.GetSettings()
 	timer := signal.CancelAfterInactivity(ctx, cancel, userSettings.PayloadTimeout)
-	ray := s.packetDispatcher.DispatchToOutbound(ctx)
+	ray, err := dispatcher.Dispatch(ctx, dest)
+	if err != nil {
+		return err
+	}
 
 	requestDone := signal.ExecuteAsync(func() error {
 		bufferedWriter := bufio.NewWriter(conn)

+ 2 - 3
proxy/socks/client.go

@@ -6,9 +6,9 @@ import (
 	"runtime"
 	"time"
 
+	"v2ray.com/core/app/log"
 	"v2ray.com/core/common"
 	"v2ray.com/core/common/buf"
-	"v2ray.com/core/app/log"
 	"v2ray.com/core/common/net"
 	"v2ray.com/core/common/protocol"
 	"v2ray.com/core/common/retry"
@@ -34,13 +34,12 @@ func NewClient(ctx context.Context, config *ClientConfig) (*Client, error) {
 	return client, nil
 }
 
-func (c *Client) Process(ctx context.Context, ray ray.OutboundRay) error {
+func (c *Client) Process(ctx context.Context, ray ray.OutboundRay, dialer proxy.Dialer) error {
 	destination := proxy.DestinationFromContext(ctx)
 
 	var server *protocol.ServerSpec
 	var conn internet.Connection
 
-	dialer := proxy.DialerFromContext(ctx)
 	err := retry.ExponentialBackoff(5, 100).On(func() error {
 		server = c.serverPicker.PickServer()
 		dest := server.Destination()

+ 17 - 22
proxy/socks/server.go

@@ -8,11 +8,11 @@ import (
 
 	"v2ray.com/core/app"
 	"v2ray.com/core/app/dispatcher"
+	"v2ray.com/core/app/log"
 	"v2ray.com/core/common"
 	"v2ray.com/core/common/buf"
 	"v2ray.com/core/common/bufio"
 	"v2ray.com/core/common/errors"
-	"v2ray.com/core/app/log"
 	"v2ray.com/core/common/net"
 	"v2ray.com/core/common/protocol"
 	"v2ray.com/core/common/signal"
@@ -23,9 +23,7 @@ import (
 
 // Server is a SOCKS 5 proxy server
 type Server struct {
-	packetDispatcher dispatcher.Interface
-	config           *ServerConfig
-	udpServer        *udp.Dispatcher
+	config *ServerConfig
 }
 
 // NewServer creates a new Server object.
@@ -37,14 +35,6 @@ func NewServer(ctx context.Context, config *ServerConfig) (*Server, error) {
 	s := &Server{
 		config: config,
 	}
-	space.OnInitialize(func() error {
-		s.packetDispatcher = dispatcher.FromSpace(space)
-		if s.packetDispatcher == nil {
-			return errors.New("Socks|Server: Dispatcher is not found in the space.")
-		}
-		s.udpServer = udp.NewDispatcher(s.packetDispatcher)
-		return nil
-	})
 	return s, nil
 }
 
@@ -58,20 +48,20 @@ func (s *Server) Network() net.NetworkList {
 	return list
 }
 
-func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection) error {
+func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher dispatcher.Interface) error {
 	conn.SetReusable(false)
 
 	switch network {
 	case net.Network_TCP:
-		return s.processTCP(ctx, conn)
+		return s.processTCP(ctx, conn, dispatcher)
 	case net.Network_UDP:
-		return s.handleUDPPayload(ctx, conn)
+		return s.handleUDPPayload(ctx, conn, dispatcher)
 	default:
 		return errors.New("Socks|Server: Unknown network: ", network)
 	}
 }
 
-func (s *Server) processTCP(ctx context.Context, conn internet.Connection) error {
+func (s *Server) processTCP(ctx context.Context, conn internet.Connection, dispatcher dispatcher.Interface) error {
 	conn.SetReadDeadline(time.Now().Add(time.Second * 8))
 	reader := bufio.NewReader(conn)
 
@@ -95,8 +85,7 @@ func (s *Server) processTCP(ctx context.Context, conn internet.Connection) error
 		log.Info("Socks|Server: TCP Connect request to ", dest)
 		log.Access(source, dest, log.AccessAccepted, "")
 
-		ctx = proxy.ContextWithDestination(ctx, dest)
-		return s.transport(ctx, reader, conn)
+		return s.transport(ctx, reader, conn, dest, dispatcher)
 	}
 
 	if request.Command == protocol.RequestCommandUDP {
@@ -115,7 +104,7 @@ func (*Server) handleUDP() error {
 	return nil
 }
 
-func (v *Server) transport(ctx context.Context, reader io.Reader, writer io.Writer) error {
+func (v *Server) transport(ctx context.Context, reader io.Reader, writer io.Writer, dest net.Destination, dispatcher dispatcher.Interface) error {
 	ctx, cancel := context.WithCancel(ctx)
 	timeout := time.Second * time.Duration(v.config.Timeout)
 	if timeout == 0 {
@@ -123,7 +112,11 @@ func (v *Server) transport(ctx context.Context, reader io.Reader, writer io.Writ
 	}
 	timer := signal.CancelAfterInactivity(ctx, cancel, timeout)
 
-	ray := v.packetDispatcher.DispatchToOutbound(ctx)
+	ray, err := dispatcher.Dispatch(ctx, dest)
+	if err != nil {
+		return err
+	}
+
 	input := ray.InboundInput()
 	output := ray.InboundOutput()
 
@@ -159,7 +152,9 @@ func (v *Server) transport(ctx context.Context, reader io.Reader, writer io.Writ
 	return nil
 }
 
-func (v *Server) handleUDPPayload(ctx context.Context, conn internet.Connection) error {
+func (v *Server) handleUDPPayload(ctx context.Context, conn internet.Connection, dispatcher dispatcher.Interface) error {
+	udpServer := udp.NewDispatcher(dispatcher)
+
 	source := proxy.SourceFromContext(ctx)
 	log.Info("Socks|Server: Client UDP connection from ", source)
 
@@ -185,7 +180,7 @@ func (v *Server) handleUDPPayload(ctx context.Context, conn internet.Connection)
 
 		dataBuf := buf.NewSmall()
 		dataBuf.Append(data)
-		v.udpServer.Dispatch(ctx, request.Destination(), dataBuf, func(payload *buf.Buffer) {
+		udpServer.Dispatch(ctx, request.Destination(), dataBuf, func(payload *buf.Buffer) {
 			defer payload.Release()
 
 			log.Info("Socks|Server: Writing back UDP response with ", payload.Len(), " bytes")

+ 6 - 14
proxy/vmess/inbound/inbound.go

@@ -10,18 +10,17 @@ import (
 
 	"v2ray.com/core/app"
 	"v2ray.com/core/app/dispatcher"
+	"v2ray.com/core/app/log"
 	"v2ray.com/core/app/proxyman"
 	"v2ray.com/core/common"
 	"v2ray.com/core/common/buf"
 	"v2ray.com/core/common/bufio"
 	"v2ray.com/core/common/errors"
-	"v2ray.com/core/app/log"
 	"v2ray.com/core/common/net"
 	"v2ray.com/core/common/protocol"
 	"v2ray.com/core/common/serial"
 	"v2ray.com/core/common/signal"
 	"v2ray.com/core/common/uuid"
-	"v2ray.com/core/proxy"
 	"v2ray.com/core/proxy/vmess"
 	"v2ray.com/core/proxy/vmess/encoding"
 	"v2ray.com/core/transport/internet"
@@ -75,8 +74,6 @@ func (v *userByEmail) Get(email string) (*protocol.User, bool) {
 
 // Inbound connection handler that handles messages in VMess format.
 type VMessInboundHandler struct {
-	sync.RWMutex
-	packetDispatcher      dispatcher.Interface
 	inboundHandlerManager proxyman.InboundHandlerManager
 	clients               protocol.UserValidator
 	usersByEmail          *userByEmail
@@ -101,10 +98,6 @@ func New(ctx context.Context, config *Config) (*VMessInboundHandler, error) {
 	}
 
 	space.OnInitialize(func() error {
-		handler.packetDispatcher = dispatcher.FromSpace(space)
-		if handler.packetDispatcher == nil {
-			return errors.New("VMess|Inbound: Dispatcher is not found in space.")
-		}
 		handler.inboundHandlerManager = proxyman.InboundHandlerManagerFromSpace(space)
 		if handler.inboundHandlerManager == nil {
 			return errors.New("VMess|Inbound: InboundHandlerManager is not found is space.")
@@ -122,9 +115,6 @@ func (*VMessInboundHandler) Network() net.NetworkList {
 }
 
 func (v *VMessInboundHandler) GetUser(email string) *protocol.User {
-	v.RLock()
-	defer v.RUnlock()
-
 	user, existing := v.usersByEmail.Get(email)
 	if !existing {
 		v.clients.Add(user)
@@ -177,7 +167,7 @@ func transferResponse(timer *signal.ActivityTimer, session *encoding.ServerSessi
 	return nil
 }
 
-func (v *VMessInboundHandler) Process(ctx context.Context, network net.Network, connection internet.Connection) error {
+func (v *VMessInboundHandler) Process(ctx context.Context, network net.Network, connection internet.Connection, dispatcher dispatcher.Interface) error {
 	connection.SetReadDeadline(time.Now().Add(time.Second * 8))
 	reader := bufio.NewReader(connection)
 
@@ -200,11 +190,13 @@ func (v *VMessInboundHandler) Process(ctx context.Context, network net.Network,
 	connection.SetReusable(request.Option.Has(protocol.RequestOptionConnectionReuse))
 	userSettings := request.User.GetSettings()
 
-	ctx = proxy.ContextWithDestination(ctx, request.Destination())
 	ctx = protocol.ContextWithUser(ctx, request.User)
 	ctx, cancel := context.WithCancel(ctx)
 	timer := signal.CancelAfterInactivity(ctx, cancel, userSettings.PayloadTimeout)
-	ray := v.packetDispatcher.DispatchToOutbound(ctx)
+	ray, err := dispatcher.Dispatch(ctx, request.Destination())
+	if err != nil {
+		return err
+	}
 
 	input := ray.InboundInput()
 	output := ray.InboundOutput()

+ 2 - 3
proxy/vmess/outbound/outbound.go

@@ -6,11 +6,11 @@ import (
 	"time"
 
 	"v2ray.com/core/app"
+	"v2ray.com/core/app/log"
 	"v2ray.com/core/common"
 	"v2ray.com/core/common/buf"
 	"v2ray.com/core/common/bufio"
 	"v2ray.com/core/common/errors"
-	"v2ray.com/core/app/log"
 	"v2ray.com/core/common/net"
 	"v2ray.com/core/common/protocol"
 	"v2ray.com/core/common/retry"
@@ -47,11 +47,10 @@ func New(ctx context.Context, config *Config) (*VMessOutboundHandler, error) {
 }
 
 // Dispatch implements OutboundHandler.Dispatch().
-func (v *VMessOutboundHandler) Process(ctx context.Context, outboundRay ray.OutboundRay) error {
+func (v *VMessOutboundHandler) Process(ctx context.Context, outboundRay ray.OutboundRay, dialer proxy.Dialer) error {
 	var rec *protocol.ServerSpec
 	var conn internet.Connection
 
-	dialer := proxy.DialerFromContext(ctx)
 	err := retry.ExponentialBackoff(5, 100).On(func() error {
 		rec = v.serverPicker.PickServer()
 		rawConn, err := dialer.Dial(ctx, rec.Destination())

+ 8 - 9
transport/internet/udp/dispatcher.go

@@ -5,10 +5,9 @@ import (
 	"sync"
 
 	"v2ray.com/core/app/dispatcher"
-	"v2ray.com/core/common/buf"
 	"v2ray.com/core/app/log"
+	"v2ray.com/core/common/buf"
 	v2net "v2ray.com/core/common/net"
-	"v2ray.com/core/proxy"
 	"v2ray.com/core/transport/ray"
 )
 
@@ -16,14 +15,14 @@ type ResponseCallback func(payload *buf.Buffer)
 
 type Dispatcher struct {
 	sync.RWMutex
-	conns            map[string]ray.InboundRay
-	packetDispatcher dispatcher.Interface
+	conns      map[string]ray.InboundRay
+	dispatcher dispatcher.Interface
 }
 
-func NewDispatcher(packetDispatcher dispatcher.Interface) *Dispatcher {
+func NewDispatcher(dispatcher dispatcher.Interface) *Dispatcher {
 	return &Dispatcher{
-		conns:            make(map[string]ray.InboundRay),
-		packetDispatcher: packetDispatcher,
+		conns:      make(map[string]ray.InboundRay),
+		dispatcher: dispatcher,
 	}
 }
 
@@ -47,8 +46,8 @@ func (v *Dispatcher) getInboundRay(ctx context.Context, dest v2net.Destination)
 	}
 
 	log.Info("UDP|Server: establishing new connection for ", dest)
-	ctx = proxy.ContextWithDestination(ctx, dest)
-	return v.packetDispatcher.DispatchToOutbound(ctx), false
+	inboundRay, _ := v.dispatcher.Dispatch(ctx, dest)
+	return inboundRay, false
 }
 
 func (v *Dispatcher) Dispatch(ctx context.Context, destination v2net.Destination, payload *buf.Buffer, callback ResponseCallback) {