| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176 | package udpimport (	"context"	"net"	"v2ray.com/core/app/log"	"v2ray.com/core/common/buf"	"v2ray.com/core/common/dice"	"v2ray.com/core/common/errors"	v2net "v2ray.com/core/common/net"	"v2ray.com/core/transport/internet/internal")// Payload represents a single UDP payload.type Payload struct {	payload      *buf.Buffer	source       v2net.Destination	originalDest v2net.Destination}// PayloadHandler is function to handle Payload.type PayloadHandler func(payload *buf.Buffer, source v2net.Destination, originalDest v2net.Destination)// PayloadQueue is a queue of Payload.type PayloadQueue struct {	queue    []chan Payload	callback PayloadHandler}// NewPayloadQueue returns a new PayloadQueue.func NewPayloadQueue(option ListenOption) *PayloadQueue {	queue := &PayloadQueue{		callback: option.Callback,		queue:    make([]chan Payload, option.Concurrency),	}	for i := range queue.queue {		queue.queue[i] = make(chan Payload, 64)		go queue.Dequeue(queue.queue[i])	}	return queue}func (v *PayloadQueue) Enqueue(payload Payload) {	size := len(v.queue)	idx := 0	if size > 1 {		idx = dice.Roll(size)	}	for i := 0; i < size; i++ {		select {		case v.queue[idx%size] <- payload:			return		default:			idx++		}	}}func (v *PayloadQueue) Dequeue(queue <-chan Payload) {	for payload := range queue {		v.callback(payload.payload, payload.source, payload.originalDest)	}}func (v *PayloadQueue) Close() {	for _, queue := range v.queue {		close(queue)	}}type ListenOption struct {	Callback            PayloadHandler	ReceiveOriginalDest bool	Concurrency         int}type Hub struct {	conn   *net.UDPConn	cancel context.CancelFunc	queue  *PayloadQueue	option ListenOption}func ListenUDP(address v2net.Address, port v2net.Port, option ListenOption) (*Hub, error) {	if option.Concurrency < 1 {		option.Concurrency = 1	}	udpConn, err := net.ListenUDP("udp", &net.UDPAddr{		IP:   address.IP(),		Port: int(port),	})	if err != nil {		return nil, err	}	log.Info("UDP|Hub: Listening on ", address, ":", port)	if option.ReceiveOriginalDest {		fd, err := internal.GetSysFd(udpConn)		if err != nil {			return nil, errors.Base(err).Message("UDP|Listener: Failed to get fd.")		}		err = SetOriginalDestOptions(fd)		if err != nil {			return nil, errors.Base(err).Message("UDP|Listener: Failed to set socket options.")		}	}	ctx, cancel := context.WithCancel(context.Background())	hub := &Hub{		conn:   udpConn,		queue:  NewPayloadQueue(option),		option: option,		cancel: cancel,	}	go hub.start(ctx)	return hub, nil}func (v *Hub) Close() {	v.cancel()	v.conn.Close()}func (v *Hub) WriteTo(payload []byte, dest v2net.Destination) (int, error) {	return v.conn.WriteToUDP(payload, &net.UDPAddr{		IP:   dest.Address.IP(),		Port: int(dest.Port),	})}func (v *Hub) start(ctx context.Context) {	oobBytes := make([]byte, 256)L:	for {		select {		case <-ctx.Done():			break L		default:		}		buffer := buf.NewSmall()		var noob int		var addr *net.UDPAddr		err := buffer.AppendSupplier(func(b []byte) (int, error) {			n, nb, _, a, e := ReadUDPMsg(v.conn, b, oobBytes)			noob = nb			addr = a			return n, e		})		if err != nil {			log.Info("UDP|Hub: Failed to read UDP msg: ", err)			buffer.Release()			continue		}		payload := Payload{			payload: buffer,		}		payload.source = v2net.UDPDestination(v2net.IPAddress(addr.IP), v2net.Port(addr.Port))		if v.option.ReceiveOriginalDest && noob > 0 {			payload.originalDest = RetrieveOriginalDest(oobBytes[:noob])		}		v.queue.Enqueue(payload)	}	v.queue.Close()}// Connection returns the net.Conn underneath this hub.// Private: Visible for testing onlyfunc (v *Hub) Connection() net.Conn {	return v.conn}func (v *Hub) Addr() net.Addr {	return v.conn.LocalAddr()}
 |