ソースを参照

reusable connection

v2ray 9 年 前
コミット
0fac2084c7

+ 3 - 3
common/protocol/raw/server.go

@@ -48,8 +48,8 @@ func (this *ServerSession) DecodeRequestHeader(reader io.Reader) (*protocol.Requ
 
 	_, err := io.ReadFull(reader, buffer.Value[:protocol.IDBytesLen])
 	if err != nil {
-		log.Error("Raw: Failed to read request header: ", err)
-		return nil, err
+		log.Info("Raw: Failed to read request header: ", err)
+		return nil, io.EOF
 	}
 
 	user, timestamp, valid := this.userValidator.Get(buffer.Value[:protocol.IDBytesLen])
@@ -77,7 +77,7 @@ func (this *ServerSession) DecodeRequestHeader(reader io.Reader) (*protocol.Requ
 	}
 
 	if request.Version != Version {
-		log.Warning("Raw: Invalid protocol version ", request.Version)
+		log.Info("Raw: Invalid protocol version ", request.Version)
 		return nil, protocol.ErrorInvalidVersion
 	}
 

+ 2 - 2
proxy/freedom/freedom.go

@@ -15,7 +15,7 @@ import (
 	"github.com/v2ray/v2ray-core/common/retry"
 	"github.com/v2ray/v2ray-core/proxy"
 	"github.com/v2ray/v2ray-core/proxy/internal"
-	"github.com/v2ray/v2ray-core/transport/dialer"
+	"github.com/v2ray/v2ray-core/transport/hub"
 	"github.com/v2ray/v2ray-core/transport/ray"
 )
 
@@ -77,7 +77,7 @@ func (this *FreedomConnection) Dispatch(destination v2net.Destination, payload *
 		destination = this.ResolveIP(destination)
 	}
 	err := retry.Timed(5, 100).On(func() error {
-		rawConn, err := dialer.Dial(destination)
+		rawConn, err := hub.DialWithoutCache(destination)
 		if err != nil {
 			return err
 		}

+ 10 - 3
proxy/vmess/inbound/inbound.go

@@ -1,6 +1,7 @@
 package inbound
 
 import (
+	"io"
 	"sync"
 
 	"github.com/v2ray/v2ray-core/app"
@@ -124,7 +125,7 @@ func (this *VMessInboundHandler) Listen(address v2net.Address, port v2net.Port)
 func (this *VMessInboundHandler) HandleConnection(connection *hub.Connection) {
 	defer connection.Close()
 
-	connReader := v2net.NewTimeOutReader(16, connection)
+	connReader := v2net.NewTimeOutReader(8, connection)
 	defer connReader.Release()
 
 	reader := v2io.NewBufferedReader(connReader)
@@ -135,13 +136,19 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.Connection) {
 
 	request, err := session.DecodeRequestHeader(reader)
 	if err != nil {
-		log.Access(connection.RemoteAddr(), "", log.AccessRejected, err)
-		log.Warning("VMessIn: Invalid request from ", connection.RemoteAddr(), ": ", err)
+		if err != io.EOF {
+			log.Access(connection.RemoteAddr(), "", log.AccessRejected, err)
+			log.Warning("VMessIn: Invalid request from ", connection.RemoteAddr(), ": ", err)
+		}
 		return
 	}
 	log.Access(connection.RemoteAddr(), request.Destination(), log.AccessAccepted, "")
 	log.Debug("VMessIn: Received request for ", request.Destination())
 
+	if request.Option.IsChunkStream() {
+		connection.SetReusable(true)
+	}
+
 	ray := this.packetDispatcher.DispatchToOutbound(request.Destination())
 	input := ray.InboundInput()
 	output := ray.InboundOutput()

+ 5 - 2
proxy/vmess/outbound/outbound.go

@@ -14,7 +14,7 @@ import (
 	"github.com/v2ray/v2ray-core/proxy"
 	"github.com/v2ray/v2ray-core/proxy/internal"
 	vmessio "github.com/v2ray/v2ray-core/proxy/vmess/io"
-	"github.com/v2ray/v2ray-core/transport/dialer"
+	"github.com/v2ray/v2ray-core/transport/hub"
 	"github.com/v2ray/v2ray-core/transport/ray"
 )
 
@@ -41,7 +41,7 @@ func (this *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *al
 		Option:  protocol.RequestOptionChunkStream,
 	}
 
-	conn, err := dialer.Dial(destination)
+	conn, err := hub.Dial(destination)
 	if err != nil {
 		log.Error("Failed to open ", destination, ": ", err)
 		return err
@@ -49,6 +49,9 @@ func (this *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *al
 	log.Info("VMessOut: Tunneling request to ", request.Address, " via ", destination)
 
 	defer conn.Close()
+	if request.Option.IsChunkStream() {
+		conn.SetReusable(true)
+	}
 
 	input := ray.OutboundInput()
 	output := ray.OutboundOutput()

+ 3 - 0
testing/scenarios/data/test_4_client.json

@@ -1,6 +1,9 @@
 {
   "port": 50030,
   "listen": "127.0.0.1",
+  "log": {
+    "loglevel": "debug"
+  },
   "inbound": {
     "protocol": "dokodemo-door",
     "settings": {

+ 1 - 1
testing/scenarios/data/test_4_server.json

@@ -2,7 +2,7 @@
   "port": 50031,
   "listen": "127.0.0.1",
   "log": {
-    "loglevel": "warning"
+    "loglevel": "debug"
   },
   "inbound": {
     "protocol": "vmess",

+ 0 - 40
transport/dialer/dialer.go

@@ -1,40 +0,0 @@
-package dialer
-
-import (
-	"errors"
-	"net"
-	"time"
-
-	v2net "github.com/v2ray/v2ray-core/common/net"
-)
-
-var (
-	ErrorInvalidHost = errors.New("Invalid Host.")
-)
-
-func Dial(dest v2net.Destination) (net.Conn, error) {
-	if dest.Address().IsDomain() {
-		dialer := &net.Dialer{
-			Timeout:   time.Second * 60,
-			DualStack: true,
-		}
-		network := "tcp"
-		if dest.IsUDP() {
-			network = "udp"
-		}
-		return dialer.Dial(network, dest.NetAddr())
-	}
-
-	ip := dest.Address().IP()
-	if dest.IsTCP() {
-		return net.DialTCP("tcp", nil, &net.TCPAddr{
-			IP:   ip,
-			Port: int(dest.Port()),
-		})
-	} else {
-		return net.DialUDP("udp", nil, &net.UDPAddr{
-			IP:   ip,
-			Port: int(dest.Port()),
-		})
-	}
-}

+ 7 - 12
transport/hub/connection.go

@@ -7,9 +7,14 @@ import (
 
 type ConnectionHandler func(*Connection)
 
+type ConnectionManager interface {
+	Recycle(string, net.Conn)
+}
+
 type Connection struct {
+	dest     string
 	conn     net.Conn
-	listener *TCPHub
+	listener ConnectionManager
 	reusable bool
 }
 
@@ -33,22 +38,12 @@ func (this *Connection) Close() error {
 		return ErrorClosedConnection
 	}
 	if this.Reusable() {
-		this.listener.Recycle(this.conn)
+		this.listener.Recycle(this.dest, this.conn)
 		return nil
 	}
 	return this.conn.Close()
 }
 
-func (this *Connection) Release() {
-	if this == nil || this.listener == nil {
-		return
-	}
-
-	this.Close()
-	this.conn = nil
-	this.listener = nil
-}
-
 func (this *Connection) LocalAddr() net.Addr {
 	return this.conn.LocalAddr()
 }

+ 107 - 0
transport/hub/connection_cache.go

@@ -0,0 +1,107 @@
+package hub
+
+import (
+	"net"
+	"sync"
+	"time"
+)
+
+type AwaitingConnection struct {
+	conn   net.Conn
+	expire time.Time
+}
+
+func (this *AwaitingConnection) Expired() bool {
+	return this.expire.Before(time.Now())
+}
+
+type ConnectionCache struct {
+	sync.Mutex
+	cache map[string][]*AwaitingConnection
+}
+
+func NewConnectionCache() *ConnectionCache {
+	c := &ConnectionCache{
+		cache: make(map[string][]*AwaitingConnection),
+	}
+	go c.Cleanup()
+	return c
+}
+
+func (this *ConnectionCache) Cleanup() {
+	for {
+		time.Sleep(time.Second * 4)
+		this.Lock()
+		for key, value := range this.cache {
+			size := len(value)
+			changed := false
+			for i := 0; i < size; {
+				if value[i].Expired() {
+					value[i].conn.Close()
+					value[i] = value[size-1]
+					size--
+					changed = true
+				} else {
+					i++
+				}
+			}
+			if changed {
+				for i := size; i < len(value); i++ {
+					value[i] = nil
+				}
+				value = value[:size]
+				this.cache[key] = value
+			}
+		}
+		this.Unlock()
+	}
+}
+
+func (this *ConnectionCache) Recycle(dest string, conn net.Conn) {
+	this.Lock()
+	defer this.Unlock()
+
+	aconn := &AwaitingConnection{
+		conn:   conn,
+		expire: time.Now().Add(time.Second * 4),
+	}
+
+	var list []*AwaitingConnection
+	if v, found := this.cache[dest]; found {
+		v = append(v, aconn)
+		list = v
+	} else {
+		list = []*AwaitingConnection{aconn}
+	}
+	this.cache[dest] = list
+}
+
+func FindFirstValid(list []*AwaitingConnection) int {
+	for idx, conn := range list {
+		if !conn.Expired() {
+			return idx
+		}
+		conn.conn.Close()
+	}
+	return -1
+}
+
+func (this *ConnectionCache) Get(dest string) net.Conn {
+	this.Lock()
+	defer this.Unlock()
+
+	list, found := this.cache[dest]
+	if !found {
+		return nil
+	}
+
+	firstValid := FindFirstValid(list)
+	if firstValid == -1 {
+		delete(this.cache, dest)
+		return nil
+	}
+	res := list[firstValid].conn
+	list = list[firstValid+1:]
+	this.cache[dest] = list
+	return res
+}

+ 63 - 0
transport/hub/dialer.go

@@ -0,0 +1,63 @@
+package hub
+
+import (
+	"errors"
+	"net"
+	"time"
+
+	"github.com/v2ray/v2ray-core/common/log"
+	v2net "github.com/v2ray/v2ray-core/common/net"
+)
+
+var (
+	ErrorInvalidHost = errors.New("Invalid Host.")
+
+	globalCache = NewConnectionCache()
+)
+
+func Dial(dest v2net.Destination) (*Connection, error) {
+	destStr := dest.String()
+	conn := globalCache.Get(destStr)
+	if conn == nil {
+		var err error
+		log.Debug("Hub: Dialling new connection to ", dest)
+		conn, err = DialWithoutCache(dest)
+		if err != nil {
+			return nil, err
+		}
+	} else {
+		log.Debug("Hub: Reusing connection to ", dest)
+	}
+	return &Connection{
+		dest:     destStr,
+		conn:     conn,
+		listener: globalCache,
+	}, nil
+}
+
+func DialWithoutCache(dest v2net.Destination) (net.Conn, error) {
+	if dest.Address().IsDomain() {
+		dialer := &net.Dialer{
+			Timeout:   time.Second * 60,
+			DualStack: true,
+		}
+		network := "tcp"
+		if dest.IsUDP() {
+			network = "udp"
+		}
+		return dialer.Dial(network, dest.NetAddr())
+	}
+
+	ip := dest.Address().IP()
+	if dest.IsTCP() {
+		return net.DialTCP("tcp", nil, &net.TCPAddr{
+			IP:   ip,
+			Port: int(dest.Port()),
+		})
+	}
+
+	return net.DialUDP("udp", nil, &net.UDPAddr{
+		IP:   ip,
+		Port: int(dest.Port()),
+	})
+}

+ 2 - 2
transport/dialer/dialer_test.go → transport/hub/dialer_test.go

@@ -1,4 +1,4 @@
-package dialer_test
+package hub_test
 
 import (
 	"testing"
@@ -7,7 +7,7 @@ import (
 	v2nettesting "github.com/v2ray/v2ray-core/common/net/testing"
 	"github.com/v2ray/v2ray-core/testing/assert"
 	"github.com/v2ray/v2ray-core/testing/servers/tcp"
-	. "github.com/v2ray/v2ray-core/transport/dialer"
+	. "github.com/v2ray/v2ray-core/transport/hub"
 )
 
 func TestDialDomain(t *testing.T) {

+ 3 - 11
transport/hub/tcp.go

@@ -49,24 +49,14 @@ func ListenTCP(address v2net.Address, port v2net.Port, callback ConnectionHandle
 }
 
 func (this *TCPHub) Close() {
-	this.Lock()
-	defer this.Unlock()
-
 	this.accepting = false
 	this.listener.Close()
-	this.listener = nil
 }
 
 func (this *TCPHub) start() {
 	this.accepting = true
 	for this.accepting {
-		this.Lock()
-		if !this.accepting {
-			this.Unlock()
-			break
-		}
 		conn, err := this.listener.Accept()
-		this.Unlock()
 
 		if err != nil {
 			if this.accepting {
@@ -75,6 +65,7 @@ func (this *TCPHub) start() {
 			continue
 		}
 		go this.connCallback(&Connection{
+			dest:     conn.RemoteAddr().String(),
 			conn:     conn,
 			listener: this,
 		})
@@ -82,9 +73,10 @@ func (this *TCPHub) start() {
 }
 
 // @Private
-func (this *TCPHub) Recycle(conn net.Conn) {
+func (this *TCPHub) Recycle(dest string, conn net.Conn) {
 	if this.accepting {
 		go this.connCallback(&Connection{
+			dest:     dest,
 			conn:     conn,
 			listener: this,
 		})