瀏覽代碼

merge buf stream

Darien Raymond 8 年之前
父節點
當前提交
f643344154
共有 4 個文件被更改,包括 61 次插入15 次删除
  1. 14 11
      common/buf/merge_reader.go
  2. 33 0
      common/buf/merge_reader_test.go
  3. 5 2
      proxy/shadowsocks/client.go
  4. 9 2
      proxy/vmess/outbound/outbound.go

+ 14 - 11
common/buf/merge_reader.go

@@ -31,17 +31,20 @@ func (r *MergingReader) Read() (*Buffer, error) {
 		return b, nil
 	}
 
-	b2, err := r.timeoutReader.ReadTimeout(0)
-	if err != nil {
-		return b, nil
-	}
-
-	nBytes := b.Append(b2.Bytes())
-	b2.SliceFrom(nBytes)
-	if b2.IsEmpty() {
-		b2.Release()
-	} else {
-		r.leftover = b2
+	for {
+		b2, err := r.timeoutReader.ReadTimeout(0)
+		if err != nil {
+			break
+		}
+
+		nBytes := b.Append(b2.Bytes())
+		b2.SliceFrom(nBytes)
+		if b2.IsEmpty() {
+			b2.Release()
+		} else {
+			r.leftover = b2
+			break
+		}
 	}
 
 	return b, nil

+ 33 - 0
common/buf/merge_reader_test.go

@@ -0,0 +1,33 @@
+package buf_test
+
+import (
+	"testing"
+
+	"context"
+
+	. "v2ray.com/core/common/buf"
+	"v2ray.com/core/testing/assert"
+	"v2ray.com/core/transport/ray"
+)
+
+func TestMergingReader(t *testing.T) {
+	assert := assert.On(t)
+
+	stream := ray.NewStream(context.Background())
+	b1 := New()
+	b1.AppendBytes('a', 'b', 'c')
+	stream.Write(b1)
+
+	b2 := New()
+	b2.AppendBytes('e', 'f', 'g')
+	stream.Write(b2)
+
+	b3 := New()
+	b3.AppendBytes('h', 'i', 'j')
+	stream.Write(b3)
+
+	reader := NewMergingReader(stream)
+	b, err := reader.Read()
+	assert.Error(err).IsNil()
+	assert.String(b.String()).Equals("abcefghij")
+}

+ 5 - 2
proxy/shadowsocks/client.go

@@ -101,10 +101,13 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale
 			return err
 		}
 
-		bufferedWriter.SetBuffered(false)
+		if err := bufferedWriter.SetBuffered(false); err != nil {
+			return err
+		}
 
 		requestDone := signal.ExecuteAsync(func() error {
-			if err := buf.PipeUntilEOF(timer, outboundRay.OutboundInput(), bodyWriter); err != nil {
+			mergedInput := buf.NewMergingReader(outboundRay.OutboundInput())
+			if err := buf.PipeUntilEOF(timer, mergedInput, bodyWriter); err != nil {
 				return err
 			}
 			return nil

+ 9 - 2
proxy/vmess/outbound/outbound.go

@@ -124,9 +124,16 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial
 			firstPayload.Release()
 		}
 
-		writer.SetBuffered(false)
+		if err := writer.SetBuffered(false); err != nil {
+			return err
+		}
+
+		var inputReader buf.Reader = input
+		if request.Command == protocol.RequestCommandTCP {
+			inputReader = buf.NewMergingReader(input)
+		}
 
-		if err := buf.PipeUntilEOF(timer, input, bodyWriter); err != nil {
+		if err := buf.PipeUntilEOF(timer, inputReader, bodyWriter); err != nil {
 			return err
 		}