| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182 | package udpimport (	"context"	"sync"	"v2ray.com/core/app/dispatcher"	"v2ray.com/core/app/log"	"v2ray.com/core/common/buf"	v2net "v2ray.com/core/common/net"	"v2ray.com/core/transport/ray")type ResponseCallback func(payload *buf.Buffer)type Dispatcher struct {	sync.RWMutex	conns      map[v2net.Destination]ray.InboundRay	dispatcher dispatcher.Interface}func NewDispatcher(dispatcher dispatcher.Interface) *Dispatcher {	return &Dispatcher{		conns:      make(map[v2net.Destination]ray.InboundRay),		dispatcher: dispatcher,	}}func (v *Dispatcher) RemoveRay(dest v2net.Destination) {	v.Lock()	defer v.Unlock()	if conn, found := v.conns[dest]; found {		conn.InboundInput().Close()		conn.InboundOutput().Close()		delete(v.conns, dest)	}}func (v *Dispatcher) getInboundRay(ctx context.Context, dest v2net.Destination) (ray.InboundRay, bool) {	v.Lock()	defer v.Unlock()	if entry, found := v.conns[dest]; found {		return entry, true	}	log.Trace(newError("establishing new connection for ", dest))	inboundRay, _ := v.dispatcher.Dispatch(ctx, dest)	v.conns[dest] = inboundRay	return inboundRay, false}func (v *Dispatcher) Dispatch(ctx context.Context, destination v2net.Destination, payload *buf.Buffer, callback ResponseCallback) {	// TODO: Add user to destString	log.Trace(newError("dispatch request to: ", destination).AtDebug())	inboundRay, existing := v.getInboundRay(ctx, destination)	outputStream := inboundRay.InboundInput()	if outputStream != nil {		if err := outputStream.Write(buf.NewMultiBufferValue(payload)); err != nil {			v.RemoveRay(destination)		}	}	if !existing {		go func() {			handleInput(inboundRay.InboundOutput(), callback)			v.RemoveRay(destination)		}()	}}func handleInput(input ray.InputStream, callback ResponseCallback) {	for {		mb, err := input.Read()		if err != nil {			break		}		for _, b := range mb {			callback(b)		}	}}
 |