Procházet zdrojové kódy

configuration for connection reuse

v2ray před 9 roky
rodič
revize
a86cd36ad2

+ 2 - 0
shell/point/config.go

@@ -5,6 +5,7 @@ import (
 	"github.com/v2ray/v2ray-core/app/router"
 	"github.com/v2ray/v2ray-core/common/log"
 	v2net "github.com/v2ray/v2ray-core/common/net"
+	"github.com/v2ray/v2ray-core/transport"
 )
 
 type ConnectionConfig struct {
@@ -55,6 +56,7 @@ type Config struct {
 	OutboundConfig  *ConnectionConfig
 	InboundDetours  []*InboundDetourConfig
 	OutboundDetours []*OutboundDetourConfig
+	TransportConfig *transport.Config
 }
 
 type ConfigLoader func(init string) (*Config, error)

+ 3 - 0
shell/point/config_json.go

@@ -13,6 +13,7 @@ import (
 	"github.com/v2ray/v2ray-core/app/router"
 	"github.com/v2ray/v2ray-core/common/log"
 	v2net "github.com/v2ray/v2ray-core/common/net"
+	"github.com/v2ray/v2ray-core/transport"
 )
 
 const (
@@ -30,6 +31,7 @@ func (this *Config) UnmarshalJSON(data []byte) error {
 		OutboundConfig  *ConnectionConfig       `json:"outbound"`
 		InboundDetours  []*InboundDetourConfig  `json:"inboundDetour"`
 		OutboundDetours []*OutboundDetourConfig `json:"outboundDetour"`
+		Transport       *transport.Config       `json:"transport"`
 	}
 	jsonConfig := new(JsonConfig)
 	if err := json.Unmarshal(data, jsonConfig); err != nil {
@@ -57,6 +59,7 @@ func (this *Config) UnmarshalJSON(data []byte) error {
 		}
 	}
 	this.DNSConfig = jsonConfig.DNSConfig
+	this.TransportConfig = jsonConfig.Transport
 	return nil
 }
 

+ 5 - 0
shell/point/point.go

@@ -16,6 +16,7 @@ import (
 	"github.com/v2ray/v2ray-core/common/retry"
 	"github.com/v2ray/v2ray-core/proxy"
 	proxyrepo "github.com/v2ray/v2ray-core/proxy/repo"
+	"github.com/v2ray/v2ray-core/transport"
 )
 
 // Point shell of V2Ray.
@@ -38,6 +39,10 @@ func NewPoint(pConfig *Config) (*Point, error) {
 	vpoint.port = pConfig.Port
 	vpoint.listen = pConfig.ListenOn
 
+	if pConfig.TransportConfig != nil {
+		transport.ApplyConfig(pConfig.TransportConfig)
+	}
+
 	if pConfig.LogConfig != nil {
 		logConfig := pConfig.LogConfig
 		if len(logConfig.AccessLog) > 0 {

+ 16 - 0
transport/config.go

@@ -0,0 +1,16 @@
+package transport
+
+type StreamType int
+
+const (
+	StreamTypeTCP = StreamType(0)
+)
+
+type TCPConfig struct {
+	ConnectionReuse bool
+}
+
+type Config struct {
+	StreamType StreamType
+	TCPConfig  *TCPConfig
+}

+ 36 - 0
transport/config_json.go

@@ -0,0 +1,36 @@
+// +build json
+
+package transport
+
+import (
+	"encoding/json"
+	"strings"
+)
+
+func (this *Config) UnmarshalJSON(data []byte) error {
+	type TypeConfig struct {
+		StreamType string          `json:"streamType"`
+		Settings   json.RawMessage `json:"settings"`
+	}
+	type JsonTCPConfig struct {
+		ConnectionReuse bool `json:"connectionReuse"`
+	}
+
+	typeConfig := new(TypeConfig)
+	if err := json.Unmarshal(data, typeConfig); err != nil {
+		return err
+	}
+
+	streamType := strings.ToLower(typeConfig.StreamType)
+	if streamType == "tcp" {
+		jsonTCPConfig := new(JsonTCPConfig)
+		if err := json.Unmarshal(data, jsonTCPConfig); err != nil {
+			return err
+		}
+		this.TCPConfig = &TCPConfig{
+			ConnectionReuse: jsonTCPConfig.ConnectionReuse,
+		}
+	}
+
+	return nil
+}

+ 3 - 1
transport/hub/connection.go

@@ -3,6 +3,8 @@ package hub
 import (
 	"net"
 	"time"
+
+	"github.com/v2ray/v2ray-core/transport"
 )
 
 type ConnectionHandler func(*Connection)
@@ -37,7 +39,7 @@ func (this *Connection) Close() error {
 	if this == nil || this.conn == nil {
 		return ErrorClosedConnection
 	}
-	if this.Reusable() {
+	if transport.TCPStreamConfig.ConnectionReuse && this.Reusable() {
 		this.listener.Recycle(this.dest, this.conn)
 		return nil
 	}

+ 32 - 5
transport/hub/connection_cache.go

@@ -3,9 +3,33 @@ package hub
 import (
 	"net"
 	"sync"
+	"sync/atomic"
 	"time"
 )
 
+type Once struct {
+	m    sync.Mutex
+	done uint32
+}
+
+func (o *Once) Do(f func()) {
+	if atomic.LoadUint32(&o.done) == 1 {
+		return
+	}
+	o.m.Lock()
+	defer o.m.Unlock()
+	if o.done == 0 {
+		defer atomic.StoreUint32(&o.done, 1)
+		f()
+	}
+}
+
+func (o *Once) Reset() {
+	o.m.Lock()
+	defer o.m.Unlock()
+	atomic.StoreUint32(&o.done, 0)
+}
+
 type AwaitingConnection struct {
 	conn   net.Conn
 	expire time.Time
@@ -17,19 +41,20 @@ func (this *AwaitingConnection) Expired() bool {
 
 type ConnectionCache struct {
 	sync.Mutex
-	cache map[string][]*AwaitingConnection
+	cache       map[string][]*AwaitingConnection
+	cleanupOnce Once
 }
 
 func NewConnectionCache() *ConnectionCache {
-	c := &ConnectionCache{
+	return &ConnectionCache{
 		cache: make(map[string][]*AwaitingConnection),
 	}
-	go c.Cleanup()
-	return c
 }
 
 func (this *ConnectionCache) Cleanup() {
-	for {
+	defer this.cleanupOnce.Reset()
+
+	for len(this.cache) > 0 {
 		time.Sleep(time.Second * 4)
 		this.Lock()
 		for key, value := range this.cache {
@@ -74,6 +99,8 @@ func (this *ConnectionCache) Recycle(dest string, conn net.Conn) {
 		list = []*AwaitingConnection{aconn}
 	}
 	this.cache[dest] = list
+
+	go this.cleanupOnce.Do(this.Cleanup)
 }
 
 func FindFirstValid(list []*AwaitingConnection) int {

+ 5 - 1
transport/hub/dialer.go

@@ -7,6 +7,7 @@ import (
 
 	"github.com/v2ray/v2ray-core/common/log"
 	v2net "github.com/v2ray/v2ray-core/common/net"
+	"github.com/v2ray/v2ray-core/transport"
 )
 
 var (
@@ -17,7 +18,10 @@ var (
 
 func Dial(dest v2net.Destination) (*Connection, error) {
 	destStr := dest.String()
-	conn := globalCache.Get(destStr)
+	var conn net.Conn
+	if transport.TCPStreamConfig.ConnectionReuse {
+		conn = globalCache.Get(destStr)
+	}
 	if conn == nil {
 		var err error
 		log.Debug("Hub: Dialling new connection to ", dest)

+ 22 - 0
transport/transport.go

@@ -0,0 +1,22 @@
+package transport
+
+import "github.com/v2ray/v2ray-core/common/log"
+
+var (
+	TCPStreamConfig = &TCPConfig{
+		ConnectionReuse: false,
+	}
+)
+
+func ApplyConfig(config *Config) error {
+	if config.StreamType == StreamTypeTCP {
+		if config.TCPConfig != nil {
+			TCPStreamConfig = config.TCPConfig
+			if config.TCPConfig.ConnectionReuse {
+				log.Info("Transport: TCP connection reuse enabled.")
+			}
+		}
+	}
+
+	return nil
+}