Browse Source

release all adaptive readers and writers

v2ray 9 years ago
parent
commit
44bf412dfc

+ 8 - 2
proxy/dokodemo/dokodemo.go

@@ -133,13 +133,19 @@ func (this *DokodemoDoor) HandleTCPConnection(conn *hub.TCPConn) {
 
 
 	reader := v2net.NewTimeOutReader(this.config.Timeout, conn)
 	reader := v2net.NewTimeOutReader(this.config.Timeout, conn)
 	go func() {
 	go func() {
-		v2io.Pipe(v2io.NewAdaptiveReader(reader), ray.InboundInput())
+		v2reader := v2io.NewAdaptiveReader(reader)
+		defer v2reader.Release()
+
+		v2io.Pipe(v2reader, ray.InboundInput())
 		inputFinish.Unlock()
 		inputFinish.Unlock()
 		ray.InboundInput().Close()
 		ray.InboundInput().Close()
 	}()
 	}()
 
 
 	go func() {
 	go func() {
-		v2io.Pipe(ray.InboundOutput(), v2io.NewAdaptiveWriter(conn))
+		v2writer := v2io.NewAdaptiveWriter(conn)
+		defer v2writer.Release()
+
+		v2io.Pipe(ray.InboundOutput(), v2writer)
 		outputFinish.Unlock()
 		outputFinish.Unlock()
 	}()
 	}()
 
 

+ 8 - 2
proxy/freedom/freedom.go

@@ -51,7 +51,10 @@ func (this *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray ray.Outbou
 		writeMutex.Unlock()
 		writeMutex.Unlock()
 	} else {
 	} else {
 		go func() {
 		go func() {
-			v2io.Pipe(input, v2io.NewAdaptiveWriter(conn))
+			v2writer := v2io.NewAdaptiveWriter(conn)
+			defer v2writer.Release()
+
+			v2io.Pipe(input, v2writer)
 			writeMutex.Unlock()
 			writeMutex.Unlock()
 		}()
 		}()
 	}
 	}
@@ -66,7 +69,10 @@ func (this *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray ray.Outbou
 			reader = v2net.NewTimeOutReader(16 /* seconds */, conn)
 			reader = v2net.NewTimeOutReader(16 /* seconds */, conn)
 		}
 		}
 
 
-		v2io.Pipe(v2io.NewAdaptiveReader(reader), output)
+		v2reader := v2io.NewAdaptiveReader(reader)
+		defer v2reader.Release()
+
+		v2io.Pipe(v2reader, output)
 	}()
 	}()
 
 
 	writeMutex.Lock()
 	writeMutex.Lock()

+ 8 - 2
proxy/http/http.go

@@ -154,13 +154,19 @@ func (this *HttpProxyServer) transport(input io.Reader, output io.Writer, ray ra
 	defer wg.Wait()
 	defer wg.Wait()
 
 
 	go func() {
 	go func() {
-		v2io.Pipe(v2io.NewAdaptiveReader(input), ray.InboundInput())
+		v2reader := v2io.NewAdaptiveReader(input)
+		defer v2reader.Release()
+
+		v2io.Pipe(v2reader, ray.InboundInput())
 		ray.InboundInput().Close()
 		ray.InboundInput().Close()
 		wg.Done()
 		wg.Done()
 	}()
 	}()
 
 
 	go func() {
 	go func() {
-		v2io.Pipe(ray.InboundOutput(), v2io.NewAdaptiveWriter(output))
+		v2writer := v2io.NewAdaptiveWriter(output)
+		defer v2writer.Release()
+
+		v2io.Pipe(ray.InboundOutput(), v2writer)
 		ray.InboundOutput().Release()
 		ray.InboundOutput().Release()
 		wg.Done()
 		wg.Done()
 	}()
 	}()

+ 4 - 1
proxy/shadowsocks/shadowsocks.go

@@ -219,7 +219,10 @@ func (this *Shadowsocks) handleConnection(conn *hub.TCPConn) {
 			payload.Release()
 			payload.Release()
 
 
 			writer := crypto.NewCryptionWriter(stream, conn)
 			writer := crypto.NewCryptionWriter(stream, conn)
-			v2io.Pipe(ray.InboundOutput(), v2io.NewAdaptiveWriter(writer))
+			v2writer := v2io.NewAdaptiveWriter(writer)
+			defer writer.Release()
+
+			v2io.Pipe(ray.InboundOutput(), v2writer)
 			ray.InboundOutput().Release()
 			ray.InboundOutput().Release()
 		}
 		}
 		writeFinish.Unlock()
 		writeFinish.Unlock()

