Browse Source

split worker picker from client manager

Darien Raymond 7 years ago
parent
commit
284923664a
3 changed files with 110 additions and 58 deletions
  1. 10 1
      app/proxyman/outbound/handler.go
  2. 100 53
      common/mux/client.go
  3. 0 4
      common/mux/mux.go

+ 10 - 1
app/proxyman/outbound/handler.go

@@ -72,7 +72,16 @@ func NewHandler(ctx context.Context, config *core.OutboundHandlerConfig) (outbou
 		if config.Concurrency < 1 || config.Concurrency > 1024 {
 		if config.Concurrency < 1 || config.Concurrency > 1024 {
 			return nil, newError("invalid mux concurrency: ", config.Concurrency).AtWarning()
 			return nil, newError("invalid mux concurrency: ", config.Concurrency).AtWarning()
 		}
 		}
-		h.mux = mux.NewClientManager(proxyHandler, h, config.Concurrency)
+		h.mux = &mux.ClientManager{
+			Picker: &mux.IncrementalWorkerPicker{
+				New: func() (*mux.ClientWorker, error) {
+					return mux.NewClientWorker(proxyHandler, h, mux.ClientStrategy{
+						MaxConcurrency: config.Concurrency,
+						MaxConnection:  128,
+					})
+				},
+			},
+		}
 	}
 	}
 
 
 	h.proxy = proxyHandler
 	h.proxy = proxyHandler

+ 100 - 53
common/mux/client.go

@@ -13,6 +13,7 @@ import (
 	"v2ray.com/core/common/protocol"
 	"v2ray.com/core/common/protocol"
 	"v2ray.com/core/common/session"
 	"v2ray.com/core/common/session"
 	"v2ray.com/core/common/signal/done"
 	"v2ray.com/core/common/signal/done"
+	"v2ray.com/core/common/task"
 	"v2ray.com/core/common/vio"
 	"v2ray.com/core/common/vio"
 	"v2ray.com/core/proxy"
 	"v2ray.com/core/proxy"
 	"v2ray.com/core/transport/internet"
 	"v2ray.com/core/transport/internet"
@@ -20,85 +21,126 @@ import (
 )
 )
 
 
 type ClientManager struct {
 type ClientManager struct {
+	Picker WorkerPicker
+}
+
+func (m *ClientManager) Dispatch(ctx context.Context, link *vio.Link) error {
+	for {
+		worker, err := m.Picker.PickAvailable()
+		if err != nil {
+			return err
+		}
+		if worker.Dispatch(ctx, link) {
+			return nil
+		}
+	}
+}
+
+type WorkerPicker interface {
+	PickAvailable() (*ClientWorker, error)
+}
+
+type IncrementalWorkerPicker struct {
+	New func() (*ClientWorker, error)
+
 	access      sync.Mutex
 	access      sync.Mutex
-	clients     []*Client
-	proxy       proxy.Outbound
-	dialer      internet.Dialer
-	concurrency uint32
+	workers     []*ClientWorker
+	cleanupTask *task.Periodic
 }
 }
 
 
-func NewClientManager(p proxy.Outbound, d internet.Dialer, c uint32) *ClientManager {
-	return &ClientManager{
-		proxy:       p,
-		dialer:      d,
-		concurrency: c,
+func (p *IncrementalWorkerPicker) cleanupFunc() error {
+	p.access.Lock()
+	defer p.access.Unlock()
+
+	if len(p.workers) == 0 {
+		return newError("no worker")
 	}
 	}
+
+	p.cleanup()
+	return nil
 }
 }
 
 
