| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154 | package inboundimport (	"context"	"v2ray.com/core"	"v2ray.com/core/app/proxyman"	"v2ray.com/core/app/proxyman/mux"	"v2ray.com/core/common"	"v2ray.com/core/common/dice"	"v2ray.com/core/common/net"	"v2ray.com/core/common/serial"	"v2ray.com/core/proxy"	"v2ray.com/core/transport/internet")func getStatCounter(v *core.Instance, tag string) (core.StatCounter, core.StatCounter) {	var uplinkCounter core.StatCounter	var downlinkCounter core.StatCounter	policy := v.PolicyManager()	stats := v.Stats()	if len(tag) > 0 && policy.ForSystem().Stats.InboundUplink {		name := "inbound>>>" + tag + ">>>traffic>>>uplink"		c, _ := core.GetOrRegisterStatCounter(stats, name)		if c != nil {			uplinkCounter = c		}	}	if len(tag) > 0 && policy.ForSystem().Stats.InboundDownlink {		name := "inbound>>>" + tag + ">>>traffic>>>downlink"		c, _ := core.GetOrRegisterStatCounter(stats, name)		if c != nil {			downlinkCounter = c		}	}	return uplinkCounter, downlinkCounter}type AlwaysOnInboundHandler struct {	proxy   proxy.Inbound	workers []worker	mux     *mux.Server	tag     string}func NewAlwaysOnInboundHandler(ctx context.Context, tag string, receiverConfig *proxyman.ReceiverConfig, proxyConfig interface{}) (*AlwaysOnInboundHandler, error) {	rawProxy, err := common.CreateObject(ctx, proxyConfig)	if err != nil {		return nil, err	}	p, ok := rawProxy.(proxy.Inbound)	if !ok {		return nil, newError("not an inbound proxy.")	}	h := &AlwaysOnInboundHandler{		proxy: p,		mux:   mux.NewServer(ctx),		tag:   tag,	}	uplinkCounter, downlinkCounter := getStatCounter(core.MustFromContext(ctx), tag)	nl := p.Network()	pr := receiverConfig.PortRange	address := receiverConfig.Listen.AsAddress()	if address == nil {		address = net.AnyIP	}	for port := pr.From; port <= pr.To; port++ {		if nl.HasNetwork(net.Network_TCP) {			newError("creating stream worker on ", address, ":", port).AtDebug().WriteToLog()			mss, err := internet.ToMemoryStreamConfig(receiverConfig.StreamSettings)			if err != nil {				return nil, newError("failed to parse stream config").Base(err).AtWarning()			}			worker := &tcpWorker{				address:         address,				port:            net.Port(port),				proxy:           p,				stream:          mss,				recvOrigDest:    receiverConfig.ReceiveOriginalDestination,				tag:             tag,				dispatcher:      h.mux,				sniffingConfig:  receiverConfig.GetEffectiveSniffingSettings(),				uplinkCounter:   uplinkCounter,				downlinkCounter: downlinkCounter,			}			h.workers = append(h.workers, worker)		}		if nl.HasNetwork(net.Network_UDP) {			worker := &udpWorker{				tag:             tag,				proxy:           p,				address:         address,				port:            net.Port(port),				recvOrigDest:    receiverConfig.ReceiveOriginalDestination,				dispatcher:      h.mux,				uplinkCounter:   uplinkCounter,				downlinkCounter: downlinkCounter,			}			h.workers = append(h.workers, worker)		}	}	return h, nil}// Start implements common.Runnable.func (h *AlwaysOnInboundHandler) Start() error {	for _, worker := range h.workers {		if err := worker.Start(); err != nil {			return err		}	}	return nil}// Close implements common.Closable.func (h *AlwaysOnInboundHandler) Close() error {	var errors []interface{}	for _, worker := range h.workers {		if err := worker.Close(); err != nil {			errors = append(errors, err)		}	}	if err := h.mux.Close(); err != nil {		errors = append(errors, err)	}	if len(errors) > 0 {		return newError("failed to close all resources").Base(newError(serial.Concat(errors...)))	}	return nil}func (h *AlwaysOnInboundHandler) GetRandomInboundProxy() (interface{}, net.Port, int) {	if len(h.workers) == 0 {		return nil, 0, 0	}	w := h.workers[dice.Roll(len(h.workers))]	return w.Proxy(), w.Port(), 9999}func (h *AlwaysOnInboundHandler) Tag() string {	return h.tag}func (h *AlwaysOnInboundHandler) GetInbound() proxy.Inbound {	return h.proxy}
 |