+ 8 - 2
proxy/socks/socks.go

@@ -276,13 +276,19 @@ func (this *SocksServer) transport(reader io.Reader, writer io.Writer, firstPack
 	outputFinish.Lock()
 	outputFinish.Lock()
 
 
 	go func() {
 	go func() {
-		v2io.Pipe(v2io.NewAdaptiveReader(reader), input)
+		v2reader := v2io.NewAdaptiveReader(reader)
+		defer v2reader.Release()
+
+		v2io.Pipe(v2reader, input)
 		inputFinish.Unlock()
 		inputFinish.Unlock()
 		input.Close()
 		input.Close()
 	}()
 	}()
 
 
 	go func() {
 	go func() {
-		v2io.Pipe(output, v2io.NewAdaptiveWriter(writer))
+		v2writer := v2io.NewAdaptiveWriter(writer)
+		defer v2writer.Release()
+
+		v2io.Pipe(output, v2writer)
 		outputFinish.Unlock()
 		outputFinish.Unlock()
 		output.Release()
 		output.Release()
 	}()
 	}()

+ 8 - 2
proxy/testing/mocks/inboundhandler.go

@@ -42,13 +42,19 @@ func (this *InboundConnectionHandler) Communicate(packet v2net.Packet) error {
 	writeFinish.Lock()
 	writeFinish.Lock()
 
 
 	go func() {
 	go func() {
-		v2io.Pipe(v2io.NewAdaptiveReader(this.ConnInput), input)
+		v2reader := v2io.NewAdaptiveReader(this.ConnInput)
+		defer v2reader.Release()
+
+		v2io.Pipe(v2reader, input)
 		input.Close()
 		input.Close()
 		readFinish.Unlock()
 		readFinish.Unlock()
 	}()
 	}()
 
 
 	go func() {
 	go func() {
-		v2io.Pipe(output, v2io.NewAdaptiveWriter(this.ConnOutput))
+		v2writer := v2io.NewAdaptiveWriter(this.ConnOutput)
+		defer v2writer.Release()
+
+		v2io.Pipe(output, v2writer)
 		output.Release()
 		output.Release()
 		writeFinish.Unlock()
 		writeFinish.Unlock()
 	}()
 	}()

+ 8 - 2
proxy/testing/mocks/outboundhandler.go

@@ -33,7 +33,10 @@ func (this *OutboundConnectionHandler) Dispatch(packet v2net.Packet, ray ray.Out
 		writeFinish.Lock()
 		writeFinish.Lock()
 
 
 		go func() {
 		go func() {
-			v2io.Pipe(input, v2io.NewAdaptiveWriter(this.ConnOutput))
+			v2writer := v2io.NewAdaptiveWriter(this.ConnOutput)
+			defer v2writer.Release()
+
+			v2io.Pipe(input, v2writer)
 			writeFinish.Unlock()
 			writeFinish.Unlock()
 			input.Release()
 			input.Release()
 		}()
 		}()
@@ -41,7 +44,10 @@ func (this *OutboundConnectionHandler) Dispatch(packet v2net.Packet, ray ray.Out
 		writeFinish.Lock()
 		writeFinish.Lock()
 	}
 	}
 
 
-	v2io.Pipe(v2io.NewAdaptiveReader(this.ConnInput), output)
+	v2reader := v2io.NewAdaptiveReader(this.ConnInput)
+	defer v2reader.Release()
+
+	v2io.Pipe(v2reader, output)
 	output.Close()
 	output.Close()
 
 
 	return nil
 	return nil