Pārlūkot izejas kodu

simpify websocket connection

Darien Raymond 8 gadi atpakaļ
vecāks
revīzija
4afad8d31c

+ 0 - 99
transport/internet/websocket/connection.go

@@ -1,99 +0,0 @@
-package websocket
-
-import (
-	"io"
-	"net"
-	"time"
-
-	"v2ray.com/core/common/errors"
-)
-
-var (
-	ErrInvalidConn = errors.New("Invalid Connection.")
-)
-
-type ConnectionManager interface {
-	Recycle(string, *wsconn)
-}
-
-type Connection struct {
-	dest     string
-	conn     *wsconn
-	listener ConnectionManager
-	reusable bool
-	config   *Config
-}
-
-func NewConnection(dest string, conn *wsconn, manager ConnectionManager, config *Config) *Connection {
-	return &Connection{
-		dest:     dest,
-		conn:     conn,
-		listener: manager,
-		reusable: config.IsConnectionReuse(),
-		config:   config,
-	}
-}
-
-func (v *Connection) Read(b []byte) (int, error) {
-	if v == nil || v.conn == nil {
-		return 0, io.EOF
-	}
-
-	return v.conn.Read(b)
-}
-
-func (v *Connection) Write(b []byte) (int, error) {
-	if v == nil || v.conn == nil {
-		return 0, io.ErrClosedPipe
-	}
-	return v.conn.Write(b)
-}
-
-func (v *Connection) Close() error {
-	if v == nil || v.conn == nil {
-		return io.ErrClosedPipe
-	}
-	if v.Reusable() {
-		v.listener.Recycle(v.dest, v.conn)
-		return nil
-	}
-	err := v.conn.Close()
-	v.conn = nil
-	return err
-}
-
-func (v *Connection) LocalAddr() net.Addr {
-	return v.conn.LocalAddr()
-}
-
-func (v *Connection) RemoteAddr() net.Addr {
-	return v.conn.RemoteAddr()
-}
-
-func (v *Connection) SetDeadline(t time.Time) error {
-	return v.conn.SetDeadline(t)
-}
-
-func (v *Connection) SetReadDeadline(t time.Time) error {
-	return v.conn.SetReadDeadline(t)
-}
-
-func (v *Connection) SetWriteDeadline(t time.Time) error {
-	return v.conn.SetWriteDeadline(t)
-}
-
-func (v *Connection) SetReusable(reusable bool) {
-	v.reusable = reusable
-}
-
-func (v *Connection) Reusable() bool {
-	return v.config.IsConnectionReuse() && v.reusable
-}
-
-func (v *Connection) SysFd() (int, error) {
-	return getSysFd(v.conn)
-}
-
-func getSysFd(conn net.Conn) (int, error) {
-	return 0, ErrInvalidConn
-}

+ 0 - 114
transport/internet/websocket/connection_cache.go

@@ -1,114 +0,0 @@
-package websocket
-
-import (
-	"net"
-	"sync"
-	"time"
-
-	"v2ray.com/core/common/log"
-	"v2ray.com/core/common/signal"
-)
-
-type AwaitingConnection struct {
-	conn   *wsconn
-	expire time.Time
-}
-
-func (v *AwaitingConnection) Expired() bool {
-	return v.expire.Before(time.Now())
-}
-
-type ConnectionCache struct {
-	sync.Mutex
-	cache       map[string][]*AwaitingConnection
-	cleanupOnce signal.Once
-}
-
-func NewConnectionCache() *ConnectionCache {
-	return &ConnectionCache{
-		cache: make(map[string][]*AwaitingConnection),
-	}
-}
-
-func (v *ConnectionCache) Cleanup() {
-	defer v.cleanupOnce.Reset()
-
-	for len(v.cache) > 0 {
-		time.Sleep(time.Second * 7)
-		v.Lock()
-		for key, value := range v.cache {
-			size := len(value)
-			changed := false
-			for i := 0; i < size; {
-				if value[i].Expired() {
-					value[i].conn.Close()
-					value[i] = value[size-1]
-					size--
-					changed = true
-				} else {
-					i++
-				}
-			}
-			if changed {
-				for i := size; i < len(value); i++ {
-					value[i] = nil
-				}
-				value = value[:size]
-				v.cache[key] = value
-			}
-		}
-		v.Unlock()
-	}
-}
-
-func (v *ConnectionCache) Recycle(dest string, conn *wsconn) {
-	v.Lock()
-	defer v.Unlock()
-
-	aconn := &AwaitingConnection{
-		conn:   conn,
-		expire: time.Now().Add(time.Second * 7),
-	}
-
-	var list []*AwaitingConnection
-	if val, found := v.cache[dest]; found {
-		val = append(val, aconn)
-		list = val
-	} else {
-		list = []*AwaitingConnection{aconn}
-	}
-	v.cache[dest] = list
-
-	go v.cleanupOnce.Do(v.Cleanup)
-}
-
-func FindFirstValid(list []*AwaitingConnection) int {
-	for idx, conn := range list {
-		if !conn.Expired() && !conn.conn.connClosing {
-			return idx
-		}
-		go conn.conn.Close()
-	}
-	return -1
-}
-
-func (v *ConnectionCache) Get(dest string) net.Conn {
-	v.Lock()
-	defer v.Unlock()
-
-	list, found := v.cache[dest]
-	if !found {
-		return nil
-	}
-
-	firstValid := FindFirstValid(list)
-	if firstValid == -1 {
-		delete(v.cache, dest)
-		return nil
-	}
-	res := list[firstValid].conn
-	list = list[firstValid+1:]
-	v.cache[dest] = list
-	log.Debug("WS:Conn Cache used.")
-	return res
-}

