Переглянути джерело

dynamic allocation of inbounds

v2ray 9 роки тому
батько
коміт
98d99a1fd0

+ 2 - 7
common/net/testing/port.go

@@ -1,15 +1,10 @@
 package testing
 
 import (
-	"sync/atomic"
-
+	"github.com/v2ray/v2ray-core/common/dice"
 	v2net "github.com/v2ray/v2ray-core/common/net"
 )
 
-var (
-	port = int32(30000)
-)
-
 func PickPort() v2net.Port {
-	return v2net.Port(atomic.AddInt32(&port, 1))
+	return v2net.Port(30000 + dice.Roll(10000))
 }

+ 134 - 1
shell/point/inbound_detour_dynamic.go

@@ -1,3 +1,136 @@
 package point
 
-import ()
+import (
+	"sync"
+	"time"
+
+	"github.com/v2ray/v2ray-core/app"
+	"github.com/v2ray/v2ray-core/common/dice"
+	"github.com/v2ray/v2ray-core/common/log"
+	v2net "github.com/v2ray/v2ray-core/common/net"
+	"github.com/v2ray/v2ray-core/common/retry"
+	"github.com/v2ray/v2ray-core/proxy"
+	proxyrepo "github.com/v2ray/v2ray-core/proxy/repo"
+)
+
+type InboundDetourHandlerDynamic struct {
+	sync.RWMutex
+	space       app.Space
+	config      *InboundDetourConfig
+	portsInUse  map[v2net.Port]bool
+	ichInUse    []*InboundConnectionHandlerWithPort
+	ich2Recycle []*InboundConnectionHandlerWithPort
+	lastRefresh time.Time
+	started     bool
+}
+
+func NewInboundDetourHandlerDynamic(space app.Space, config *InboundDetourConfig) (*InboundDetourHandlerDynamic, error) {
+	handler := &InboundDetourHandlerDynamic{
+		space:      space,
+		config:     config,
+		portsInUse: make(map[v2net.Port]bool),
+	}
+	if err := handler.refresh(); err != nil {
+		return nil, err
+	}
+	return handler, nil
+}
+
+func (this *InboundDetourHandlerDynamic) refresh() error {
+	this.Lock()
+	defer this.Unlock()
+
+	this.ich2Recycle = this.ichInUse
+	if this.ich2Recycle != nil {
+		time.AfterFunc(10*time.Second, func() {
+			for _, ich := range this.ich2Recycle {
+				if ich != nil {
+					ich.handler.Close()
+					delete(this.portsInUse, ich.port)
+				}
+			}
+		})
+	}
+
+	ichCount := this.config.Allocation.Concurrency
+	// TODO: check ichCount
+	if this.ichInUse == nil {
+		this.ichInUse = make([]*InboundConnectionHandlerWithPort, ichCount)
+	}
+	for idx, _ := range this.ichInUse {
+		port := this.pickUnusedPort()
+		ich, err := proxyrepo.CreateInboundConnectionHandler(this.config.Protocol, this.space, this.config.Settings)
+		if err != nil {
+			log.Error("Point: Failed to create inbound connection handler: ", err)
+			return err
+		}
+		this.ichInUse[idx] = &InboundConnectionHandlerWithPort{
+			port:    port,
+			handler: ich,
+		}
+	}
+	if this.started {
+		this.Start()
+	}
+
+	this.lastRefresh = time.Now()
+	time.AfterFunc(time.Duration(this.config.Allocation.Refresh)*time.Minute, func() {
+		this.refresh()
+	})
+
+	return nil
+}
+
+func (this *InboundDetourHandlerDynamic) pickUnusedPort() v2net.Port {
+	delta := int(this.config.PortRange.To) - int(this.config.PortRange.From) + 1
+	for {
+		r := dice.Roll(delta)
+		port := this.config.PortRange.From + v2net.Port(r)
+		_, used := this.portsInUse[port]
+		if !used {
+			this.portsInUse[port] = true
+			return port
+		}
+	}
+}
+
+func (this *InboundDetourHandlerDynamic) GetConnectionHandler() (proxy.InboundConnectionHandler, int) {
+	this.RLock()
+	defer this.RUnlock()
+	ich := this.ichInUse[dice.Roll(len(this.ichInUse))]
+	until := (time.Now().Unix() - this.lastRefresh.Unix()) / 60 / 1000
+	return ich.handler, int(until)
+}
+
+func (this *InboundDetourHandlerDynamic) Close() {
+	this.Lock()
+	defer this.Unlock()
+	for _, ich := range this.ichInUse {
+		ich.handler.Close()
+	}
+	if this.ich2Recycle != nil {
+		for _, ich := range this.ich2Recycle {
+			if ich != nil && ich.handler != nil {
+				ich.handler.Close()
+			}
+		}
+	}
+}
+
+func (this *InboundDetourHandlerDynamic) Start() error {
+	for _, ich := range this.ichInUse {
+		err := retry.Timed(100 /* times */, 100 /* ms */).On(func() error {
+			err := ich.handler.Listen(ich.port)
+			if err != nil {
+				log.Error("Point: Failed to start inbound detour on port ", ich.port, ": ", err)
+				return err
+			}
+			return nil
+		})
+		if err != nil {
+			return err
+		}
+	}
+	this.started = true
+	return nil
+}

