| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 | package udpimport (	"context"	"sync"	"time"	"v2ray.com/core"	"v2ray.com/core/common"	"v2ray.com/core/common/buf"	"v2ray.com/core/common/net"	"v2ray.com/core/common/signal")type ResponseCallback func(payload *buf.Buffer)type connEntry struct {	link   *core.Link	timer  signal.ActivityUpdater	cancel context.CancelFunc}type Dispatcher struct {	sync.RWMutex	conns      map[net.Destination]*connEntry	dispatcher core.Dispatcher}func NewDispatcher(dispatcher core.Dispatcher) *Dispatcher {	return &Dispatcher{		conns:      make(map[net.Destination]*connEntry),		dispatcher: dispatcher,	}}func (v *Dispatcher) RemoveRay(dest net.Destination) {	v.Lock()	defer v.Unlock()	if conn, found := v.conns[dest]; found {		common.Close(conn.link.Reader)		common.Close(conn.link.Writer)		delete(v.conns, dest)	}}func (v *Dispatcher) getInboundRay(dest net.Destination, callback ResponseCallback) *connEntry {	v.Lock()	defer v.Unlock()	if entry, found := v.conns[dest]; found {		return entry	}	newError("establishing new connection for ", dest).WriteToLog()	ctx, cancel := context.WithCancel(context.Background())	removeRay := func() {		cancel()		v.RemoveRay(dest)	}	timer := signal.CancelAfterInactivity(ctx, removeRay, time.Second*4)	link, _ := v.dispatcher.Dispatch(ctx, dest)	entry := &connEntry{		link:   link,		timer:  timer,		cancel: removeRay,	}	v.conns[dest] = entry	go handleInput(ctx, entry, callback)	return entry}func (v *Dispatcher) Dispatch(ctx context.Context, destination net.Destination, payload *buf.Buffer, callback ResponseCallback) {	// TODO: Add user to destString	newError("dispatch request to: ", destination).AtDebug().WithContext(ctx).WriteToLog()	conn := v.getInboundRay(destination, callback)	outputStream := conn.link.Writer	if outputStream != nil {		if err := outputStream.WriteMultiBuffer(buf.NewMultiBufferValue(payload)); err != nil {			newError("failed to write first UDP payload").Base(err).WithContext(ctx).WriteToLog()			conn.cancel()			return		}	}}func handleInput(ctx context.Context, conn *connEntry, callback ResponseCallback) {	input := conn.link.Reader	timer := conn.timer	for {		select {		case <-ctx.Done():			return		default:		}		mb, err := input.ReadMultiBuffer()		if err != nil {			newError("failed to handle UDP input").Base(err).WithContext(ctx).WriteToLog()			conn.cancel()			return		}		timer.Update()		for _, b := range mb {			callback(b)		}	}}
 |