Browse Source

DialUDP function

Darien Raymond 6 years ago
parent
commit
b52725cf65

+ 4 - 2
app/dns/udpns.go

@@ -9,9 +9,9 @@ import (
 
 
 	"golang.org/x/net/dns/dnsmessage"
 	"golang.org/x/net/dns/dnsmessage"
 	"v2ray.com/core/common"
 	"v2ray.com/core/common"
-	"v2ray.com/core/common/buf"
 	"v2ray.com/core/common/net"
 	"v2ray.com/core/common/net"
 	"v2ray.com/core/common/protocol/dns"
 	"v2ray.com/core/common/protocol/dns"
+	udp_proto "v2ray.com/core/common/protocol/udp"
 	"v2ray.com/core/common/session"
 	"v2ray.com/core/common/session"
 	"v2ray.com/core/common/signal/pubsub"
 	"v2ray.com/core/common/signal/pubsub"
 	"v2ray.com/core/common/task"
 	"v2ray.com/core/common/task"
@@ -101,7 +101,9 @@ func (s *ClassicNameServer) Cleanup() error {
 	return nil
 	return nil
 }
 }
 
 
-func (s *ClassicNameServer) HandleResponse(ctx context.Context, payload *buf.Buffer) {
+func (s *ClassicNameServer) HandleResponse(ctx context.Context, packet *udp_proto.Packet) {
+	payload := packet.Payload
+
 	var parser dnsmessage.Parser
 	var parser dnsmessage.Parser
 	header, err := parser.Start(payload.Bytes())
 	header, err := parser.Start(payload.Bytes())
 	if err != nil {
 	if err != nil {

+ 1 - 1
app/proxyman/inbound/worker.go

@@ -313,7 +313,7 @@ func (w *udpWorker) removeConn(id connID) {
 func (w *udpWorker) handlePackets() {
 func (w *udpWorker) handlePackets() {
 	receive := w.hub.Receive()
 	receive := w.hub.Receive()
 	for payload := range receive {
 	for payload := range receive {
-		w.callback(payload.Content, payload.Source, payload.OriginalDestination)
+		w.callback(payload.Payload, payload.Source, payload.Target)
 	}
 	}
 }
 }
 
 

+ 15 - 0
functions.go

@@ -7,6 +7,7 @@ import (
 	"v2ray.com/core/common"
 	"v2ray.com/core/common"
 	"v2ray.com/core/common/net"
 	"v2ray.com/core/common/net"
 	"v2ray.com/core/features/routing"
 	"v2ray.com/core/features/routing"
+	"v2ray.com/core/transport/internet/udp"
 )
 )
 
 
 // CreateObject creates a new object based on the given V2Ray instance and config. The V2Ray instance may be nil.
 // CreateObject creates a new object based on the given V2Ray instance and config. The V2Ray instance may be nil.
@@ -54,3 +55,17 @@ func Dial(ctx context.Context, v *Instance, dest net.Destination) (net.Conn, err
 	}
 	}
 	return net.NewConnection(net.ConnectionInputMulti(r.Writer), net.ConnectionOutputMulti(r.Reader)), nil
 	return net.NewConnection(net.ConnectionInputMulti(r.Writer), net.ConnectionOutputMulti(r.Reader)), nil
 }
 }
+
+// DialUDP provides a way to exchange UDP packets through V2Ray instance to remote servers.
+// Since it is under a proxy context, the LocalAddr() in returned PacketConn will not show the real address.
+//
+// TODO: SetDeadline() / SetReadDeadline() / SetWriteDeadline() are not implemented.
+//
+// v2ray:api:beta
+func DialUDP(ctx context.Context, v *Instance) (net.PacketConn, error) {
+	dispatcher := v.GetFeature(routing.DispatcherType())
+	if dispatcher == nil {
+		return nil, newError("routing.Dispatcher is not registered in V2Ray core")
+	}
+	return udp.DialDispatcher(ctx, dispatcher.(routing.Dispatcher))
+}

+ 170 - 0
functions_test.go

@@ -0,0 +1,170 @@
+package core_test
+
+import (
+	"context"
+	"crypto/rand"
+	"io"
+	"testing"
+
+	"github.com/golang/protobuf/proto"
+	"github.com/google/go-cmp/cmp"
+
+	"v2ray.com/core"
+	"v2ray.com/core/app/dispatcher"
+	"v2ray.com/core/app/proxyman"
+	"v2ray.com/core/common"
+	"v2ray.com/core/common/net"
+	"v2ray.com/core/common/serial"
+	"v2ray.com/core/proxy/freedom"
+	"v2ray.com/core/testing/servers/tcp"
+	"v2ray.com/core/testing/servers/udp"
+)
+
+func xor(b []byte) []byte {
+	r := make([]byte, len(b))
+	for i, v := range b {
+		r[i] = v ^ 'c'
+	}
+	return r
+}
+
+func xor2(b []byte) []byte {
+	r := make([]byte, len(b))
+	for i, v := range b {
+		r[i] = v ^ 'd'
+	}
+	return r
+}
+
+func TestV2RayDial(t *testing.T) {
+	tcpServer := tcp.Server{
+		MsgProcessor: xor,
+	}
+	dest, err := tcpServer.Start()
+	common.Must(err)
+	defer tcpServer.Close()
+
+	config := &core.Config{
+		App: []*serial.TypedMessage{
+			serial.ToTypedMessage(&dispatcher.Config{}),
+			serial.ToTypedMessage(&proxyman.InboundConfig{}),
+			serial.ToTypedMessage(&proxyman.OutboundConfig{}),
+		},
+		Outbound: []*core.OutboundHandlerConfig{
+			{
+				ProxySettings: serial.ToTypedMessage(&freedom.Config{}),
+			},
+		},
+	}
+
+	cfgBytes, err := proto.Marshal(config)
+	common.Must(err)
+
+	server, err := core.StartInstance("protobuf", cfgBytes)
+	common.Must(err)
+	defer server.Close()
+
+	conn, err := core.Dial(context.Background(), server, dest)
+	common.Must(err)
+	defer conn.Close()
+
+	const size = 10240 * 1024
+	payload := make([]byte, size)
+	common.Must2(rand.Read(payload))
+
+	if _, err := conn.Write(payload); err != nil {
+		t.Fatal(err)
+	}
+
+	receive := make([]byte, size)
+	if _, err := io.ReadFull(conn, receive); err != nil {
+		t.Fatal("failed to read all response: ", err)
+	}
+
+	if r := cmp.Diff(xor(receive), payload); r != "" {
+		t.Error(r)
+	}
+}
+
+func TestV2RayDialUDP(t *testing.T) {
+	udpServer1 := udp.Server{
+		MsgProcessor: xor,
+	}
+	dest1, err := udpServer1.Start()
+	common.Must(err)
+	defer udpServer1.Close()
+
+	udpServer2 := udp.Server{
+		MsgProcessor: xor2,
+	}
+	dest2, err := udpServer2.Start()
+	common.Must(err)
+	defer udpServer2.Close()
+
+	config := &core.Config{
+		App: []*serial.TypedMessage{
+			serial.ToTypedMessage(&dispatcher.Config{}),
+			serial.ToTypedMessage(&proxyman.InboundConfig{}),
+			serial.ToTypedMessage(&proxyman.OutboundConfig{}),
+		},
+		Outbound: []*core.OutboundHandlerConfig{
+			{
+				ProxySettings: serial.ToTypedMessage(&freedom.Config{}),
+			},
+		},
+	}
+
+	cfgBytes, err := proto.Marshal(config)
+	common.Must(err)
+
+	server, err := core.StartInstance("protobuf", cfgBytes)
+	common.Must(err)
+	defer server.Close()
+
+	conn, err := core.DialUDP(context.Background(), server)
+	common.Must(err)
+	defer conn.Close()
+
+	const size = 1024
+	{
+		payload := make([]byte, size)
+		common.Must2(rand.Read(payload))
+
+		if _, err := conn.WriteTo(payload, &net.UDPAddr{
+			IP:   dest1.Address.IP(),
+			Port: int(dest1.Port),
+		}); err != nil {
+			t.Fatal(err)
+		}
+
+		receive := make([]byte, size)
+		if _, _, err := conn.ReadFrom(receive); err != nil {
+			t.Fatal(err)
+		}
+
+		if r := cmp.Diff(xor(receive), payload); r != "" {
+			t.Error(r)
+		}
+	}
+
+	{
+		payload := make([]byte, size)
+		common.Must2(rand.Read(payload))
+
+		if _, err := conn.WriteTo(payload, &net.UDPAddr{
+			IP:   dest2.Address.IP(),
+			Port: int(dest2.Port),
+		}); err != nil {
+			t.Fatal(err)
+		}
+
+		receive := make([]byte, size)
+		if _, _, err := conn.ReadFrom(receive); err != nil {
+			t.Fatal(err)
+		}
+
+		if r := cmp.Diff(xor2(receive), payload); r != "" {
+			t.Error(r)
+		}
+	}
+}

+ 3 - 1
proxy/shadowsocks/server.go

@@ -10,6 +10,7 @@ import (
 	"v2ray.com/core/common/log"
 	"v2ray.com/core/common/log"
 	"v2ray.com/core/common/net"
 	"v2ray.com/core/common/net"
 	"v2ray.com/core/common/protocol"
 	"v2ray.com/core/common/protocol"
+	udp_proto "v2ray.com/core/common/protocol/udp"
 	"v2ray.com/core/common/session"
 	"v2ray.com/core/common/session"
 	"v2ray.com/core/common/signal"
 	"v2ray.com/core/common/signal"
 	"v2ray.com/core/common/task"
 	"v2ray.com/core/common/task"
@@ -69,12 +70,13 @@ func (s *Server) Process(ctx context.Context, network net.Network, conn internet
 }
 }
 
 
 func (s *Server) handlerUDPPayload(ctx context.Context, conn internet.Connection, dispatcher routing.Dispatcher) error {
 func (s *Server) handlerUDPPayload(ctx context.Context, conn internet.Connection, dispatcher routing.Dispatcher) error {
-	udpServer := udp.NewDispatcher(dispatcher, func(ctx context.Context, payload *buf.Buffer) {
+	udpServer := udp.NewDispatcher(dispatcher, func(ctx context.Context, packet *udp_proto.Packet) {
 		request := protocol.RequestHeaderFromContext(ctx)
 		request := protocol.RequestHeaderFromContext(ctx)
 		if request == nil {
 		if request == nil {
 			return
 			return
 		}
 		}
 
 
+		payload := packet.Payload
 		data, err := EncodeUDPPacket(request, payload.Bytes())
 		data, err := EncodeUDPPacket(request, payload.Bytes())
 		payload.Release()
 		payload.Release()
 		if err != nil {
 		if err != nil {

+ 3 - 1
proxy/socks/server.go

@@ -11,6 +11,7 @@ import (
 	"v2ray.com/core/common/log"
 	"v2ray.com/core/common/log"
 	"v2ray.com/core/common/net"
 	"v2ray.com/core/common/net"
 	"v2ray.com/core/common/protocol"
 	"v2ray.com/core/common/protocol"
+	udp_proto "v2ray.com/core/common/protocol/udp"
 	"v2ray.com/core/common/session"
 	"v2ray.com/core/common/session"
 	"v2ray.com/core/common/signal"
 	"v2ray.com/core/common/signal"
 	"v2ray.com/core/common/task"
 	"v2ray.com/core/common/task"
@@ -174,7 +175,8 @@ func (s *Server) transport(ctx context.Context, reader io.Reader, writer io.Writ
 }
 }
 
 
 func (s *Server) handleUDPPayload(ctx context.Context, conn internet.Connection, dispatcher routing.Dispatcher) error {
 func (s *Server) handleUDPPayload(ctx context.Context, conn internet.Connection, dispatcher routing.Dispatcher) error {
-	udpServer := udp.NewDispatcher(dispatcher, func(ctx context.Context, payload *buf.Buffer) {
+	udpServer := udp.NewDispatcher(dispatcher, func(ctx context.Context, packet *udp_proto.Packet) {
+		payload := packet.Payload
 		newError("writing back UDP response with ", payload.Len(), " bytes").AtDebug().WriteToLog(session.ExportIDToError(ctx))
 		newError("writing back UDP response with ", payload.Len(), " bytes").AtDebug().WriteToLog(session.ExportIDToError(ctx))
 
 
 		request := protocol.RequestHeaderFromContext(ctx)
 		request := protocol.RequestHeaderFromContext(ctx)

+ 87 - 4
transport/internet/udp/dispatcher.go

@@ -2,19 +2,23 @@ package udp
 
 
 import (
 import (
 	"context"
 	"context"
+	"io"
 	"sync"
 	"sync"
 	"time"
 	"time"
 
 
+	"v2ray.com/core/common/signal/done"
+
 	"v2ray.com/core/common"
 	"v2ray.com/core/common"
 	"v2ray.com/core/common/buf"
 	"v2ray.com/core/common/buf"
 	"v2ray.com/core/common/net"
 	"v2ray.com/core/common/net"
+	"v2ray.com/core/common/protocol/udp"
 	"v2ray.com/core/common/session"
 	"v2ray.com/core/common/session"
 	"v2ray.com/core/common/signal"
 	"v2ray.com/core/common/signal"
 	"v2ray.com/core/features/routing"
 	"v2ray.com/core/features/routing"
 	"v2ray.com/core/transport"
 	"v2ray.com/core/transport"
 )
 )
 
 
-type ResponseCallback func(ctx context.Context, payload *buf.Buffer)
+type ResponseCallback func(ctx context.Context, packet *udp.Packet)
 
 
 type connEntry struct {
 type connEntry struct {
 	link   *transport.Link
 	link   *transport.Link
@@ -70,7 +74,7 @@ func (v *Dispatcher) getInboundRay(ctx context.Context, dest net.Destination) *c
 		cancel: removeRay,
 		cancel: removeRay,
 	}
 	}
 	v.conns[dest] = entry
 	v.conns[dest] = entry
-	go handleInput(ctx, entry, v.callback)
+	go handleInput(ctx, entry, dest, v.callback)
 	return entry
 	return entry
 }
 }
 
 
@@ -89,7 +93,7 @@ func (v *Dispatcher) Dispatch(ctx context.Context, destination net.Destination,
 	}
 	}
 }
 }
 
 
-func handleInput(ctx context.Context, conn *connEntry, callback ResponseCallback) {
+func handleInput(ctx context.Context, conn *connEntry, dest net.Destination, callback ResponseCallback) {
 	defer conn.cancel()
 	defer conn.cancel()
 
 
 	input := conn.link.Reader
 	input := conn.link.Reader
@@ -109,7 +113,86 @@ func handleInput(ctx context.Context, conn *connEntry, callback ResponseCallback
 		}
 		}
 		timer.Update()
 		timer.Update()
 		for _, b := range mb {
 		for _, b := range mb {
-			callback(ctx, b)
+			callback(ctx, &udp.Packet{
+				Payload: b,
+				Source:  dest,
+			})
 		}
 		}
 	}
 	}
 }
 }
+
+type dispatcherConn struct {
+	dispatcher *Dispatcher
+	cache      chan *udp.Packet
+	done       *done.Instance
+}
+
+func DialDispatcher(ctx context.Context, dispatcher routing.Dispatcher) (net.PacketConn, error) {
+	c := &dispatcherConn{
+		cache: make(chan *udp.Packet, 16),
+		done:  done.New(),
+	}
+
+	d := NewDispatcher(dispatcher, c.callback)
+	c.dispatcher = d
+	return c, nil
+}
+
+func (c *dispatcherConn) callback(ctx context.Context, packet *udp.Packet) {
+	select {
+	case <-c.done.Wait():
+		packet.Payload.Release()
+		return
+	case c.cache <- packet:
+	default:
+		packet.Payload.Release()
+		return
+	}
+}
+
+func (c *dispatcherConn) ReadFrom(p []byte) (int, net.Addr, error) {
+	select {
+	case <-c.done.Wait():
+		return 0, nil, io.EOF
+	case packet := <-c.cache:
+		n := copy(p, packet.Payload.Bytes())
+		return n, &net.UDPAddr{
+			IP:   packet.Source.Address.IP(),
+			Port: int(packet.Source.Port),
+		}, nil
+	}
+}
+
+func (c *dispatcherConn) WriteTo(p []byte, addr net.Addr) (int, error) {
+	buffer := buf.New()
+	raw := buffer.Extend(buf.Size)
+	n := copy(raw, p)
+	buffer.Resize(0, int32(n))
+
+	ctx := context.Background()
+	c.dispatcher.Dispatch(ctx, net.DestinationFromAddr(addr), buffer)
+	return n, nil
+}
+
+func (c *dispatcherConn) Close() error {
+	return c.done.Close()
+}
+
+func (c *dispatcherConn) LocalAddr() net.Addr {
+	return &net.UDPAddr{
+		IP:   []byte{0, 0, 0, 0},
+		Port: 0,
+	}
+}
+
+func (c *dispatcherConn) SetDeadline(t time.Time) error {
+	return nil
+}
+
+func (c *dispatcherConn) SetReadDeadline(t time.Time) error {
+	return nil
+}
+
+func (c *dispatcherConn) SetWriteDeadline(t time.Time) error {
+	return nil
+}

+ 2 - 1
transport/internet/udp/dispatcher_test.go

@@ -8,6 +8,7 @@ import (
 
 
 	"v2ray.com/core/common/buf"
 	"v2ray.com/core/common/buf"
 	"v2ray.com/core/common/net"
 	"v2ray.com/core/common/net"
+	"v2ray.com/core/common/protocol/udp"
 	"v2ray.com/core/features/routing"
 	"v2ray.com/core/features/routing"
 	"v2ray.com/core/transport"
 	"v2ray.com/core/transport"
 	. "v2ray.com/core/transport/internet/udp"
 	. "v2ray.com/core/transport/internet/udp"
@@ -66,7 +67,7 @@ func TestSameDestinationDispatching(t *testing.T) {
 	b.WriteString("abcd")
 	b.WriteString("abcd")
 
 
 	var msgCount uint32
 	var msgCount uint32
-	dispatcher := NewDispatcher(td, func(ctx context.Context, payload *buf.Buffer) {
+	dispatcher := NewDispatcher(td, func(ctx context.Context, packet *udp.Packet) {
 		atomic.AddUint32(&msgCount, 1)
 		atomic.AddUint32(&msgCount, 1)
 	})
 	})