Browse Source

Allow data stream passing through http proxy

v2ray 9 năm trước cách đây
mục cha
commit
3156c4586c
4 tập tin đã thay đổi với 86 bổ sung14 xóa
  1. 12 0
      common/alloc/buffer.go
  2. 1 0
      common/alloc/buffer_pool.go
  3. 48 0
      common/io/chain_writer.go
  4. 25 14
      proxy/http/server.go

+ 12 - 0
common/alloc/buffer.go

@@ -159,3 +159,15 @@ func NewBuffer() *Buffer {
 func NewLargeBuffer() *Buffer {
 	return largePool.Allocate()
 }
+
+func NewBufferWithSize(size int) *Buffer {
+	if size <= SmallBufferSize {
+		return NewSmallBuffer()
+	}
+
+	if size <= BufferSize {
+		return NewBuffer()
+	}
+
+	return NewLargeBuffer()
+}

+ 1 - 0
common/alloc/buffer_pool.go

@@ -50,6 +50,7 @@ func (p *BufferPool) Free(buffer *Buffer) {
 }
 
 const (
+	SmallBufferSize = 1024 - defaultOffset
 	BufferSize      = 8*1024 - defaultOffset
 	LargeBufferSize = 64*1024 - defaultOffset
 )

+ 48 - 0
common/io/chain_writer.go

@@ -0,0 +1,48 @@
+package io
+
+import (
+	"io"
+	"sync"
+
+	"github.com/v2ray/v2ray-core/common/alloc"
+)
+
+type ChainWriter struct {
+	sync.Mutex
+	writer Writer
+}
+
+func NewChainWriter(writer Writer) *ChainWriter {
+	return &ChainWriter{
+		writer: writer,
+	}
+}
+
+func (this *ChainWriter) Write(payload []byte) (int, error) {
+	if this.writer == nil {
+		return 0, io.EOF
+	}
+
+	size := len(payload)
+	buffer := alloc.NewBufferWithSize(size).Clear()
+	buffer.Append(payload)
+
+	this.Lock()
+	defer this.Unlock()
+	if this.writer == nil {
+		return 0, io.EOF
+	}
+
+	err := this.writer.Write(buffer)
+	if err != nil {
+		return 0, err
+	}
+	return size, nil
+}
+
+func (this *ChainWriter) Release() {
+	this.Lock()
+	this.writer.Release()
+	this.writer = nil
+	this.Unlock()
+}

+ 25 - 14
proxy/http/server.go

@@ -228,30 +228,41 @@ func (this *HttpProxyServer) handlePlainHTTP(request *http.Request, dest v2net.D
 	request.Host = request.URL.Host
 	StripHopByHopHeaders(request)
 
-	requestBuffer := alloc.NewBuffer().Clear() // Don't release this buffer as it is passed into a Packet.
-	request.Write(requestBuffer)
-	log.Debug("Request to remote:\n", requestBuffer.Value)
-
 	ray := this.packetDispatcher.DispatchToOutbound(dest)
-	ray.InboundInput().Write(requestBuffer)
 	defer ray.InboundInput().Close()
+	defer ray.InboundOutput().Release()
 
-	var wg sync.WaitGroup
-	wg.Add(1)
+	var finish sync.WaitGroup
+	finish.Add(1)
 	go func() {
-		defer wg.Done()
+		defer finish.Done()
+		requestWriter := v2io.NewBufferedWriter(v2io.NewChainWriter(ray.InboundInput()))
+		err := request.Write(requestWriter)
+		if err != nil {
+			log.Warning("HTTP: Failed to write request: ", err)
+			return
+		}
+		requestWriter.Flush()
+	}()
+
+	finish.Add(1)
+	go func() {
+		defer finish.Done()
 		responseReader := bufio.NewReader(NewChanReader(ray.InboundOutput()))
 		response, err := http.ReadResponse(responseReader, request)
 		if err != nil {
+			log.Warning("HTTP: Failed to read response: ", err)
+			return
+		}
+		responseWriter := v2io.NewBufferedWriter(writer)
+		err = response.Write(responseWriter)
+		if err != nil {
+			log.Warning("HTTP: Failed to write response: ", err)
 			return
 		}
-		responseBuffer := alloc.NewBuffer().Clear()
-		defer responseBuffer.Release()
-		response.Write(responseBuffer)
-		writer.Write(responseBuffer.Value)
-		response.Body.Close()
+		responseWriter.Flush()
 	}()
-	wg.Wait()
+	finish.Wait()
 }
 
 func init() {