+ 7 - 0
shell/point/point.go

@@ -87,6 +87,13 @@ func NewPoint(pConfig *Config) (*Point, error) {
 					return nil, BadConfiguration
 				}
 				detourHandler = dh
+			case AllocationStrategyRandom:
+				dh, err := NewInboundDetourHandlerDynamic(vpoint.space.ForContext(detourConfig.Tag), detourConfig)
+				if err != nil {
+					log.Error("Point: Failed to create detour handler: ", err)
+					return nil, BadConfiguration
+				}
+				detourHandler = dh
 			default:
 				log.Error("Point: Unknown allocation strategy: ", allocConfig.Strategy)
 				return nil, BadConfiguration

+ 29 - 0
testing/scenarios/data/test_4_client.json

@@ -0,0 +1,29 @@
+{
+  "port": 50030,
+  "inbound": {
+    "protocol": "dokodemo-door",
+    "settings": {
+      "address": "127.0.0.1",
+      "port": 50032,
+      "network": "tcp",
+      "timeout": 0
+    }
+  },
+  "outbound": {
+    "protocol": "vmess",
+    "settings": {
+      "vnext": [
+        {
+          "address": "127.0.0.1",
+          "port": 50031,
+          "users": [
+            {
+              "id": "d17a1af7-efa5-42ca-b7e9-6a35282d737f",
+              "alterId": 10
+            }
+          ]
+        }
+      ]
+    }
+  }
+}

+ 48 - 0
testing/scenarios/data/test_4_server.json

@@ -0,0 +1,48 @@
+{
+  "port": 50031,
+  "log": {
+    "loglevel": "warning"
+  },
+  "inbound": {
+    "protocol": "vmess",
+    "settings": {
+      "clients": [
+        {
+          "id": "d17a1af7-efa5-42ca-b7e9-6a35282d737f",
+          "level": 1,
+          "alterId": 10
+        }
+      ],
+      "features": {
+        "detour": {
+          "to": "detour"
+        }
+      }
+    }
+  },
+  "outbound": {
+    "protocol": "freedom",
+    "settings": {}
+  },
+  "inboundDetour": [
+    {
+      "protocol": "vmess",
+      "port": "50035-50039",
+      "tag": "detour",
+      "settings": {
+        "clients": [
+          {
+            "id": "a12f49ba-466c-4dd5-8438-5c315143bc96",
+            "alterId": 100,
+            "level": 1
+          }
+        ]
+      },
+      "allocate": {
+        "strategy": "random",
+        "concurrency": 3,
+        "refresh": 5
+      }
+    }
+  ]
+}

+ 54 - 0
testing/scenarios/dynamic_vmess_test.go

@@ -0,0 +1,54 @@
+package scenarios
+
+import (
+	"net"
+	"testing"
+
+	v2net "github.com/v2ray/v2ray-core/common/net"
+	v2testing "github.com/v2ray/v2ray-core/testing"
+	"github.com/v2ray/v2ray-core/testing/assert"
+	"github.com/v2ray/v2ray-core/testing/servers/tcp"
+)
+
+func TestDynamicVMess(t *testing.T) {
+	v2testing.Current(t)
+
+	tcpServer := &tcp.Server{
+		Port: v2net.Port(50032),
+		MsgProcessor: func(data []byte) []byte {
+			buffer := make([]byte, 0, 2048)
+			buffer = append(buffer, []byte("Processed: ")...)
+			buffer = append(buffer, data...)
+			return buffer
+		},
+	}
+	_, err := tcpServer.Start()
+	assert.Error(err).IsNil()
+	defer tcpServer.Close()
+
+	assert.Error(InitializeServerSetOnce("test_4")).IsNil()
+
+	for i := 0; i < 100; i++ {
+		conn, err := net.DialTCP("tcp", nil, &net.TCPAddr{
+			IP:   []byte{127, 0, 0, 1},
+			Port: 50030,
+		})
+		assert.Error(err).IsNil()
+
+		payload := "dokodemo request."
+		nBytes, err := conn.Write([]byte(payload))
+		assert.Error(err).IsNil()
+		assert.Int(nBytes).Equals(len(payload))
+
+		conn.CloseWrite()
+
+		response := make([]byte, 1024)
+		nBytes, err = conn.Read(response)
+		assert.Error(err).IsNil()
+		assert.StringLiteral("Processed: " + payload).Equals(string(response[:nBytes]))
+
+		conn.Close()
+	}
+
+	CloseAllServers()
+}