Browse Source

refine http proxy

v2ray 10 years ago
parent
commit
34f34bb25d
2 changed files with 70 additions and 5 deletions
  1. 47 0
      proxy/http/chan_reader.go
  2. 23 5
      proxy/http/http.go

+ 47 - 0
proxy/http/chan_reader.go

@@ -0,0 +1,47 @@
+package http
+
+import (
+	"io"
+
+	"github.com/v2ray/v2ray-core/common/alloc"
+)
+
+type ChanReader struct {
+	stream  <-chan *alloc.Buffer
+	current *alloc.Buffer
+	eof     bool
+}
+
+func NewChanReader(stream <-chan *alloc.Buffer) *ChanReader {
+	this := &ChanReader{
+		stream: stream,
+	}
+	this.fill()
+	return this
+}
+
+func (this *ChanReader) fill() {
+	b, ok := <-this.stream
+	this.current = b
+	if !ok {
+		this.eof = true
+		this.current = nil
+	}
+}
+
+func (this *ChanReader) Read(b []byte) (int, error) {
+	if this.current == nil {
+		this.fill()
+		if this.eof {
+			return 0, io.EOF
+		}
+	}
+	nBytes := copy(b, this.current.Value)
+	if nBytes == this.current.Len() {
+		this.current.Release()
+		this.current = nil
+	} else {
+		this.current.SliceFrom(nBytes)
+	}
+	return nBytes, nil
+}

+ 23 - 5
proxy/http/http.go

@@ -8,6 +8,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"time"
 
 	"github.com/v2ray/v2ray-core/app"
 	"github.com/v2ray/v2ray-core/common/alloc"
@@ -131,9 +132,21 @@ func (this *HttpProxyServer) handleRequest(request *http.Request, reader io.Read
 		buffer := alloc.NewBuffer().Clear()
 		request.Write(buffer)
 		log.Info("Request to remote: %s", string(buffer.Value))
-		packet := v2net.NewPacket(v2net.NewTCPDestination(address), buffer, false)
+		packet := v2net.NewPacket(v2net.NewTCPDestination(address), buffer, true)
 		ray := this.space.PacketDispatcher().DispatchToOutbound(packet)
-		this.transport(nil, writer, ray)
+		defer close(ray.InboundInput())
+
+		responseReader := bufio.NewReader(NewChanReader(ray.InboundOutput()))
+		response, err := http.ReadResponse(responseReader, request)
+		if err != nil {
+			return
+		}
+
+		responseBuffer := alloc.NewBuffer().Clear()
+		defer responseBuffer.Release()
+		response.Write(responseBuffer)
+		writer.Write(responseBuffer.Value)
+
 	} else {
 		response := &http.Response{
 			Status:        "400 Bad Request",
@@ -155,17 +168,17 @@ func (this *HttpProxyServer) handleRequest(request *http.Request, reader io.Read
 
 func (this *HttpProxyServer) transport(input io.Reader, output io.Writer, ray ray.InboundRay) {
 	var inputFinish, outputFinish sync.Mutex
-	inputFinish.Lock()
 	outputFinish.Lock()
 
 	if input != nil {
+		inputFinish.Lock()
 		go func() {
 			v2net.ReaderToChan(ray.InboundInput(), input)
 			inputFinish.Unlock()
-			close(ray.InboundInput())
 		}()
 	} else {
-		close(ray.InboundInput())
+		// TODO: We can not close write so quickly, as some HTTP server will stop responding if so.
+
 	}
 
 	go func() {
@@ -173,5 +186,10 @@ func (this *HttpProxyServer) transport(input io.Reader, output io.Writer, ray ra
 		outputFinish.Unlock()
 	}()
 
+	inputFinish.Lock()
+	go func() {
+		<-time.After(10 * time.Second)
+		close(ray.InboundInput())
+	}()
 	outputFinish.Lock()
 }