Darien Raymond vor 8 Jahren
Ursprung
Commit
cb0e29ccdb
4 geänderte Dateien mit 54 neuen und 7 gelöschten Zeilen
  1. 4 3
      common/buf/buffer.go
  2. 7 0
      common/buf/io.go
  3. 42 0
      common/buf/merge_reader.go
  4. 1 4
      transport/ray/direct.go

+ 4 - 3
common/buf/buffer.go

@@ -41,14 +41,15 @@ func (b *Buffer) Clear() {
 }
 
 // AppendBytes appends one or more bytes to the end of the buffer.
-func (b *Buffer) AppendBytes(bytes ...byte) {
-	b.Append(bytes)
+func (b *Buffer) AppendBytes(bytes ...byte) int {
+	return b.Append(bytes)
 }
 
 // Append appends a byte array to the end of the buffer.
-func (b *Buffer) Append(data []byte) {
+func (b *Buffer) Append(data []byte) int {
 	nBytes := copy(b.v[b.end:], data)
 	b.end += nBytes
+	return nBytes
 }
 
 // AppendSupplier appends the content of a BytesWriter to the buffer.

+ 7 - 0
common/buf/io.go

@@ -2,6 +2,7 @@ package buf
 
 import (
 	"io"
+	"time"
 
 	"v2ray.com/core/common/errors"
 	"v2ray.com/core/common/signal"
@@ -13,6 +14,12 @@ type Reader interface {
 	Read() (*Buffer, error)
 }
 
+var ErrReadTimeout = errors.New("Buf: IO timeout.")
+
+type TimeoutReader interface {
+	ReadTimeout(time.Duration) (*Buffer, error)
+}
+
 // Writer extends io.Writer with alloc.Buffer.
 type Writer interface {
 	// Write writes an alloc.Buffer into underlying writer.

+ 42 - 0
common/buf/merge_reader.go

@@ -0,0 +1,42 @@
+package buf
+
+type MergingReader struct {
+	reader   Reader
+	leftover *Buffer
+}
+
+func NewMergingReader(reader Reader) Reader {
+	return &MergingReader{
+		reader: reader,
+	}
+}
+
+func (r *MergingReader) Read() (*Buffer, error) {
+	if r.leftover != nil {
+		return r.leftover, nil
+	}
+
+	b, err := r.reader.Read()
+	if err != nil {
+		return nil, err
+	}
+
+	if b.IsFull() {
+		return b, nil
+	}
+
+	b2, err := r.reader.Read()
+	if err != nil {
+		return b, nil
+	}
+
+	nBytes := b.Append(b2.Bytes())
+	b2.SliceFrom(nBytes)
+	if b2.IsEmpty() {
+		b2.Release()
+	} else {
+		r.leftover = b2
+	}
+
+	return b, nil
+}

+ 1 - 4
transport/ray/direct.go

@@ -2,7 +2,6 @@ package ray
 
 import (
 	"context"
-	"errors"
 	"io"
 	"time"
 
@@ -13,8 +12,6 @@ const (
 	bufferSize = 512
 )
 
-var ErrReadTimeout = errors.New("Ray: timeout.")
-
 // NewRay creates a new Ray for direct traffic transport.
 func NewRay(ctx context.Context) Ray {
 	return &directRay{
@@ -101,7 +98,7 @@ func (v *Stream) ReadTimeout(timeout time.Duration) (*buf.Buffer, error) {
 		case <-v.err:
 			return nil, io.ErrClosedPipe
 		case <-time.After(timeout):
-			return nil, ErrReadTimeout
+			return nil, buf.ErrReadTimeout
 		}
 	}
 }