-func (m *ClientManager) Dispatch(ctx context.Context, link *vio.Link) error {
-	m.access.Lock()
-	defer m.access.Unlock()
+func (p *IncrementalWorkerPicker) cleanup() {
+	var activeWorkers []*ClientWorker
+	for _, w := range p.workers {
+		if !w.Closed() {
+			activeWorkers = append(activeWorkers, w)
+		}
+	}
+	p.workers = activeWorkers
+}
 
 
-	for _, client := range m.clients {
-		if client.Dispatch(ctx, link) {
-			return nil
+func (p *IncrementalWorkerPicker) pickInternal() (*ClientWorker, error, bool) {
+	p.access.Lock()
+	defer p.access.Unlock()
+
+	for _, w := range p.workers {
+		if !w.IsFull() {
+			return w, nil, false
 		}
 		}
 	}
 	}
 
 
-	client, err := NewClient(ctx, m.proxy, m.dialer, m)
+	p.cleanup()
+
+	worker, err := p.New()
 	if err != nil {
 	if err != nil {
-		return newError("failed to create client").Base(err)
+		return nil, err, false
 	}
 	}
-	m.clients = append(m.clients, client)
-	client.Dispatch(ctx, link)
-	return nil
-}
+	p.workers = append(p.workers, worker)
 
 
-func (m *ClientManager) onClientFinish() {
-	m.access.Lock()
-	defer m.access.Unlock()
+	if p.cleanupTask == nil {
+		p.cleanupTask = &task.Periodic{
+			Interval: time.Second * 30,
+			Execute:  p.cleanupFunc,
+		}
+	}
 
 
-	activeClients := make([]*Client, 0, len(m.clients))
+	return worker, nil, true
+}
 
 
-	for _, client := range m.clients {
-		if !client.Closed() {
-			activeClients = append(activeClients, client)
-		}
+func (p *IncrementalWorkerPicker) PickAvailable() (*ClientWorker, error) {
+	worker, err, start := p.pickInternal()
+	if start {
+		p.cleanupTask.Start()
 	}
 	}
-	m.clients = activeClients
+
+	return worker, err
 }
 }
 
 
-type Client struct {
+type ClientStrategy struct {
+	MaxConcurrency uint32
+	MaxConnection  uint32
+}
+
+type ClientWorker struct {
 	sessionManager *SessionManager
 	sessionManager *SessionManager
 	link           vio.Link
 	link           vio.Link
 	done           *done.Instance
 	done           *done.Instance
-	manager        *ClientManager
-	concurrency    uint32
+	strategy       ClientStrategy
 }
 }
 
 
 var muxCoolAddress = net.DomainAddress("v1.mux.cool")
 var muxCoolAddress = net.DomainAddress("v1.mux.cool")
 var muxCoolPort = net.Port(9527)
 var muxCoolPort = net.Port(9527)
 
 
