Pārlūkot izejas kodu

close method for inbound connection handler

v2ray 9 gadi atpakaļ
vecāks
revīzija
201481a82c

+ 45 - 15
proxy/dokodemo/dokodemo.go

@@ -13,11 +13,14 @@ import (
 )
 
 type DokodemoDoor struct {
-	config    Config
-	accepting bool
-	address   v2net.Address
-	port      v2net.Port
-	space     app.Space
+	sync.Mutex
+	config      Config
+	accepting   bool
+	address     v2net.Address
+	port        v2net.Port
+	space       app.Space
+	tcpListener *net.TCPListener
+	udpConn     *net.UDPConn
 }
 
 func NewDokodemoDoor(space app.Space, config Config) *DokodemoDoor {
@@ -29,6 +32,22 @@ func NewDokodemoDoor(space app.Space, config Config) *DokodemoDoor {
 	}
 }
 
+func (this *DokodemoDoor) Close() {
+	this.accepting = false
+	if this.tcpListener != nil {
+		this.Lock()
+		this.tcpListener.Close()
+		this.tcpListener = nil
+		this.Unlock()
+	}
+	if this.udpConn != nil {
+		this.Lock()
+		this.udpConn.Close()
+		this.udpConn = nil
+		this.Unlock()
+	}
+}
+
 func (this *DokodemoDoor) Listen(port v2net.Port) error {
 	this.accepting = true
 
@@ -57,14 +76,20 @@ func (this *DokodemoDoor) ListenUDP(port v2net.Port) error {
 		log.Error("Dokodemo failed to listen on port %d: %v", port, err)
 		return err
 	}
-	go this.handleUDPPackets(udpConn)
+	this.udpConn = udpConn
+	go this.handleUDPPackets()
 	return nil
 }
 
