瀏覽代碼

multi reader

Darien Raymond 8 年之前
父節點
當前提交
abe790181e
共有 5 個文件被更改,包括 61 次插入5 次删除
  1. 11 5
      app/proxyman/outbound/handler.go
  2. 6 0
      common/buf/io.go
  3. 4 0
      common/buf/multi_buffer.go
  4. 23 0
      common/buf/reader.go
  5. 17 0
      common/crypto/auth.go

+ 11 - 5
app/proxyman/outbound/handler.go

@@ -128,8 +128,9 @@ type Connection struct {
 	localAddr  net.Addr
 	remoteAddr net.Addr
 
-	reader io.Reader
-	writer buf.Writer
+	bytesReader io.Reader
+	reader      buf.Reader
+	writer      buf.Writer
 }
 
 func NewConnection(stream ray.Ray) *Connection {
@@ -143,8 +144,9 @@ func NewConnection(stream ray.Ray) *Connection {
 			IP:   []byte{0, 0, 0, 0},
 			Port: 0,
 		},
-		reader: buf.ToBytesReader(stream.InboundOutput()),
-		writer: stream.InboundInput(),
+		bytesReader: buf.ToBytesReader(stream.InboundOutput()),
+		reader:      stream.InboundOutput(),
+		writer:      stream.InboundInput(),
 	}
 }
 
@@ -153,7 +155,11 @@ func (v *Connection) Read(b []byte) (int, error) {
 	if v.closed {
 		return 0, io.EOF
 	}
-	return v.reader.Read(b)
+	return v.bytesReader.Read(b)
+}
+
+func (v *Connection) ReadMultiBuffer() (buf.MultiBuffer, error) {
+	return v.reader.Read()
 }
 
 // Write implements net.Conn.Write().

+ 6 - 0
common/buf/io.go

@@ -76,6 +76,12 @@ func PipeUntilEOF(timer signal.ActivityTimer, reader Reader, writer Writer) erro
 // NewReader creates a new Reader.
 // The Reader instance doesn't take the ownership of reader.
 func NewReader(reader io.Reader) Reader {
+	if mr, ok := reader.(MultiBufferReader); ok {
+		return &readerAdpater{
+			MultiBufferReader: mr,
+		}
+	}
+
 	return &BytesToBufferReader{
 		reader: reader,
 		buffer: NewLocal(32 * 1024),

+ 4 - 0
common/buf/multi_buffer.go

@@ -6,6 +6,10 @@ type MultiBufferWriter interface {
 	WriteMultiBuffer(MultiBuffer) (int, error)
 }
 
+type MultiBufferReader interface {
+	ReadMultiBuffer() (MultiBuffer, error)
+}
+
 type MultiBuffer []*Buffer
 
 func NewMultiBuffer() MultiBuffer {

+ 23 - 0
common/buf/reader.go

@@ -23,6 +23,14 @@ func (v *BytesToBufferReader) Read() (MultiBuffer, error) {
 	return mb, nil
 }
 
+type readerAdpater struct {
+	MultiBufferReader
+}
+
+func (r *readerAdpater) Read() (MultiBuffer, error) {
+	return r.ReadMultiBuffer()
+}
+
 type bufferToBytesReader struct {
 	stream  Reader
 	current MultiBuffer
@@ -57,3 +65,18 @@ func (v *bufferToBytesReader) Read(b []byte) (int, error) {
 	}
 	return nBytes, err
 }
+
+func (v *bufferToBytesReader) ReadMultiBuffer() (MultiBuffer, error) {
+	if v.err != nil {
+		return nil, v.err
+	}
+	if v.current == nil {
+		v.fill()
+		if v.err != nil {
+			return nil, v.err
+		}
+	}
+	b := v.current
+	v.current = nil
+	return b, nil
+}

+ 17 - 0
common/crypto/auth.go

@@ -199,6 +199,23 @@ func (v *AuthenticationReader) Read(b []byte) (int, error) {
 	return v.copyChunk(b), nil
 }
 
+func (r *AuthenticationReader) ReadMultiBuffer() (buf.MultiBuffer, error) {
+	err := r.ensureChunk()
+	if err != nil {
+		return nil, err
+	}
+
+	mb := buf.NewMultiBuffer()
+	for len(r.chunk) > 0 {
+		b := buf.New()
+		nBytes, _ := b.Write(r.chunk)
+		mb.Append(b)
+		r.chunk = r.chunk[nBytes:]
+	}
+	r.chunk = nil
+	return mb, nil
+}
+
 type AuthenticationWriter struct {
 	auth     Authenticator
 	buffer   []byte