-// NewClient creates a new mux.Client.
-func NewClient(pctx context.Context, p proxy.Outbound, dialer internet.Dialer, m *ClientManager) (*Client, error) {
+// NewClientWorker creates a new mux.ClientWorker.
+func NewClientWorker(p proxy.Outbound, dialer internet.Dialer, s ClientStrategy) (*ClientWorker, error) {
 	ctx := session.ContextWithOutbound(context.Background(), &session.Outbound{
 	ctx := session.ContextWithOutbound(context.Background(), &session.Outbound{
 		Target: net.TCPDestination(muxCoolAddress, muxCoolPort),
 		Target: net.TCPDestination(muxCoolAddress, muxCoolPort),
 	})
 	})
 	ctx, cancel := context.WithCancel(ctx)
 	ctx, cancel := context.WithCancel(ctx)
 
 
-	opts := pipe.OptionsFromContext(pctx)
+	opts := []pipe.Option{pipe.WithSizeLimit(64 * 1024)}
 	uplinkReader, upLinkWriter := pipe.New(opts...)
 	uplinkReader, upLinkWriter := pipe.New(opts...)
 	downlinkReader, downlinkWriter := pipe.New(opts...)
 	downlinkReader, downlinkWriter := pipe.New(opts...)
 
 
-	c := &Client{
+	c := &ClientWorker{
 		sessionManager: NewSessionManager(),
 		sessionManager: NewSessionManager(),
 		link: vio.Link{
 		link: vio.Link{
 			Reader: downlinkReader,
 			Reader: downlinkReader,
 			Writer: upLinkWriter,
 			Writer: upLinkWriter,
 		},
 		},
-		done:        done.New(),
-		manager:     m,
-		concurrency: m.concurrency,
+		done:     done.New(),
+		strategy: s,
 	}
 	}
 
 
 	go func() {
 	go func() {
@@ -115,13 +157,11 @@ func NewClient(pctx context.Context, p proxy.Outbound, dialer internet.Dialer, m
 }
 }
 
 
 // Closed returns true if this Client is closed.
 // Closed returns true if this Client is closed.
-func (m *Client) Closed() bool {
+func (m *ClientWorker) Closed() bool {
 	return m.done.Done()
 	return m.done.Done()
 }
 }
 
 
-func (m *Client) monitor() {
-	defer m.manager.onClientFinish()
-
+func (m *ClientWorker) monitor() {
 	timer := time.NewTicker(time.Second * 16)
 	timer := time.NewTicker(time.Second * 16)
 	defer timer.Stop()
 	defer timer.Stop()
 
 
@@ -181,16 +221,23 @@ func fetchInput(ctx context.Context, s *Session, output buf.Writer) {
 	}
 	}
 }
 }
 
 
-func (m *Client) Dispatch(ctx context.Context, link *vio.Link) bool {
+func (m *ClientWorker) IsFull() bool {
 	sm := m.sessionManager
 	sm := m.sessionManager
-	if sm.Size() >= int(m.concurrency) || sm.Count() >= maxTotal {
-		return false
+	if m.strategy.MaxConcurrency > 0 && sm.Size() >= int(m.strategy.MaxConcurrency) {
+		return true
+	}
+	if m.strategy.MaxConnection > 0 && sm.Count() >= int(m.strategy.MaxConnection) {
+		return true
 	}
 	}
+	return false
+}
 
 
-	if m.done.Done() {
+func (m *ClientWorker) Dispatch(ctx context.Context, link *vio.Link) bool {
+	if m.IsFull() || m.Closed() {
 		return false
 		return false
 	}
 	}
 
 
+	sm := m.sessionManager
 	s := sm.Allocate()
 	s := sm.Allocate()
 	if s == nil {
 	if s == nil {
 		return false
 		return false
@@ -201,21 +248,21 @@ func (m *Client) Dispatch(ctx context.Context, link *vio.Link) bool {
 	return true
 	return true
 }
 }
 
 
-func (m *Client) handleStatueKeepAlive(meta *FrameMetadata, reader *buf.BufferedReader) error {
+func (m *ClientWorker) handleStatueKeepAlive(meta *FrameMetadata, reader *buf.BufferedReader) error {
 	if meta.Option.Has(OptionData) {
 	if meta.Option.Has(OptionData) {
 		return buf.Copy(NewStreamReader(reader), buf.Discard)
 		return buf.Copy(NewStreamReader(reader), buf.Discard)
 	}
 	}
 	return nil
 	return nil
 }
 }
 
 
-func (m *Client) handleStatusNew(meta *FrameMetadata, reader *buf.BufferedReader) error {
+func (m *ClientWorker) handleStatusNew(meta *FrameMetadata, reader *buf.BufferedReader) error {
 	if meta.Option.Has(OptionData) {
 	if meta.Option.Has(OptionData) {
 		return buf.Copy(NewStreamReader(reader), buf.Discard)
 		return buf.Copy(NewStreamReader(reader), buf.Discard)
 	}
 	}
 	return nil
 	return nil
 }
 }
 
 
-func (m *Client) handleStatusKeep(meta *FrameMetadata, reader *buf.BufferedReader) error {
+func (m *ClientWorker) handleStatusKeep(meta *FrameMetadata, reader *buf.BufferedReader) error {
 	if !meta.Option.Has(OptionData) {
 	if !meta.Option.Has(OptionData) {
 		return nil
 		return nil
 	}
 	}
@@ -239,7 +286,7 @@ func (m *Client) handleStatusKeep(meta *FrameMetadata, reader *buf.BufferedReade
 	return err
 	return err
 }
 }
 
 
-func (m *Client) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader) error {
+func (m *ClientWorker) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader) error {
 	if s, found := m.sessionManager.Get(meta.SessionID); found {
 	if s, found := m.sessionManager.Get(meta.SessionID); found {
 		if meta.Option.Has(OptionError) {
 		if meta.Option.Has(OptionError) {
 			pipe.CloseError(s.input)
 			pipe.CloseError(s.input)
@@ -253,7 +300,7 @@ func (m *Client) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader
 	return nil
 	return nil
 }
 }
 
 
-func (m *Client) fetchOutput() {
+func (m *ClientWorker) fetchOutput() {
 	defer func() {
 	defer func() {
 		common.Must(m.done.Close())
 		common.Must(m.done.Close())
 	}()
 	}()

+ 0 - 4
common/mux/mux.go

@@ -1,7 +1,3 @@
 package mux
 package mux
 
 
 //go:generate errorgen
 //go:generate errorgen
-
-const (
-	maxTotal = 128
-)