Browse Source

close method for point server

v2ray 9 years ago
parent
commit
56c5302367

+ 23 - 14
proxy/dokodemo/dokodemo.go

@@ -13,7 +13,8 @@ import (
 )
 )
 
 
 type DokodemoDoor struct {
 type DokodemoDoor struct {
-	sync.Mutex
+	tcpMutex    sync.RWMutex
+	udpMutex    sync.RWMutex
 	config      Config
 	config      Config
 	accepting   bool
 	accepting   bool
 	address     v2net.Address
 	address     v2net.Address
@@ -35,16 +36,16 @@ func NewDokodemoDoor(space app.Space, config Config) *DokodemoDoor {
 func (this *DokodemoDoor) Close() {
 func (this *DokodemoDoor) Close() {
 	this.accepting = false
 	this.accepting = false
 	if this.tcpListener != nil {
 	if this.tcpListener != nil {
-		this.Lock()
 		this.tcpListener.Close()
 		this.tcpListener.Close()
+		this.tcpMutex.Lock()
 		this.tcpListener = nil
 		this.tcpListener = nil
-		this.Unlock()
+		this.tcpMutex.Unlock()
 	}
 	}
 	if this.udpConn != nil {
 	if this.udpConn != nil {
-		this.Lock()
 		this.udpConn.Close()
 		this.udpConn.Close()
+		this.udpMutex.Lock()
 		this.udpConn = nil
 		this.udpConn = nil
-		this.Unlock()
+		this.udpMutex.Unlock()
 	}
 	}
 }
 }
 
 
@@ -84,13 +85,12 @@ func (this *DokodemoDoor) ListenUDP(port v2net.Port) error {
 func (this *DokodemoDoor) handleUDPPackets() {
 func (this *DokodemoDoor) handleUDPPackets() {
 	for this.accepting {
 	for this.accepting {
 		buffer := alloc.NewBuffer()
 		buffer := alloc.NewBuffer()
-		var udpConn *net.UDPConn
-		this.Lock()
-		if this.udpConn != nil {
-			udpConn = this.udpConn
+		this.udpMutex.RLock()
+		if !this.accepting {
+			return
 		}
 		}
-		this.Unlock()
-		nBytes, addr, err := udpConn.ReadFromUDP(buffer.Value)
+		nBytes, addr, err := this.udpConn.ReadFromUDP(buffer.Value)
+		this.udpMutex.RUnlock()
 		buffer.Slice(0, nBytes)
 		buffer.Slice(0, nBytes)
 		if err != nil {
 		if err != nil {
 			buffer.Release()
 			buffer.Release()
@@ -103,7 +103,13 @@ func (this *DokodemoDoor) handleUDPPackets() {
 		close(ray.InboundInput())
 		close(ray.InboundInput())
 
 
 		for payload := range ray.InboundOutput() {
 		for payload := range ray.InboundOutput() {
-			udpConn.WriteToUDP(payload.Value, addr)
+			this.udpMutex.RLock()
+			if !this.accepting {
+				this.udpMutex.RUnlock()
+				return
+			}
+			this.udpConn.WriteToUDP(payload.Value, addr)
+			this.udpMutex.RUnlock()
 		}
 		}
 	}
 	}
 }
 }
@@ -126,8 +132,11 @@ func (this *DokodemoDoor) ListenTCP(port v2net.Port) error {
 func (this *DokodemoDoor) AcceptTCPConnections() {
 func (this *DokodemoDoor) AcceptTCPConnections() {
 	for this.accepting {
 	for this.accepting {
 		retry.Timed(100, 100).On(func() error {
 		retry.Timed(100, 100).On(func() error {
-			this.Lock()
-			defer this.Unlock()
+			if !this.accepting {
+				return nil
+			}
+			this.tcpMutex.RLock()
+			defer this.tcpMutex.RUnlock()
 			if this.tcpListener != nil {
 			if this.tcpListener != nil {
 				connection, err := this.tcpListener.AcceptTCP()
 				connection, err := this.tcpListener.AcceptTCP()
 				if err != nil {
 				if err != nil {

+ 1 - 1
proxy/http/http.go

@@ -35,8 +35,8 @@ func NewHttpProxyServer(space app.Space, config Config) *HttpProxyServer {
 func (this *HttpProxyServer) Close() {
 func (this *HttpProxyServer) Close() {
 	this.accepting = false
 	this.accepting = false
 	if this.tcpListener != nil {
 	if this.tcpListener != nil {
-		this.Lock()
 		this.tcpListener.Close()
 		this.tcpListener.Close()
+		this.Lock()
 		this.tcpListener = nil
 		this.tcpListener = nil
 		this.Unlock()
 		this.Unlock()
 	}
 	}

+ 16 - 15
proxy/socks/socks.go

@@ -23,7 +23,8 @@ var (
 
 
 // SocksServer is a SOCKS 5 proxy server
 // SocksServer is a SOCKS 5 proxy server
 type SocksServer struct {
 type SocksServer struct {
-	sync.RWMutex
+	tcpMutex    sync.RWMutex
+	udpMutex    sync.RWMutex
 	accepting   bool
 	accepting   bool
 	space       app.Space
 	space       app.Space
 	config      Config
 	config      Config
@@ -42,20 +43,16 @@ func NewSocksServer(space app.Space, config Config) *SocksServer {
 func (this *SocksServer) Close() {
 func (this *SocksServer) Close() {
 	this.accepting = false
 	this.accepting = false
 	if this.tcpListener != nil {
 	if this.tcpListener != nil {
-		this.Lock()
-		if this.tcpListener != nil {
-			this.tcpListener.Close()
-			this.tcpListener = nil
-		}
-		this.Unlock()
+		this.tcpListener.Close()
+		this.tcpMutex.Lock()
+		this.tcpListener = nil
+		this.tcpMutex.Unlock()
 	}
 	}
 	if this.udpConn != nil {
 	if this.udpConn != nil {
-		this.Lock()
-		if this.udpConn != nil {
-			this.udpConn.Close()
-			this.udpConn = nil
-		}
-		this.Unlock()
+		this.udpConn.Close()
+		this.udpMutex.Lock()
+		this.udpConn = nil
+		this.udpMutex.Unlock()
 	}
 	}
 }
 }
 
 
@@ -81,12 +78,16 @@ func (this *SocksServer) Listen(port v2net.Port) error {
 func (this *SocksServer) AcceptConnections() {
 func (this *SocksServer) AcceptConnections() {
 	for this.accepting {
 	for this.accepting {
 		retry.Timed(100 /* times */, 100 /* ms */).On(func() error {
 		retry.Timed(100 /* times */, 100 /* ms */).On(func() error {
-			this.RLock()
-			defer this.RUnlock()
 			if !this.accepting {
 			if !this.accepting {
 				return nil
 				return nil
 			}
 			}
+			this.tcpMutex.RLock()
+			if this.tcpListener == nil {
+				this.tcpMutex.RUnlock()
+				return nil
+			}
 			connection, err := this.tcpListener.AcceptTCP()
 			connection, err := this.tcpListener.AcceptTCP()
+			this.tcpMutex.RUnlock()
 			if err != nil {
 			if err != nil {
 				log.Error("Socks failed to accept new connection %v", err)
 				log.Error("Socks failed to accept new connection %v", err)
 				return err
 				return err

+ 6 - 6
proxy/socks/udp.go

@@ -30,13 +30,13 @@ func (this *SocksServer) ListenUDP(port v2net.Port) error {
 func (this *SocksServer) AcceptPackets() error {
 func (this *SocksServer) AcceptPackets() error {
 	for this.accepting {
 	for this.accepting {
 		buffer := alloc.NewBuffer()
 		buffer := alloc.NewBuffer()
-		this.RLock()
+		this.udpMutex.RLock()
 		if !this.accepting {
 		if !this.accepting {
-			this.RUnlock()
+			this.udpMutex.RUnlock()
 			return nil
 			return nil
 		}
 		}
 		nBytes, addr, err := this.udpConn.ReadFromUDP(buffer.Value)
 		nBytes, addr, err := this.udpConn.ReadFromUDP(buffer.Value)
-		this.RUnlock()
+		this.udpMutex.RUnlock()
 		if err != nil {
 		if err != nil {
 			log.Error("Socks failed to read UDP packets: %v", err)
 			log.Error("Socks failed to read UDP packets: %v", err)
 			buffer.Release()
 			buffer.Release()
@@ -82,13 +82,13 @@ func (this *SocksServer) handlePacket(packet v2net.Packet, clientAddr *net.UDPAd
 		udpMessage := alloc.NewSmallBuffer().Clear()
 		udpMessage := alloc.NewSmallBuffer().Clear()
 		response.Write(udpMessage)
 		response.Write(udpMessage)
 
 
-		this.RLock()
+		this.udpMutex.RLock()
 		if !this.accepting {
 		if !this.accepting {
-			this.RUnlock()
+			this.udpMutex.RUnlock()
 			return
 			return
 		}
 		}
 		nBytes, err := this.udpConn.WriteToUDP(udpMessage.Value, clientAddr)
 		nBytes, err := this.udpConn.WriteToUDP(udpMessage.Value, clientAddr)
-		this.RUnlock()
+		this.udpMutex.RUnlock()
 		udpMessage.Release()
 		udpMessage.Release()
 		response.Data.Release()
 		response.Data.Release()
 		if err != nil {
 		if err != nil {

+ 2 - 4
proxy/vmess/inbound/inbound.go

@@ -38,11 +38,9 @@ func NewVMessInboundHandler(space app.Space, clients user.UserSet) *VMessInbound
 func (this *VMessInboundHandler) Close() {
 func (this *VMessInboundHandler) Close() {
 	this.accepting = false
 	this.accepting = false
 	if this.listener != nil {
 	if this.listener != nil {
+		this.listener.Close()
 		this.Lock()
 		this.Lock()
-		if this.listener != nil {
-			this.listener.Close()
-			this.listener = nil
-		}
+		this.listener = nil
 		this.Unlock()
 		this.Unlock()
 	}
 	}
 }
 }

+ 6 - 0
shell/point/inbound_detour.go

@@ -39,6 +39,12 @@ func (this *InboundDetourHandler) Initialize() error {
 	return nil
 	return nil
 }
 }
 
 
+func (this *InboundDetourHandler) Close() {
+	for _, ich := range this.ich {
+		ich.handler.Close()
+	}
+}
+
 // Starts the inbound connection handler.
 // Starts the inbound connection handler.
 func (this *InboundDetourHandler) Start() error {
 func (this *InboundDetourHandler) Start() error {
 	for _, ich := range this.ich {
 	for _, ich := range this.ich {

+ 7 - 0
shell/point/point.go

@@ -113,6 +113,13 @@ func NewPoint(pConfig PointConfig) (*Point, error) {
 	return vpoint, nil
 	return vpoint, nil
 }
 }
 
 
+func (this *Point) Close() {
+	this.ich.Close()
+	for _, idh := range this.idh {
+		idh.Close()
+	}
+}
+
 // Start starts the Point server, and return any error during the process.
 // Start starts the Point server, and return any error during the process.
 // In the case of any errors, the state of the server is unpredicatable.
 // In the case of any errors, the state of the server is unpredicatable.
 func (this *Point) Start() error {
 func (this *Point) Start() error {

+ 2 - 0
testing/scenarios/dokodemo_test.go

@@ -50,4 +50,6 @@ func TestDokodemoTCP(t *testing.T) {
 		assert.StringLiteral("Processed: " + payload).Equals(string(response[:nBytes]))
 		assert.StringLiteral("Processed: " + payload).Equals(string(response[:nBytes]))
 		conn.Close()
 		conn.Close()
 	}
 	}
+
+	CloseAllServers()
 }
 }

+ 2 - 0
testing/scenarios/router_test.go

@@ -76,4 +76,6 @@ func TestRouter(t *testing.T) {
 	assert.Int(nBytes).Equals(0)
 	assert.Int(nBytes).Equals(0)
 	assert.Bool(tcpServer2Accessed).IsFalse()
 	assert.Bool(tcpServer2Accessed).IsFalse()
 	conn.Close()
 	conn.Close()
+
+	CloseAllServers()
 }
 }

+ 9 - 5
testing/scenarios/server_env.go

@@ -27,7 +27,7 @@ import (
 )
 )
 
 
 var (
 var (
-	serverup = make(map[string]bool)
+	runningServers = make([]*point.Point, 0, 10)
 )
 )
 
 
 func TestFile(filename string) string {
 func TestFile(filename string) string {
@@ -35,9 +35,6 @@ func TestFile(filename string) string {
 }
 }
 
 
 func InitializeServerSetOnce(testcase string) error {
 func InitializeServerSetOnce(testcase string) error {
-	if up, found := serverup[testcase]; found && up {
-		return nil
-	}
 	err := InitializeServer(TestFile(testcase + "_server.json"))
 	err := InitializeServer(TestFile(testcase + "_server.json"))
 	if err != nil {
 	if err != nil {
 		return err
 		return err
@@ -46,7 +43,6 @@ func InitializeServerSetOnce(testcase string) error {
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
-	serverup[testcase] = true
 	return nil
 	return nil
 }
 }
 
 
@@ -68,6 +64,14 @@ func InitializeServer(configFile string) error {
 		log.Error("Error starting Point server: %v", err)
 		log.Error("Error starting Point server: %v", err)
 		return err
 		return err
 	}
 	}
+	runningServers = append(runningServers, vPoint)
 
 
 	return nil
 	return nil
 }
 }
+
+func CloseAllServers() {
+	for _, server := range runningServers {
+		server.Close()
+	}
+	runningServers = make([]*point.Point, 0, 10)
+}

+ 6 - 4
testing/scenarios/socks_end_test.go

@@ -12,10 +12,6 @@ import (
 	"github.com/v2ray/v2ray-core/testing/servers/udp"
 	"github.com/v2ray/v2ray-core/testing/servers/udp"
 )
 )
 
 
-var (
-	serverUp = false
-)
-
 func TestTCPConnection(t *testing.T) {
 func TestTCPConnection(t *testing.T) {
 	v2testing.Current(t)
 	v2testing.Current(t)
 
 
@@ -86,6 +82,8 @@ func TestTCPConnection(t *testing.T) {
 
 
 		conn.Close()
 		conn.Close()
 	}
 	}
+
+	CloseAllServers()
 }
 }
 
 
 func TestTCPBind(t *testing.T) {
 func TestTCPBind(t *testing.T) {
@@ -135,6 +133,8 @@ func TestTCPBind(t *testing.T) {
 	assert.Bytes(connectResponse[:nBytes]).Equals([]byte{socks5Version, 7, 0, 1, 0, 0, 0, 0, 0, 0})
 	assert.Bytes(connectResponse[:nBytes]).Equals([]byte{socks5Version, 7, 0, 1, 0, 0, 0, 0, 0, 0})
 
 
 	conn.Close()
 	conn.Close()
+
+	CloseAllServers()
 }
 }
 
 
 func TestUDPAssociate(t *testing.T) {
 func TestUDPAssociate(t *testing.T) {
@@ -204,4 +204,6 @@ func TestUDPAssociate(t *testing.T) {
 
 
 	udpConn.Close()
 	udpConn.Close()
 	conn.Close()
 	conn.Close()
+
+	CloseAllServers()
 }
 }