Переглянути джерело

custom tcp listener and connection object

v2ray 9 роки тому
батько
коміт
8ae8b3c9f5

+ 6 - 30
proxy/dokodemo/dokodemo.go

@@ -9,8 +9,8 @@ import (
 	"github.com/v2ray/v2ray-core/common/alloc"
 	"github.com/v2ray/v2ray-core/common/log"
 	v2net "github.com/v2ray/v2ray-core/common/net"
-	"github.com/v2ray/v2ray-core/common/retry"
 	"github.com/v2ray/v2ray-core/proxy"
+	"github.com/v2ray/v2ray-core/transport/listener"
 )
 
 type DokodemoDoor struct {
@@ -21,7 +21,7 @@ type DokodemoDoor struct {
 	address       v2net.Address
 	port          v2net.Port
 	space         app.Space
-	tcpListener   *net.TCPListener
+	tcpListener   *listener.TCPListener
 	udpConn       *net.UDPConn
 	listeningPort v2net.Port
 }
@@ -42,8 +42,8 @@ func (this *DokodemoDoor) Port() v2net.Port {
 func (this *DokodemoDoor) Close() {
 	this.accepting = false
 	if this.tcpListener != nil {
-		this.tcpListener.Close()
 		this.tcpMutex.Lock()
+		this.tcpListener.Close()
 		this.tcpListener = nil
 		this.tcpMutex.Unlock()
 	}
@@ -132,42 +132,18 @@ func (this *DokodemoDoor) handleUDPPackets() {
 }
 
 func (this *DokodemoDoor) ListenTCP(port v2net.Port) error {
-	tcpListener, err := net.ListenTCP("tcp", &net.TCPAddr{
-		IP:   []byte{0, 0, 0, 0},
-		Port: int(port),
-		Zone: "",
-	})
+	tcpListener, err := listener.ListenTCP(port, this.HandleTCPConnection)
 	if err != nil {
-		log.Error("Dokodemo failed to listen on port ", port, ": ", err)
+		log.Error("Dokodemo: Failed to listen on port ", port, ": ", err)
 		return err
 	}
 	this.tcpMutex.Lock()
 	this.tcpListener = tcpListener
 	this.tcpMutex.Unlock()
-	go this.AcceptTCPConnections()
 	return nil
 }
 
-func (this *DokodemoDoor) AcceptTCPConnections() {
-	for this.accepting {
-		retry.Timed(100, 100).On(func() error {
-			this.tcpMutex.RLock()
-			defer this.tcpMutex.RUnlock()
-			if !this.accepting {
-				return nil
-			}
-			connection, err := this.tcpListener.AcceptTCP()
-			if err != nil {
-				log.Error("Dokodemo failed to accept new connections: ", err)
-				return err
-			}
-			go this.HandleTCPConnection(connection)
-			return nil
-		})
-	}
-}
-
-func (this *DokodemoDoor) HandleTCPConnection(conn *net.TCPConn) {
+func (this *DokodemoDoor) HandleTCPConnection(conn *listener.TCPConn) {
 	defer conn.Close()
 
 	packet := v2net.NewPacket(v2net.TCPDestination(this.address, this.port), nil, true)

+ 6 - 28
proxy/http/http.go

@@ -13,9 +13,9 @@ import (
 	"github.com/v2ray/v2ray-core/common/alloc"
 	"github.com/v2ray/v2ray-core/common/log"
 	v2net "github.com/v2ray/v2ray-core/common/net"
-	"github.com/v2ray/v2ray-core/common/retry"
 	"github.com/v2ray/v2ray-core/common/serial"
 	"github.com/v2ray/v2ray-core/proxy"
+	"github.com/v2ray/v2ray-core/transport/listener"
 	"github.com/v2ray/v2ray-core/transport/ray"
 )
 
@@ -24,7 +24,7 @@ type HttpProxyServer struct {
 	accepting     bool
 	space         app.Space
 	config        *Config
-	tcpListener   *net.TCPListener
+	tcpListener   *listener.TCPListener
 	listeningPort v2net.Port
 }
 
@@ -42,8 +42,8 @@ func (this *HttpProxyServer) Port() v2net.Port {
 func (this *HttpProxyServer) Close() {
 	this.accepting = false
 	if this.tcpListener != nil {
-		this.tcpListener.Close()
 		this.Lock()
+		this.tcpListener.Close()
 		this.tcpListener = nil
 		this.Unlock()
 	}
@@ -59,40 +59,18 @@ func (this *HttpProxyServer) Listen(port v2net.Port) error {
 	}
 	this.listeningPort = port
 
-	tcpListener, err := net.ListenTCP("tcp", &net.TCPAddr{
-		Port: int(port.Value()),
-		IP:   []byte{0, 0, 0, 0},
-	})
+	tcpListener, err := listener.ListenTCP(port, this.handleConnection)
 	if err != nil {
+		log.Error("Http: Failed listen on port ", port, ": ", err)
 		return err
 	}
 	this.Lock()
 	this.tcpListener = tcpListener
 	this.Unlock()
 	this.accepting = true
-	go this.accept()
 	return nil
 }
 
-func (this *HttpProxyServer) accept() {
-	for this.accepting {
-		retry.Timed(100 /* times */, 100 /* ms */).On(func() error {
-			this.Lock()
-			defer this.Unlock()
-			if !this.accepting {
-				return nil
-			}
-			tcpConn, err := this.tcpListener.AcceptTCP()
-			if err != nil {
-				log.Error("Failed to accept HTTP connection: ", err)
-				return err
-			}
-			go this.handleConnection(tcpConn)
-			return nil
-		})
-	}
-}
-
 func parseHost(rawHost string, defaultPort v2net.Port) (v2net.Destination, error) {
 	port := defaultPort
 	host, rawPort, err := net.SplitHostPort(rawHost)
@@ -116,7 +94,7 @@ func parseHost(rawHost string, defaultPort v2net.Port) (v2net.Destination, error
 	return v2net.TCPDestination(v2net.DomainAddress(host), port), nil
 }
 
-func (this *HttpProxyServer) handleConnection(conn *net.TCPConn) {
+func (this *HttpProxyServer) handleConnection(conn *listener.TCPConn) {
 	defer conn.Close()
 	reader := bufio.NewReader(conn)
 

+ 1 - 0
proxy/shadowsocks/config.go

@@ -46,4 +46,5 @@ func (this *AesCfb) NewDecodingStream(key []byte, iv []byte, reader io.Reader) (
 type Config struct {
 	Cipher   Cipher
 	Password string
+	UDP      bool
 }

+ 2 - 0
proxy/shadowsocks/config_json.go

@@ -14,6 +14,7 @@ func (this *Config) UnmarshalJSON(data []byte) error {
 	type JsonConfig struct {
 		Cipher   serial.StringLiteral `json:"method"`
 		Password serial.StringLiteral `json:"password"`
+		UDP      bool                 `json:"udp"`
 	}
 	jsonConfig := new(JsonConfig)
 	if err := json.Unmarshal(data, jsonConfig); err != nil {
@@ -23,6 +24,7 @@ func (this *Config) UnmarshalJSON(data []byte) error {
 		log.Error("Shadowsocks: Password is not specified.")
 		return internal.ErrorBadConfiguration
 	}
+	this.UDP = jsonConfig.UDP
 	this.Password = jsonConfig.Password.String()
 	if this.Cipher == nil {
 		log.Error("Shadowsocks: Cipher method is not specified.")

+ 33 - 2
proxy/shadowsocks/shadowsocks.go

@@ -3,18 +3,49 @@
 package shadowsocks
 
 import (
+	"github.com/v2ray/v2ray-core/common/log"
 	v2net "github.com/v2ray/v2ray-core/common/net"
+	"github.com/v2ray/v2ray-core/proxy"
+	"github.com/v2ray/v2ray-core/transport/listener"
 )
 
 type Shadowsocks struct {
-	config *Config
-	port   v2net.Port
+	config      *Config
+	port        v2net.Port
+	accepting   bool
+	tcpListener *listener.TCPListener
 }
 
 func (this *Shadowsocks) Port() v2net.Port {
 	return this.port
 }
 
+func (this *Shadowsocks) Close() {
+	this.accepting = false
+	this.tcpListener.Close()
+	this.tcpListener = nil
+}
+
 func (this *Shadowsocks) Listen(port v2net.Port) error {
+	if this.accepting {
+		if this.port == port {
+			return nil
+		} else {
+			return proxy.ErrorAlreadyListening
+		}
+	}
+
+	tcpListener, err := listener.ListenTCP(port, this.handleConnection)
+	if err != nil {
+		log.Error("Shadowsocks: Failed to listen on port ", port, ": ", err)
+		return err
+	}
+	this.tcpListener = tcpListener
+	this.accepting = true
 	return nil
 }
+
+func (this *Shadowsocks) handleConnection(conn *listener.TCPConn) {
+	defer conn.Close()
+
+}

+ 8 - 32
proxy/socks/socks.go

@@ -11,9 +11,9 @@ import (
 	"github.com/v2ray/v2ray-core/common/alloc"
 	"github.com/v2ray/v2ray-core/common/log"
 	v2net "github.com/v2ray/v2ray-core/common/net"
-	"github.com/v2ray/v2ray-core/common/retry"
 	"github.com/v2ray/v2ray-core/proxy"
 	"github.com/v2ray/v2ray-core/proxy/socks/protocol"
+	"github.com/v2ray/v2ray-core/transport/listener"
 )
 
 var (
@@ -28,7 +28,7 @@ type SocksServer struct {
 	accepting     bool
 	space         app.Space
 	config        *Config
-	tcpListener   *net.TCPListener
+	tcpListener   *listener.TCPListener
 	udpConn       *net.UDPConn
 	udpAddress    v2net.Destination
 	listeningPort v2net.Port
@@ -48,8 +48,8 @@ func (this *SocksServer) Port() v2net.Port {
 func (this *SocksServer) Close() {
 	this.accepting = false
 	if this.tcpListener != nil {
-		this.tcpListener.Close()
 		this.tcpMutex.Lock()
+		this.tcpListener.Close()
 		this.tcpListener = nil
 		this.tcpMutex.Unlock()
 	}
@@ -71,11 +71,7 @@ func (this *SocksServer) Listen(port v2net.Port) error {
 	}
 	this.listeningPort = port
 
-	listener, err := net.ListenTCP("tcp", &net.TCPAddr{
-		IP:   []byte{0, 0, 0, 0},
-		Port: int(port),
-		Zone: "",
-	})
+	listener, err := listener.ListenTCP(port, this.HandleConnection)
 	if err != nil {
 		log.Error("Socks: failed to listen on port ", port, ": ", err)
 		return err
@@ -84,33 +80,13 @@ func (this *SocksServer) Listen(port v2net.Port) error {
 	this.tcpMutex.Lock()
 	this.tcpListener = listener
 	this.tcpMutex.Unlock()
-	go this.AcceptConnections()
 	if this.config.UDPEnabled {
 		this.ListenUDP(port)
 	}
 	return nil
 }
 
-func (this *SocksServer) AcceptConnections() {
-	for this.accepting {
-		retry.Timed(100 /* times */, 100 /* ms */).On(func() error {
-			this.tcpMutex.RLock()
-			defer this.tcpMutex.RUnlock()
-			if !this.accepting {
-				return nil
-			}
-			connection, err := this.tcpListener.AcceptTCP()
-			if err != nil {
-				log.Error("Socks: failed to accept new connection: ", err)
-				return err
-			}
-			go this.HandleConnection(connection)
-			return nil
-		})
-	}
-}
-
-func (this *SocksServer) HandleConnection(connection *net.TCPConn) error {
+func (this *SocksServer) HandleConnection(connection *listener.TCPConn) {
 	defer connection.Close()
 
 	reader := v2net.NewTimeOutReader(120, connection)
@@ -118,13 +94,13 @@ func (this *SocksServer) HandleConnection(connection *net.TCPConn) error {
 	auth, auth4, err := protocol.ReadAuthentication(reader)
 	if err != nil && err != protocol.Socks4Downgrade {
 		log.Error("Socks: failed to read authentication: ", err)
-		return err
+		return
 	}
 
 	if err != nil && err == protocol.Socks4Downgrade {
-		return this.handleSocks4(reader, connection, auth4)
+		this.handleSocks4(reader, connection, auth4)
 	} else {
-		return this.handleSocks5(reader, connection, auth)
+		this.handleSocks5(reader, connection, auth)
 	}
 }
 

+ 8 - 37
proxy/vmess/inbound/inbound.go

@@ -3,7 +3,6 @@ package inbound
 import (
 	"crypto/md5"
 	"io"
-	"net"
 	"sync"
 
 	"github.com/v2ray/v2ray-core/app"
@@ -11,12 +10,12 @@ import (
 	v2crypto "github.com/v2ray/v2ray-core/common/crypto"
 	"github.com/v2ray/v2ray-core/common/log"
 	v2net "github.com/v2ray/v2ray-core/common/net"
-	"github.com/v2ray/v2ray-core/common/retry"
 	"github.com/v2ray/v2ray-core/common/serial"
 	"github.com/v2ray/v2ray-core/proxy"
 	"github.com/v2ray/v2ray-core/proxy/internal"
 	"github.com/v2ray/v2ray-core/proxy/vmess"
 	"github.com/v2ray/v2ray-core/proxy/vmess/protocol"
+	"github.com/v2ray/v2ray-core/transport/listener"
 )
 
 // Inbound connection handler that handles messages in VMess format.
@@ -26,7 +25,7 @@ type VMessInboundHandler struct {
 	clients       protocol.UserSet
 	user          *vmess.User
 	accepting     bool
-	listener      *net.TCPListener
+	listener      *listener.TCPListener
 	features      *FeaturesConfig
 	listeningPort v2net.Port
 }
@@ -38,8 +37,8 @@ func (this *VMessInboundHandler) Port() v2net.Port {
 func (this *VMessInboundHandler) Close() {
 	this.accepting = false
 	if this.listener != nil {
-		this.listener.Close()
 		this.Lock()
+		this.listener.Close()
 		this.listener = nil
 		this.Unlock()
 	}
@@ -59,45 +58,19 @@ func (this *VMessInboundHandler) Listen(port v2net.Port) error {
 	}
 	this.listeningPort = port
 
-	listener, err := net.ListenTCP("tcp", &net.TCPAddr{
-		IP:   []byte{0, 0, 0, 0},
-		Port: int(port),
-		Zone: "",
-	})
+	tcpListener, err := listener.ListenTCP(port, this.HandleConnection)
 	if err != nil {
 		log.Error("Unable to listen tcp port ", port, ": ", err)
 		return err
 	}
 	this.accepting = true
 	this.Lock()
-	this.listener = listener
+	this.listener = tcpListener
 	this.Unlock()
-	go this.AcceptConnections()
 	return nil
 }
 
-func (this *VMessInboundHandler) AcceptConnections() error {
-	for this.accepting {
-		retry.Timed(100 /* times */, 100 /* ms */).On(func() error {
-			this.Lock()
-			defer this.Unlock()
-			if !this.accepting {
-				return nil
-			}
-			connection, err := this.listener.AcceptTCP()
-			if err != nil {
-				log.Error("Failed to accpet connection: ", err)
-				return err
-			}
-			go this.HandleConnection(connection)
-			return nil
-		})
-
-	}
-	return nil
-}
-
-func (this *VMessInboundHandler) HandleConnection(connection *net.TCPConn) error {
+func (this *VMessInboundHandler) HandleConnection(connection *listener.TCPConn) {
 	defer connection.Close()
 
 	connReader := v2net.NewTimeOutReader(16, connection)
@@ -107,7 +80,7 @@ func (this *VMessInboundHandler) HandleConnection(connection *net.TCPConn) error
 	if err != nil {
 		log.Access(connection.RemoteAddr(), serial.StringLiteral(""), log.AccessRejected, serial.StringLiteral(err.Error()))
 		log.Warning("VMessIn: Invalid request from ", connection.RemoteAddr(), ": ", err)
-		return err
+		return
 	}
 	log.Access(connection.RemoteAddr(), request.Address, log.AccessAccepted, serial.StringLiteral(""))
 	log.Debug("VMessIn: Received request for ", request.Address)
@@ -130,7 +103,7 @@ func (this *VMessInboundHandler) HandleConnection(connection *net.TCPConn) error
 	if err != nil {
 		log.Error("VMessIn: Failed to create AES decryption stream: ", err)
 		close(input)
-		return err
+		return
 	}
 
 	responseWriter := v2crypto.NewCryptionWriter(aesStream, connection)
@@ -151,8 +124,6 @@ func (this *VMessInboundHandler) HandleConnection(connection *net.TCPConn) error
 
 	connection.CloseWrite()
 	readFinish.Lock()
-
-	return nil
 }
 
 func handleInput(request *protocol.VMessRequest, reader io.Reader, input chan<- *alloc.Buffer, finish *sync.Mutex) {

+ 110 - 0
transport/listener/tcp.go

@@ -0,0 +1,110 @@
+package listener
+
+import (
+	"net"
+	"time"
+
+	"github.com/v2ray/v2ray-core/common/log"
+	v2net "github.com/v2ray/v2ray-core/common/net"
+)
+
+type TCPConn struct {
+	conn     *net.TCPConn
+	listener *TCPListener
+	dirty    bool
+}
+
+func (this *TCPConn) Read(b []byte) (int, error) {
+	return this.conn.Read(b)
+}
+
+func (this *TCPConn) Write(b []byte) (int, error) {
+	return this.conn.Write(b)
+}
+
+func (this *TCPConn) Close() error {
+	return this.conn.Close()
+}
+
+func (this *TCPConn) Release() {
+	if this.dirty {
+		this.Close()
+		return
+	}
+	this.listener.recycle(this.conn)
+}
+
+func (this *TCPConn) LocalAddr() net.Addr {
+	return this.conn.LocalAddr()
+}
+
+func (this *TCPConn) RemoteAddr() net.Addr {
+	return this.conn.RemoteAddr()
+}
+
+func (this *TCPConn) SetDeadline(t time.Time) error {
+	return this.conn.SetDeadline(t)
+}
+
+func (this *TCPConn) SetReadDeadline(t time.Time) error {
+	return this.conn.SetReadDeadline(t)
+}
+
+func (this *TCPConn) SetWriteDeadline(t time.Time) error {
+	return this.conn.SetWriteDeadline(t)
+}
+
+func (this *TCPConn) CloseRead() error {
+	return this.conn.CloseRead()
+}
+
+func (this *TCPConn) CloseWrite() error {
+	return this.conn.CloseWrite()
+}
+
+type TCPListener struct {
+	listener     *net.TCPListener
+	connCallback func(*TCPConn)
+	accepting    bool
+}
+
+func ListenTCP(port v2net.Port, callback func(*TCPConn)) (*TCPListener, error) {
+	listener, err := net.ListenTCP("tcp", &net.TCPAddr{
+		IP:   []byte{0, 0, 0, 0},
+		Port: int(port),
+		Zone: "",
+	})
+	if err != nil {
+		return nil, err
+	}
+	tcpListener := &TCPListener{
+		listener:     listener,
+		connCallback: callback,
+	}
+	go tcpListener.start()
+	return tcpListener, nil
+}
+
+func (this *TCPListener) Close() {
+	this.accepting = false
+	this.listener.Close()
+}
+
+func (this *TCPListener) start() {
+	this.accepting = true
+	for this.accepting {
+		conn, err := this.listener.AcceptTCP()
+		if err != nil {
+			log.Warning("Listener: Failed to accept new TCP connection: ", err)
+			continue
+		}
+		go this.connCallback(&TCPConn{
+			conn:     conn,
+			listener: this,
+		})
+	}
+}
+
+func (this *TCPListener) recycle(conn *net.TCPConn) {
+
+}