Browse Source

reorg chan reader

v2ray 9 years ago
parent
commit
46f76e55e5
2 changed files with 26 additions and 8 deletions
  1. 25 7
      common/io/chan_reader.go
  2. 1 1
      proxy/http/server.go

+ 25 - 7
proxy/http/chan_reader.go → common/io/chan_reader.go

@@ -1,27 +1,29 @@
-package http
+package io
 
 import (
 	"io"
+	"sync"
 
 	"github.com/v2ray/v2ray-core/common/alloc"
-	v2io "github.com/v2ray/v2ray-core/common/io"
 )
 
 type ChanReader struct {
-	stream  v2io.Reader
+	sync.Mutex
+	stream  Reader
 	current *alloc.Buffer
 	eof     bool
 }
 
-func NewChanReader(stream v2io.Reader) *ChanReader {
+func NewChanReader(stream Reader) *ChanReader {
 	this := &ChanReader{
 		stream: stream,
 	}
-	this.fill()
+	this.Fill()
 	return this
 }
 
-func (this *ChanReader) fill() {
+// @Private
+func (this *ChanReader) Fill() {
 	b, err := this.stream.Read()
 	this.current = b
 	if err != nil {
@@ -31,8 +33,14 @@ func (this *ChanReader) fill() {
 }
 
 func (this *ChanReader) Read(b []byte) (int, error) {
+	if this.eof {
+		return 0, io.EOF
+	}
+
+	this.Lock()
+	defer this.Unlock()
 	if this.current == nil {
-		this.fill()
+		this.Fill()
 		if this.eof {
 			return 0, io.EOF
 		}
@@ -46,3 +54,13 @@ func (this *ChanReader) Read(b []byte) (int, error) {
 	}
 	return nBytes, nil
 }
+
+func (this *ChanReader) Release() {
+	this.Lock()
+	defer this.Unlock()
+
+	this.eof = true
+	this.current.Release()
+	this.current = nil
+	this.stream = nil
+}

+ 1 - 1
proxy/http/server.go

@@ -248,7 +248,7 @@ func (this *HttpProxyServer) handlePlainHTTP(request *http.Request, dest v2net.D
 	finish.Add(1)
 	go func() {
 		defer finish.Done()
-		responseReader := bufio.NewReader(NewChanReader(ray.InboundOutput()))
+		responseReader := bufio.NewReader(v2io.NewChanReader(ray.InboundOutput()))
 		response, err := http.ReadResponse(responseReader, request)
 		if err != nil {
 			log.Warning("HTTP: Failed to read response: ", err)