| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292 | package muximport (	"context"	"sync"	"time"	"v2ray.com/core/app/dispatcher"	"v2ray.com/core/app/log"	"v2ray.com/core/common/buf"	"v2ray.com/core/common/net"	"v2ray.com/core/common/signal"	"v2ray.com/core/proxy"	"v2ray.com/core/transport/ray")const (	maxParallel = 8	maxTotal    = 128)type manager interface {	remove(id uint16)}type session struct {	sync.Mutex	input          ray.InputStream	output         ray.OutputStream	parent         manager	id             uint16	uplinkClosed   bool	downlinkClosed bool}func (s *session) checkAndRemove() {	s.Lock()	if s.uplinkClosed && s.downlinkClosed {		s.parent.remove(s.id)	}	s.Unlock()}func (s *session) closeUplink() {	s.Lock()	s.uplinkClosed = true	s.Unlock()	s.checkAndRemove()}func (s *session) closeDownlink() {	s.Lock()	s.downlinkClosed = true	s.Unlock()	s.checkAndRemove()}type Client struct {	access     sync.RWMutex	count      uint16	sessions   map[uint16]*session	inboundRay ray.InboundRay	ctx        context.Context	cancel     context.CancelFunc}var muxCoolDestination = net.TCPDestination(net.DomainAddress("v1.mux.cool"), net.Port(9527))func NewClient(p proxy.Outbound, dialer proxy.Dialer) (*Client, error) {	ctx, cancel := context.WithCancel(context.Background())	ctx = proxy.ContextWithTarget(ctx, muxCoolDestination)	pipe := ray.NewRay(ctx)	err := p.Process(ctx, pipe, dialer)	if err != nil {		cancel()		return nil, err	}	return &Client{		sessions:   make(map[uint16]*session, 256),		inboundRay: pipe,		ctx:        ctx,		cancel:     cancel,	}, nil}func (m *Client) isFullyOccupied() bool {	m.access.RLock()	defer m.access.RUnlock()	return len(m.sessions) >= maxParallel}func (m *Client) remove(id uint16) {	m.access.Lock()	defer m.access.Unlock()	delete(m.sessions, id)	if len(m.sessions) == 0 {		m.cancel()		m.inboundRay.InboundInput().Close()	}}func (m *Client) Closed() bool {	select {	case <-m.ctx.Done():		return true	default:		return false	}}func (m *Client) fetchInput(ctx context.Context, s *session) {	dest, _ := proxy.TargetFromContext(ctx)	writer := &Writer{		dest:   dest,		id:     s.id,		writer: m.inboundRay.InboundInput(),	}	_, timer := signal.CancelAfterInactivity(ctx, time.Minute*5)	buf.PipeUntilEOF(timer, s.input, writer)	writer.Close()	s.closeUplink()}func (m *Client) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) bool {	m.access.Lock()	defer m.access.Unlock()	if len(m.sessions) >= maxParallel {		return false	}	if m.count >= maxTotal {		return false	}	select {	case <-m.ctx.Done():		return false	default:	}	m.count++	id := m.count	s := &session{		input:  outboundRay.OutboundInput(),		output: outboundRay.OutboundOutput(),		parent: m,		id:     id,	}	m.sessions[id] = s	go m.fetchInput(ctx, s)	return true}func (m *Client) fetchOutput() {	reader := NewReader(m.inboundRay.InboundOutput())	for {		meta, err := reader.ReadMetadata()		if err != nil {			break		}		m.access.RLock()		s, found := m.sessions[meta.SessionID]		m.access.RUnlock()		if found && meta.SessionStatus == SessionStatusEnd {			s.closeDownlink()			s.output.Close()		}		if !meta.Option.Has(OptionData) {			continue		}		for {			data, more, err := reader.Read()			if err != nil {				break			}			if found {				if err := s.output.Write(data); err != nil {					break				}			}			if !more {				break			}		}	}}type Server struct {	dispatcher dispatcher.Interface}func (s *Server) Dispatch(ctx context.Context, dest net.Destination) (ray.InboundRay, error) {	if dest != muxCoolDestination {		return s.dispatcher.Dispatch(ctx, dest)	}	ray := ray.NewRay(ctx)	return ray, nil}type ServerWorker struct {	dispatcher  dispatcher.Interface	outboundRay ray.OutboundRay	sessions    map[uint16]*session	access      sync.RWMutex}func (w *ServerWorker) remove(id uint16) {	w.access.Lock()	delete(w.sessions, id)	w.access.Unlock()}func (w *ServerWorker) handle(ctx context.Context, s *session) {	for {		select {		case <-ctx.Done():			return		default:			data, err := s.input.Read()			if err != nil {				return			}			w.outboundRay.OutboundOutput().Write(data)		}	}}func (w *ServerWorker) run(ctx context.Context) {	input := w.outboundRay.OutboundInput()	reader := NewReader(input)	for {		select {		case <-ctx.Done():			return		default:		}		meta, err := reader.ReadMetadata()		if err != nil {			return		}		w.access.RLock()		s, found := w.sessions[meta.SessionID]		w.access.RUnlock()		if found && meta.SessionStatus == SessionStatusEnd {			s.closeUplink()			s.output.Close()		}		if meta.SessionStatus == SessionStatusNew {			inboundRay, err := w.dispatcher.Dispatch(ctx, meta.Target)			if err != nil {				log.Info("Proxyman|Mux: Failed to dispatch request: ", err)				continue			}			s = &session{				input:  inboundRay.InboundOutput(),				output: inboundRay.InboundInput(),				parent: w,				id:     meta.SessionID,			}			w.access.Lock()			w.sessions[meta.SessionID] = s			w.access.Unlock()			go w.handle(ctx, s)		}		if meta.Option.Has(OptionData) {			for {				data, more, err := reader.Read()				if err != nil {					break				}				if s != nil {					s.output.Write(data)				}				if !more {					break				}			}		}	}}
 |