Browse Source

test case for reverse proxy

Darien Raymond 7 years ago
parent
commit
715ac9d267
5 changed files with 280 additions and 11 deletions
  1. 2 0
      app/reverse/bridge.go
  2. 22 10
      app/reverse/portal.go
  3. 41 1
      app/reverse/reverse.go
  4. 4 0
      common/mux/client.go
  5. 211 0
      testing/scenarios/reverse_test.go

+ 2 - 0
app/reverse/bridge.go

@@ -32,6 +32,8 @@ func NewBridge(config *BridgeConfig, dispatcher routing.Dispatcher) (*Bridge, er
 
 	b := &Bridge{
 		dispatcher: dispatcher,
+		tag:        config.Tag,
+		domain:     config.Domain,
 	}
 	b.monitorTask = &task.Periodic{
 		Execute:  b.monitor,

+ 22 - 10
app/reverse/portal.go

@@ -8,8 +8,8 @@ import (
 	"github.com/golang/protobuf/proto"
 	"v2ray.com/core/common"
 	"v2ray.com/core/common/buf"
-	"v2ray.com/core/common/dice"
 	"v2ray.com/core/common/mux"
+	"v2ray.com/core/common/net"
 	"v2ray.com/core/common/session"
 	"v2ray.com/core/common/task"
 	"v2ray.com/core/common/vio"
@@ -53,6 +53,7 @@ func NewPortal(config *PortalConfig, ohm outbound.Manager) (*Portal, error) {
 func (p *Portal) Start() error {
 	return p.ohm.AddHandler(context.Background(), &Outbound{
 		portal: p,
+		tag:    p.tag,
 	})
 }
 
@@ -66,7 +67,7 @@ func (s *Portal) HandleConnection(ctx context.Context, link *vio.Link) error {
 		return newError("outbound metadata not found").AtError()
 	}
 
-	if isInternalDomain(outboundMeta.Target) {
+	if isDomain(outboundMeta.Target, s.domain) {
 		muxClient, err := mux.NewClientWorker(*link, mux.ClientStrategy{
 			MaxConcurrency: 0,
 			MaxConnection:  256,
@@ -149,17 +150,24 @@ func (p *StaticMuxPicker) PickAvailable() (*mux.ClientWorker, error) {
 	p.access.Lock()
 	defer p.access.Unlock()
 
-	n := len(p.workers)
-	if n == 0 {
+	if len(p.workers) == 0 {
 		return nil, newError("empty worker list")
 	}
 
-	idx := dice.Roll(n)
-	for i := 0; i < n; i++ {
-		w := p.workers[(i+idx)%n]
-		if !w.IsFull() {
-			return w.client, nil
+	var minIdx int = -1
+	var minConn uint32 = 9999
+	for i, w := range p.workers {
+		if w.IsFull() {
+			continue
 		}
+		if w.client.ActiveConnections() < minConn {
+			minConn = w.client.ActiveConnections()
+			minIdx = i
+		}
+	}
+
+	if minIdx != -1 {
+		return p.workers[minIdx].client, nil
 	}
 
 	return nil, newError("no mux client worker available")
@@ -184,7 +192,11 @@ func NewPortalWorker(client *mux.ClientWorker) (*PortalWorker, error) {
 	uplinkReader, uplinkWriter := pipe.New(opt...)
 	downlinkReader, downlinkWriter := pipe.New(opt...)
 
-	f := client.Dispatch(context.Background(), &vio.Link{
+	ctx := context.Background()
+	ctx = session.ContextWithOutbound(ctx, &session.Outbound{
+		Target: net.UDPDestination(net.DomainAddress(internalDomain), 0),
+	})
+	f := client.Dispatch(ctx, &vio.Link{
 		Reader: uplinkReader,
 		Writer: downlinkWriter,
 	})

+ 41 - 1
app/reverse/reverse.go

@@ -16,8 +16,12 @@ const (
 	internalDomain = "reverse.internal.v2ray.com"
 )
 
+func isDomain(dest net.Destination, domain string) bool {
+	return dest.Address.Family().IsDomain() && dest.Address.Domain() == domain
+}
+
 func isInternalDomain(dest net.Destination) bool {
-	return dest.Address.Family().IsDomain() && dest.Address.Domain() == internalDomain
+	return isDomain(dest, internalDomain)
 }
 
 func init() {
@@ -56,3 +60,39 @@ func (r *Reverse) Init(config *Config, d routing.Dispatcher, ohm outbound.Manage
 
 	return nil
 }
+
+func (r *Reverse) Type() interface{} {
+	return (*Reverse)(nil)
+}
+
+func (r *Reverse) Start() error {
+	for _, b := range r.bridges {
+		if err := b.Start(); err != nil {
+			return err
+		}
+	}
+
+	for _, p := range r.portals {
+		if err := p.Start(); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (r *Reverse) Close() error {
+	for _, b := range r.bridges {
+		if err := b.Close(); err != nil {
+			return err
+		}
+	}
+
+	for _, p := range r.portals {
+		if err := p.Close(); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}

+ 4 - 0
common/mux/client.go

@@ -190,6 +190,10 @@ func NewClientWorker(stream vio.Link, s ClientStrategy) (*ClientWorker, error) {
 	return c, nil
 }
 
+func (m *ClientWorker) ActiveConnections() uint32 {
+	return uint32(m.sessionManager.Size())
+}
+
 // Closed returns true if this Client is closed.
 func (m *ClientWorker) Closed() bool {
 	return m.done.Done()

+ 211 - 0
testing/scenarios/reverse_test.go

@@ -0,0 +1,211 @@
+package scenarios
+
+import (
+	"crypto/rand"
+	"testing"
+	"time"
+
+	"v2ray.com/core/app/reverse"
+	"v2ray.com/core/app/router"
+
+	"v2ray.com/core"
+	"v2ray.com/core/app/proxyman"
+	"v2ray.com/core/common"
+	"v2ray.com/core/common/compare"
+	"v2ray.com/core/common/net"
+	"v2ray.com/core/common/protocol"
+	"v2ray.com/core/common/serial"
+	"v2ray.com/core/common/uuid"
+	"v2ray.com/core/proxy/blackhole"
+	"v2ray.com/core/proxy/dokodemo"
+	"v2ray.com/core/proxy/freedom"
+	"v2ray.com/core/proxy/vmess"
+	"v2ray.com/core/proxy/vmess/inbound"
+	"v2ray.com/core/proxy/vmess/outbound"
+	"v2ray.com/core/testing/servers/tcp"
+)
+
+func TestReverseProxy(t *testing.T) {
+	tcpServer := tcp.Server{
+		MsgProcessor: xor,
+	}
+	dest, err := tcpServer.Start()
+	common.Must(err)
+
+	defer tcpServer.Close()
+
+	userID := protocol.NewID(uuid.New())
+	externalPort := tcp.PickPort()
+	reversePort := tcp.PickPort()
+
+	serverConfig := &core.Config{
+		App: []*serial.TypedMessage{
+			serial.ToTypedMessage(&reverse.Config{
+				PortalConfig: []*reverse.PortalConfig{
+					{
+						Tag:    "portal",
+						Domain: "test.v2ray.com",
+					},
+				},
+			}),
+			serial.ToTypedMessage(&router.Config{
+				Rule: []*router.RoutingRule{
+					{
+						Domain: []*router.Domain{
+							{Type: router.Domain_Full, Value: "test.v2ray.com"},
+						},
+						Tag: "portal",
+					},
+					{
+						InboundTag: []string{"external"},
+						Tag:        "portal",
+					},
+				},
+			}),
+		},
+		Inbound: []*core.InboundHandlerConfig{
+			{
+				Tag: "external",
+				ReceiverSettings: serial.ToTypedMessage(&proxyman.ReceiverConfig{
+					PortRange: net.SinglePortRange(externalPort),
+					Listen:    net.NewIPOrDomain(net.LocalHostIP),
+				}),
+				ProxySettings: serial.ToTypedMessage(&dokodemo.Config{
+					Address: net.NewIPOrDomain(dest.Address),
+					Port:    uint32(dest.Port),
+					NetworkList: &net.NetworkList{
+						Network: []net.Network{net.Network_TCP},
+					},
+				}),
+			},
+			{
+				ReceiverSettings: serial.ToTypedMessage(&proxyman.ReceiverConfig{
+					PortRange: net.SinglePortRange(reversePort),
+					Listen:    net.NewIPOrDomain(net.LocalHostIP),
+				}),
+				ProxySettings: serial.ToTypedMessage(&inbound.Config{
+					User: []*protocol.User{
+						{
+							Account: serial.ToTypedMessage(&vmess.Account{
+								Id:      userID.String(),
+								AlterId: 64,
+							}),
+						},
+					},
+				}),
+			},
+		},
+		Outbound: []*core.OutboundHandlerConfig{
+			{
+				ProxySettings: serial.ToTypedMessage(&blackhole.Config{}),
+			},
+		},
+	}
+
+	clientPort := tcp.PickPort()
+	clientConfig := &core.Config{
+		App: []*serial.TypedMessage{
+			serial.ToTypedMessage(&reverse.Config{
+				BridgeConfig: []*reverse.BridgeConfig{
+					{
+						Tag:    "bridge",
+						Domain: "test.v2ray.com",
+					},
+				},
+			}),
+			serial.ToTypedMessage(&router.Config{
+				Rule: []*router.RoutingRule{
+					{
+						Domain: []*router.Domain{
+							{Type: router.Domain_Full, Value: "test.v2ray.com"},
+						},
+						Tag: "reverse",
+					},
+					{
+						InboundTag: []string{"bridge"},
+						Tag:        "freedom",
+					},
+				},
+			}),
+		},
+		Inbound: []*core.InboundHandlerConfig{
+			{
+				ReceiverSettings: serial.ToTypedMessage(&proxyman.ReceiverConfig{
+					PortRange: net.SinglePortRange(clientPort),
+					Listen:    net.NewIPOrDomain(net.LocalHostIP),
+				}),
+				ProxySettings: serial.ToTypedMessage(&dokodemo.Config{
+					Address: net.NewIPOrDomain(dest.Address),
+					Port:    uint32(dest.Port),
+					NetworkList: &net.NetworkList{
+						Network: []net.Network{net.Network_TCP},
+					},
+				}),
+			},
+		},
+		Outbound: []*core.OutboundHandlerConfig{
+			{
+				Tag:           "freedom",
+				ProxySettings: serial.ToTypedMessage(&freedom.Config{}),
+			},
+			{
+				Tag: "reverse",
+				ProxySettings: serial.ToTypedMessage(&outbound.Config{
+					Receiver: []*protocol.ServerEndpoint{
+						{
+							Address: net.NewIPOrDomain(net.LocalHostIP),
+							Port:    uint32(reversePort),
+							User: []*protocol.User{
+								{
+									Account: serial.ToTypedMessage(&vmess.Account{
+										Id:      userID.String(),
+										AlterId: 64,
+										SecuritySettings: &protocol.SecurityConfig{
+											Type: protocol.SecurityType_AES128_GCM,
+										},
+									}),
+								},
+							},
+						},
+					},
+				}),
+			},
+		},
+	}
+
+	servers, err := InitializeServerConfigs(serverConfig, clientConfig)
+	common.Must(err)
+
+	defer CloseAllServers(servers)
+
+	//var wg sync.WaitGroup
+	//wg.Add(10)
+	//for i := 0; i < 10; i++ {
+	//go func() {
+	//defer wg.Done()
+
+	conn, err := net.DialTCP("tcp", nil, &net.TCPAddr{
+		IP:   []byte{127, 0, 0, 1},
+		Port: int(externalPort),
+	})
+	common.Must(err)
+	defer conn.Close()
+
+	payload := make([]byte, 10240*1024)
+	rand.Read(payload)
+
+	nBytes, err := conn.Write([]byte(payload))
+	common.Must(err)
+
+	if nBytes != len(payload) {
+		t.Error("only part of payload is written: ", nBytes)
+	}
+
+	response := readFrom(conn, time.Second*20, 10240*1024)
+	if err := compare.BytesEqualWithDetail(response, xor([]byte(payload))); err != nil {
+		t.Error(err)
+	}
+	//}()
+	//}
+	//wg.Wait()
+}