-func (this *DokodemoDoor) handleUDPPackets(udpConn *net.UDPConn) {
-	defer udpConn.Close()
+func (this *DokodemoDoor) handleUDPPackets() {
 	for this.accepting {
 		buffer := alloc.NewBuffer()
+		var udpConn *net.UDPConn
+		this.Lock()
+		if this.udpConn != nil {
+			udpConn = this.udpConn
+		}
+		this.Unlock()
 		nBytes, addr, err := udpConn.ReadFromUDP(buffer.Value)
 		buffer.Slice(0, nBytes)
 		if err != nil {
@@ -93,19 +118,24 @@ func (this *DokodemoDoor) ListenTCP(port v2net.Port) error {
 		log.Error("Dokodemo failed to listen on port %d: %v", port, err)
 		return err
 	}
-	go this.AcceptTCPConnections(tcpListener)
+	this.tcpListener = tcpListener
+	go this.AcceptTCPConnections()
 	return nil
 }
 
-func (this *DokodemoDoor) AcceptTCPConnections(tcpListener *net.TCPListener) {
+func (this *DokodemoDoor) AcceptTCPConnections() {
 	for this.accepting {
 		retry.Timed(100, 100).On(func() error {
-			connection, err := tcpListener.AcceptTCP()
-			if err != nil {
-				log.Error("Dokodemo failed to accept new connections: %v", err)
-				return err
+			this.Lock()
+			defer this.Unlock()
+			if this.tcpListener != nil {
+				connection, err := this.tcpListener.AcceptTCP()
+				if err != nil {
+					log.Error("Dokodemo failed to accept new connections: %v", err)
+					return err
+				}
+				go this.HandleTCPConnection(connection)
 			}
-			go this.HandleTCPConnection(connection)
 			return nil
 		})
 	}

+ 33 - 12
proxy/http/http.go

@@ -13,13 +13,16 @@ import (
 	"github.com/v2ray/v2ray-core/common/alloc"
 	"github.com/v2ray/v2ray-core/common/log"
 	v2net "github.com/v2ray/v2ray-core/common/net"
+	"github.com/v2ray/v2ray-core/common/retry"
 	"github.com/v2ray/v2ray-core/transport/ray"
 )
 
 type HttpProxyServer struct {
-	accepting bool
-	space     app.Space
-	config    Config
+	sync.Mutex
+	accepting   bool
+	space       app.Space
+	config      Config
+	tcpListener *net.TCPListener
 }
 
 func NewHttpProxyServer(space app.Space, config Config) *HttpProxyServer {
@@ -29,6 +32,16 @@ func NewHttpProxyServer(space app.Space, config Config) *HttpProxyServer {
 	}
 }
 
+func (this *HttpProxyServer) Close() {
+	this.accepting = false
+	if this.tcpListener != nil {
+		this.Lock()
+		this.tcpListener.Close()
+		this.tcpListener = nil
+		this.Unlock()
+	}
+}
+
 func (this *HttpProxyServer) Listen(port v2net.Port) error {
 	tcpListener, err := net.ListenTCP("tcp", &net.TCPAddr{
 		Port: int(port.Value()),
@@ -37,19 +50,27 @@ func (this *HttpProxyServer) Listen(port v2net.Port) error {
 	if err != nil {
 		return err
 	}
-	go this.accept(tcpListener)
+	this.tcpListener = tcpListener
+	this.accepting = true
+	go this.accept()
 	return nil
 }
 
-func (this *HttpProxyServer) accept(listener *net.TCPListener) {
-	this.accepting = true
+func (this *HttpProxyServer) accept() {
 	for this.accepting {
-		tcpConn, err := listener.AcceptTCP()
-		if err != nil {
-			log.Error("Failed to accept HTTP connection: %v", err)
-			continue
-		}
-		go this.handleConnection(tcpConn)
+		retry.Timed(100 /* times */, 100 /* ms */).On(func() error {
+			this.Lock()
+			defer this.Unlock()
+			if this.tcpListener != nil {
+				tcpConn, err := this.tcpListener.AcceptTCP()
+				if err != nil {
+					log.Error("Failed to accept HTTP connection: %v", err)
+					return err
+				}
+				go this.handleConnection(tcpConn)
+			}
+			return nil
+		})
 	}
 }
 

+ 1 - 0
proxy/proxy.go

@@ -12,6 +12,7 @@ type InboundConnectionHandler interface {
 	// Listen starts a InboundConnectionHandler by listen on a specific port. This method is called
 	// exactly once during runtime.
 	Listen(port v2net.Port) error
+	Close()
 }
 
 // An OutboundConnectionHandler handles outbound network connection for V2Ray.

+ 37 - 8
proxy/socks/socks.go

@@ -23,9 +23,13 @@ var (
 
 // SocksServer is a SOCKS 5 proxy server
 type SocksServer struct {
-	accepting bool
-	space     app.Space
-	config    Config
+	sync.RWMutex
+	accepting   bool
+	space       app.Space
+	config      Config
+	tcpListener *net.TCPListener
+	udpConn     *net.UDPConn
+	udpAddress  v2net.Destination
 }
 
 func NewSocksServer(space app.Space, config Config) *SocksServer {
@@ -35,6 +39,26 @@ func NewSocksServer(space app.Space, config Config) *SocksServer {
 	}
 }
 
+func (this *SocksServer) Close() {
+	this.accepting = false
+	if this.tcpListener != nil {
+		this.Lock()
+		if this.tcpListener != nil {
+			this.tcpListener.Close()
+			this.tcpListener = nil
+		}
+		this.Unlock()
+	}
+	if this.udpConn != nil {
+		this.Lock()
+		if this.udpConn != nil {
+			this.udpConn.Close()
+			this.udpConn = nil
+		}
+		this.Unlock()
+	}
+}
+
 func (this *SocksServer) Listen(port v2net.Port) error {
 	listener, err := net.ListenTCP("tcp", &net.TCPAddr{
 		IP:   []byte{0, 0, 0, 0},
@@ -46,17 +70,23 @@ func (this *SocksServer) Listen(port v2net.Port) error {
 		return err
 	}
 	this.accepting = true
-	go this.AcceptConnections(listener)
+	this.tcpListener = listener
+	go this.AcceptConnections()
 	if this.config.UDPEnabled() {
 		this.ListenUDP(port)
 	}
 	return nil
 }
 
-func (this *SocksServer) AcceptConnections(listener *net.TCPListener) {
+func (this *SocksServer) AcceptConnections() {
 	for this.accepting {
 		retry.Timed(100 /* times */, 100 /* ms */).On(func() error {
-			connection, err := listener.AcceptTCP()
+			this.RLock()
+			defer this.RUnlock()
+			if !this.accepting {
+				return nil
+			}
+			connection, err := this.tcpListener.AcceptTCP()
 			if err != nil {
 				log.Error("Socks failed to accept new connection %v", err)
 				return err
@@ -64,7 +94,6 @@ func (this *SocksServer) AcceptConnections(listener *net.TCPListener) {
 			go this.HandleConnection(connection)
 			return nil
 		})
-
 	}
 }
 
@@ -187,7 +216,7 @@ func (this *SocksServer) handleUDP(reader *v2net.TimeOutReader, writer io.Writer
 	response := protocol.NewSocks5Response()
 	response.Error = protocol.ErrorSuccess
 
-	udpAddr := this.getUDPAddr()
+	udpAddr := this.udpAddress
 
 	response.Port = udpAddr.Port()
 	switch {

+ 22 - 14
proxy/socks/udp.go

@@ -9,8 +9,6 @@ import (
 	"github.com/v2ray/v2ray-core/proxy/socks/protocol"
 )
 
-var udpAddress v2net.Destination
-
 func (this *SocksServer) ListenUDP(port v2net.Port) error {
 	addr := &net.UDPAddr{
 		IP:   net.IP{0, 0, 0, 0},
@@ -22,20 +20,23 @@ func (this *SocksServer) ListenUDP(port v2net.Port) error {
 		log.Error("Socks failed to listen UDP on port %d: %v", port, err)
 		return err
 	}
-	udpAddress = v2net.UDPDestination(v2net.IPAddress(this.config.IP()), port)
+	this.udpAddress = v2net.UDPDestination(v2net.IPAddress(this.config.IP()), port)
+	this.udpConn = conn
 
-	go this.AcceptPackets(conn)
+	go this.AcceptPackets()
 	return nil
 }
 
-func (this *SocksServer) getUDPAddr() v2net.Destination {
-	return udpAddress
-}
-
-func (this *SocksServer) AcceptPackets(conn *net.UDPConn) error {
-	for {
+func (this *SocksServer) AcceptPackets() error {
+	for this.accepting {
 		buffer := alloc.NewBuffer()
-		nBytes, addr, err := conn.ReadFromUDP(buffer.Value)
+		this.RLock()
+		if !this.accepting {
+			this.RUnlock()
+			return nil
+		}
+		nBytes, addr, err := this.udpConn.ReadFromUDP(buffer.Value)
+		this.RUnlock()
 		if err != nil {
 			log.Error("Socks failed to read UDP packets: %v", err)
 			buffer.Release()
@@ -60,11 +61,12 @@ func (this *SocksServer) AcceptPackets(conn *net.UDPConn) error {
 
 		udpPacket := v2net.NewPacket(request.Destination(), request.Data, false)
 		log.Info("Send packet to %s with %d bytes", udpPacket.Destination().String(), request.Data.Len())
-		go this.handlePacket(conn, udpPacket, addr, request.Address, request.Port)
+		go this.handlePacket(udpPacket, addr, request.Address, request.Port)
 	}
+	return nil
 }
 
-func (this *SocksServer) handlePacket(conn *net.UDPConn, packet v2net.Packet, clientAddr *net.UDPAddr, targetAddr v2net.Address, port v2net.Port) {
+func (this *SocksServer) handlePacket(packet v2net.Packet, clientAddr *net.UDPAddr, targetAddr v2net.Address, port v2net.Port) {
 	ray := this.space.PacketDispatcher().DispatchToOutbound(packet)
 	close(ray.InboundInput())
 
@@ -80,7 +82,13 @@ func (this *SocksServer) handlePacket(conn *net.UDPConn, packet v2net.Packet, cl
 		udpMessage := alloc.NewSmallBuffer().Clear()
 		response.Write(udpMessage)
 
-		nBytes, err := conn.WriteToUDP(udpMessage.Value, clientAddr)
+		this.RLock()
+		if !this.accepting {
+			this.RUnlock()
+			return
+		}
+		nBytes, err := this.udpConn.WriteToUDP(udpMessage.Value, clientAddr)
+		this.RUnlock()
 		udpMessage.Release()
 		response.Data.Release()
 		if err != nil {

+ 4 - 0
proxy/testing/mocks/inboundhandler.go

@@ -20,6 +20,10 @@ func (this *InboundConnectionHandler) Listen(port v2net.Port) error {
 	return nil
 }
 
+func (this *InboundConnectionHandler) Close() {
+
+}
+
 func (this *InboundConnectionHandler) Communicate(packet v2net.Packet) error {
 	ray := this.Space.PacketDispatcher().DispatchToOutbound(packet)
 

+ 29 - 7
proxy/vmess/inbound/inbound.go

@@ -21,9 +21,11 @@ import (
 
 // Inbound connection handler that handles messages in VMess format.
 type VMessInboundHandler struct {
+	sync.Mutex
 	space     app.Space
 	clients   user.UserSet
 	accepting bool
+	listener  *net.TCPListener
 }
 
 func NewVMessInboundHandler(space app.Space, clients user.UserSet) *VMessInboundHandler {
@@ -33,6 +35,18 @@ func NewVMessInboundHandler(space app.Space, clients user.UserSet) *VMessInbound
 	}
 }
 
+func (this *VMessInboundHandler) Close() {
+	this.accepting = false
+	if this.listener != nil {
+		this.Lock()
+		if this.listener != nil {
+			this.listener.Close()
+			this.listener = nil
+		}
+		this.Unlock()
+	}
+}
+
 func (this *VMessInboundHandler) Listen(port v2net.Port) error {
 	listener, err := net.ListenTCP("tcp", &net.TCPAddr{
 		IP:   []byte{0, 0, 0, 0},
@@ -44,19 +58,27 @@ func (this *VMessInboundHandler) Listen(port v2net.Port) error {
 		return err
 	}
 	this.accepting = true
-	go this.AcceptConnections(listener)
+	this.listener = listener
+	go this.AcceptConnections()
 	return nil
 }
 
-func (this *VMessInboundHandler) AcceptConnections(listener *net.TCPListener) error {
+func (this *VMessInboundHandler) AcceptConnections() error {
 	for this.accepting {
 		retry.Timed(100 /* times */, 100 /* ms */).On(func() error {
-			connection, err := listener.AcceptTCP()
-			if err != nil {
-				log.Error("Failed to accpet connection: %s", err.Error())
-				return err
+			if !this.accepting {
+				return nil
+			}
+			this.Lock()
+			defer this.Unlock()
+			if this.listener != nil {
+				connection, err := this.listener.AcceptTCP()
+				if err != nil {
+					log.Error("Failed to accpet connection: %s", err.Error())
+					return err
+				}
+				go this.HandleConnection(connection)
 			}
-			go this.HandleConnection(connection)
 			return nil
 		})