+ 4 - 3
transport/internet/websocket/dialer.go

@@ -9,11 +9,12 @@ import (
 	"v2ray.com/core/common/log"
 	v2net "v2ray.com/core/common/net"
 	"v2ray.com/core/transport/internet"
+	"v2ray.com/core/transport/internet/internal"
 	v2tls "v2ray.com/core/transport/internet/tls"
 )
 
 var (
-	globalCache = NewConnectionCache()
+	globalCache = internal.NewConnectionPool()
 )
 
 func Dial(src v2net.Address, dest v2net.Destination, options internet.DialerOptions) (internet.Connection, error) {
@@ -27,7 +28,7 @@ func Dial(src v2net.Address, dest v2net.Destination, options internet.DialerOpti
 	}
 	wsSettings := networkSettings.(*Config)
 
-	id := src.String() + "-" + dest.NetAddr()
+	id := internal.NewConnectionID(src, dest)
 	var conn *wsconn
 	if dest.Network == v2net.Network_TCP && wsSettings.IsConnectionReuse() {
 		connt := globalCache.Get(id)
@@ -43,7 +44,7 @@ func Dial(src v2net.Address, dest v2net.Destination, options internet.DialerOpti
 			return nil, err
 		}
 	}
-	return NewConnection(id, conn, globalCache, wsSettings), nil
+	return internal.NewConnection(id, conn, globalCache, internal.ReuseConnection(wsSettings.IsConnectionReuse())), nil
 }
 
 func init() {

+ 3 - 2
transport/internet/websocket/hub.go

@@ -16,6 +16,7 @@ import (
 	v2tls "v2ray.com/core/transport/internet/tls"
 
 	"github.com/gorilla/websocket"
+	"v2ray.com/core/transport/internet/internal"
 )
 
 var (
@@ -157,14 +158,14 @@ func (v *WSListener) Accept() (internet.Connection, error) {
 			if connErr.err != nil {
 				return nil, connErr.err
 			}
-			return NewConnection("", connErr.conn.(*wsconn), v, v.config), nil
+			return internal.NewConnection(internal.ConnectionID{}, connErr.conn.(*wsconn), v, internal.ReuseConnection(v.config.IsConnectionReuse())), nil
 		case <-time.After(time.Second * 2):
 		}
 	}
 	return nil, ErrClosedListener
 }
 
-func (v *WSListener) Recycle(dest string, conn *wsconn) {
+func (v *WSListener) Put(id internal.ConnectionID, conn net.Conn) {
 	v.Lock()
 	defer v.Unlock()
 	if !v.acccepting {

+ 1 - 1
transport/internet/websocket/wsconn.go

@@ -159,7 +159,7 @@ func (ws *wsconn) SetReusable(reusable bool) {
 }
 
 func (ws *wsconn) pingPong() {
-	pongRcv := make(chan int, 0)
+	pongRcv := make(chan int, 1)
 	ws.wsc.SetPongHandler(func(data string) error {
 		pongRcv <- 0
 		return nil