浏览代码

apply mux in client

Darien Raymond 8 年之前
父节点
当前提交
5686566139
共有 3 个文件被更改,包括 101 次插入13 次删除
  1. 54 2
      app/proxyman/mux/mux.go
  2. 29 11
      app/proxyman/outbound/handler.go
  3. 18 0
      tools/conf/v2ray.go

+ 54 - 2
app/proxyman/mux/mux.go

@@ -57,6 +57,52 @@ func (s *session) closeDownlink() {
 	s.checkAndRemove()
 }
 
+type ClientManager struct {
+	access  sync.Mutex
+	clients []*Client
+	proxy   proxy.Outbound
+	dialer  proxy.Dialer
+}
+
+func NewClientManager(p proxy.Outbound, d proxy.Dialer) *ClientManager {
+	return &ClientManager{
+		proxy:  p,
+		dialer: d,
+	}
+}
+
+func (m *ClientManager) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) error {
+	m.access.Lock()
+	defer m.access.Unlock()
+
+	for _, client := range m.clients {
+		if client.Dispatch(ctx, outboundRay) {
+			return nil
+		}
+	}
+
+	client, err := NewClient(m.proxy, m.dialer, m)
+	if err != nil {
+		return err
+	}
+	m.clients = append(m.clients, client)
+	client.Dispatch(ctx, outboundRay)
+	return nil
+}
+
+func (m *ClientManager) onClientFinish() {
+	m.access.Lock()
+	defer m.access.Unlock()
+
+	nActive := 0
+	for idx, client := range m.clients {
+		if nActive != idx && !client.Closed() {
+			m.clients[nActive] = client
+		}
+	}
+	m.clients = m.clients[:nActive]
+}
+
 type Client struct {
 	access     sync.RWMutex
 	count      uint16
@@ -64,11 +110,12 @@ type Client struct {
 	inboundRay ray.InboundRay
 	ctx        context.Context
 	cancel     context.CancelFunc
+	manager    *ClientManager
 }
 
 var muxCoolDestination = net.TCPDestination(net.DomainAddress("v1.mux.cool"), net.Port(9527))
 
-func NewClient(p proxy.Outbound, dialer proxy.Dialer) (*Client, error) {
+func NewClient(p proxy.Outbound, dialer proxy.Dialer, m *ClientManager) (*Client, error) {
 	ctx, cancel := context.WithCancel(context.Background())
 	ctx = proxy.ContextWithTarget(ctx, muxCoolDestination)
 	pipe := ray.NewRay(ctx)
@@ -82,6 +129,7 @@ func NewClient(p proxy.Outbound, dialer proxy.Dialer) (*Client, error) {
 		inboundRay: pipe,
 		ctx:        ctx,
 		cancel:     cancel,
+		manager:    m,
 	}, nil
 }
 
@@ -101,6 +149,7 @@ func (m *Client) remove(id uint16) {
 	if len(m.sessions) == 0 {
 		m.cancel()
 		m.inboundRay.InboundInput().Close()
+		go m.manager.onClientFinish()
 	}
 }
 
@@ -121,7 +170,10 @@ func (m *Client) fetchInput(ctx context.Context, s *session) {
 		writer: m.inboundRay.InboundInput(),
 	}
 	_, timer := signal.CancelAfterInactivity(ctx, time.Minute*5)
-	buf.PipeUntilEOF(timer, s.input, writer)
+	if err := buf.PipeUntilEOF(timer, s.input, writer); err != nil {
+		log.Info("Proxyman|Mux|Client: Failed to fetch all input: ", err)
+	}
+
 	writer.Close()
 	s.closeUplink()
 }

+ 29 - 11
app/proxyman/outbound/handler.go

@@ -9,6 +9,7 @@ import (
 	"v2ray.com/core/app"
 	"v2ray.com/core/app/log"
 	"v2ray.com/core/app/proxyman"
+	"v2ray.com/core/app/proxyman/mux"
 	"v2ray.com/core/common/buf"
 	"v2ray.com/core/common/errors"
 	v2net "v2ray.com/core/common/net"
@@ -22,6 +23,7 @@ type Handler struct {
 	senderSettings  *proxyman.SenderConfig
 	proxy           proxy.Outbound
 	outboundManager proxyman.OutboundHandlerManager
+	mux             *mux.ClientManager
 }
 
 func NewHandler(ctx context.Context, config *proxyman.OutboundHandlerConfig) (*Handler, error) {
@@ -54,6 +56,10 @@ func NewHandler(ctx context.Context, config *proxyman.OutboundHandlerConfig) (*H
 		}
 	}
 
+	if h.senderSettings != nil && h.senderSettings.MultiplexSettings != nil && h.senderSettings.MultiplexSettings.Enabled {
+		h.mux = mux.NewClientManager(h.proxy, h)
+	}
+
 	proxyHandler, err := config.GetProxyHandler(ctx)
 	if err != nil {
 		return nil, err
@@ -64,20 +70,32 @@ func NewHandler(ctx context.Context, config *proxyman.OutboundHandlerConfig) (*H
 }
 
 func (h *Handler) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) {
-	err := h.proxy.Process(ctx, outboundRay, h)
-	// Ensure outbound ray is properly closed.
-	if err != nil && errors.Cause(err) != io.EOF {
-		err = errors.Base(err).Message("Proxyman|OutboundHandler: Failed to process outbound traffic.")
-		if errors.IsActionRequired(err) {
-			log.Warning(err)
-		} else {
-			log.Info(err)
+	if h.mux != nil {
+		err := h.mux.Dispatch(ctx, outboundRay)
+		if err != nil {
+			err = errors.Base(err).Message("Proxyman|OutboundHandler: Failed to process outbound traffic.")
+			if errors.IsActionRequired(err) {
+				log.Warning(err)
+			} else {
+				log.Info(err)
+			}
 		}
-		outboundRay.OutboundOutput().CloseError()
 	} else {
-		outboundRay.OutboundOutput().Close()
+		err := h.proxy.Process(ctx, outboundRay, h)
+		// Ensure outbound ray is properly closed.
+		if err != nil && errors.Cause(err) != io.EOF {
+			err = errors.Base(err).Message("Proxyman|OutboundHandler: Failed to process outbound traffic.")
+			if errors.IsActionRequired(err) {
+				log.Warning(err)
+			} else {
+				log.Info(err)
+			}
+			outboundRay.OutboundOutput().CloseError()
+		} else {
+			outboundRay.OutboundOutput().Close()
+		}
+		outboundRay.OutboundInput().CloseError()
 	}
-	outboundRay.OutboundInput().CloseError()
 }
 
 // Dial implements proxy.Dialer.Dial().

+ 18 - 0
tools/conf/v2ray.go

@@ -80,6 +80,10 @@ func (v *InboundConnectionConfig) Build() (*proxyman.InboundHandlerConfig, error
 	}, nil
 }
 
+type MuxConfig struct {
+	Enabled bool `json:"enabled"`
+}
+
 type OutboundConnectionConfig struct {
 	Protocol      string          `json:"protocol"`
 	SendThrough   *Address        `json:"sendThrough"`
@@ -87,6 +91,7 @@ type OutboundConnectionConfig struct {
 	ProxySettings *ProxyConfig    `json:"proxySettings"`
 	Settings      json.RawMessage `json:"settings"`
 	Tag           string          `json:"tag"`
+	MuxSettings   *MuxConfig      `json:"mux"`
 }
 
 func (v *OutboundConnectionConfig) Build() (*proxyman.OutboundHandlerConfig, error) {
@@ -114,6 +119,12 @@ func (v *OutboundConnectionConfig) Build() (*proxyman.OutboundHandlerConfig, err
 		senderSettings.ProxySettings = ps
 	}
 
+	if v.MuxSettings != nil && v.MuxSettings.Enabled {
+		senderSettings.MultiplexSettings = &proxyman.MultiplexingConfig{
+			Enabled: true,
+		}
+	}
+
 	rawConfig, err := outboundConfigLoader.LoadWithID(v.Settings, v.Protocol)
 	if err != nil {
 		return nil, errors.Base(err).Message("Config: Failed to parse outbound config.")
@@ -228,6 +239,7 @@ type OutboundDetourConfig struct {
 	Settings      json.RawMessage `json:"settings"`
 	StreamSetting *StreamConfig   `json:"streamSettings"`
 	ProxySettings *ProxyConfig    `json:"proxySettings"`
+	MuxSettings   *MuxConfig      `json:"mux"`
 }
 
 func (v *OutboundDetourConfig) Build() (*proxyman.OutboundHandlerConfig, error) {
@@ -257,6 +269,12 @@ func (v *OutboundDetourConfig) Build() (*proxyman.OutboundHandlerConfig, error)
 		senderSettings.ProxySettings = ps
 	}
 
+	if v.MuxSettings != nil && v.MuxSettings.Enabled {
+		senderSettings.MultiplexSettings = &proxyman.MultiplexingConfig{
+			Enabled: true,
+		}
+	}
+
 	rawConfig, err := outboundConfigLoader.LoadWithID(v.Settings, v.Protocol)
 	if err != nil {
 		return nil, errors.Base(err).Message("Config: Failed to parse to outbound detour config.")