Bläddra i källkod

move transport methods from net to io

Darien Raymond 9 år sedan
förälder
incheckning
008c285324

+ 7 - 0
common/alloc/buffer.go

@@ -11,6 +11,13 @@ func Release(buffer *Buffer) {
 	}
 }
 
+func Len(buffer *Buffer) int {
+	if buffer == nil {
+		return 0
+	}
+	return buffer.Len()
+}
+
 // Buffer is a recyclable allocation of a byte array. Buffer.Release() recycles
 // the buffer into an internal buffer pool, in order to recreate a buffer more
 // quickly.

+ 1 - 1
common/crypto/authenticator.go

@@ -1,6 +1,6 @@
 package crypto
 
 type Authenticator interface {
-	AuthBytes() int
+	AuthSize() int
 	Authenticate(auth []byte, data []byte) []byte
 }

+ 122 - 0
common/io/reader.go

@@ -0,0 +1,122 @@
+package io // import "github.com/v2ray/v2ray-core/common/io"
+
+import (
+	"io"
+
+	"github.com/v2ray/v2ray-core/common/alloc"
+	"github.com/v2ray/v2ray-core/common/crypto"
+	"github.com/v2ray/v2ray-core/common/serial"
+	"github.com/v2ray/v2ray-core/transport"
+)
+
+// ReadFrom reads from a reader and put all content to a buffer.
+// If buffer is nil, ReadFrom creates a new normal buffer.
+func ReadFrom(reader io.Reader, buffer *alloc.Buffer) (*alloc.Buffer, error) {
+	if buffer == nil {
+		buffer = alloc.NewBuffer()
+	}
+	nBytes, err := reader.Read(buffer.Value)
+	buffer.Slice(0, nBytes)
+	return buffer, err
+}
+
+type Reader interface {
+	Read() (*alloc.Buffer, error)
+}
+
+type AdaptiveReader struct {
+	reader   io.Reader
+	allocate func() *alloc.Buffer
+	isLarge  bool
+}
+
+func NewAdaptiveReader(reader io.Reader) *AdaptiveReader {
+	return &AdaptiveReader{
+		reader:   reader,
+		allocate: alloc.NewBuffer,
+		isLarge:  false,
+	}
+}
+
+func (this *AdaptiveReader) Read() (*alloc.Buffer, error) {
+	buffer, err := ReadFrom(this.reader, this.allocate())
+
+	if buffer.IsFull() && !this.isLarge {
+		this.allocate = alloc.NewLargeBuffer
+		this.isLarge = true
+	} else if !buffer.IsFull() {
+		this.allocate = alloc.NewBuffer
+		this.isLarge = false
+	}
+
+	if err != nil {
+		alloc.Release(buffer)
+		return nil, err
+	}
+	return buffer, nil
+}
+
+type ChunkReader struct {
+	reader io.Reader
+}
+
+func NewChunkReader(reader io.Reader) *ChunkReader {
+	return &ChunkReader{
+		reader: reader,
+	}
+}
+
+func (this *ChunkReader) Read() (*alloc.Buffer, error) {
+	buffer := alloc.NewLargeBuffer()
+	if _, err := io.ReadFull(this.reader, buffer.Value[:2]); err != nil {
+		alloc.Release(buffer)
+		return nil, err
+	}
+	length := serial.BytesLiteral(buffer.Value[:2]).Uint16Value()
+	if _, err := io.ReadFull(this.reader, buffer.Value[:length]); err != nil {
+		alloc.Release(buffer)
+		return nil, err
+	}
+	buffer.Slice(0, int(length))
+	return buffer, nil
+}
+
+type AuthenticationReader struct {
+	reader            Reader
+	authenticator     crypto.Authenticator
+	authBeforePayload bool
+}
+
+func NewAuthenticationReader(reader io.Reader, auth crypto.Authenticator, authBeforePayload bool) *AuthenticationReader {
+	return &AuthenticationReader{
+		reader:            NewChunkReader(reader),
+		authenticator:     auth,
+		authBeforePayload: authBeforePayload,
+	}
+}
+
+func (this *AuthenticationReader) Read() (*alloc.Buffer, error) {
+	buffer, err := this.reader.Read()
+	if err != nil {
+		alloc.Release(buffer)
+		return nil, err
+	}
+
+	authSize := this.authenticator.AuthSize()
+	var authBytes, payloadBytes []byte
+	if this.authBeforePayload {
+		authBytes = buffer.Value[:authSize]
+		payloadBytes = buffer.Value[authSize:]
+	} else {
+		payloadBytes = buffer.Value[:authSize]
+		authBytes = buffer.Value[authSize:]
+	}
+
+	actualAuthBytes := this.authenticator.Authenticate(nil, payloadBytes)
+	if !serial.BytesLiteral(authBytes).Equals(serial.BytesLiteral(actualAuthBytes)) {
+		alloc.Release(buffer)
+		return nil, transport.CorruptedPacket
+	}
+	buffer.Value = payloadBytes
+	return buffer, nil
+}

+ 42 - 0
common/io/transport.go

@@ -0,0 +1,42 @@
+package io
+
+import (
+	"io"
+
+	"github.com/v2ray/v2ray-core/common/alloc"
+)
+
+func RawReaderToChan(stream chan<- *alloc.Buffer, reader io.Reader) error {
+	return ReaderToChan(stream, NewAdaptiveReader(reader))
+}
+
+// ReaderToChan dumps all content from a given reader to a chan by constantly reading it until EOF.
+func ReaderToChan(stream chan<- *alloc.Buffer, reader Reader) error {
+	for {
+		buffer, err := reader.Read()
+		if alloc.Len(buffer) > 0 {
+			stream <- buffer
+		} else {
+			alloc.Release(buffer)
+		}
+
+		if err != nil {
+			return err
+		}
+	}
+}
+
+// ChanToWriter dumps all content from a given chan to a writer until the chan is closed.
+func ChanToWriter(writer io.Writer, stream <-chan *alloc.Buffer) error {
+	for buffer := range stream {
+		nBytes, err := writer.Write(buffer.Value)
+		if nBytes < buffer.Len() {
+			_, err = writer.Write(buffer.Value[nBytes:])
+		}
+		buffer.Release()
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}

+ 37 - 0
common/io/transport_test.go

@@ -0,0 +1,37 @@
+package io_test
+
+import (
+	"bytes"
+	"crypto/rand"
+	"io"
+	"testing"
+
+	"github.com/v2ray/v2ray-core/common/alloc"
+	. "github.com/v2ray/v2ray-core/common/io"
+	v2testing "github.com/v2ray/v2ray-core/testing"
+	"github.com/v2ray/v2ray-core/testing/assert"
+)
+
+func TestReaderAndWrite(t *testing.T) {
+	v2testing.Current(t)
+
+	size := 1024 * 1024
+	buffer := make([]byte, size)
+	nBytes, err := rand.Read(buffer)
+	assert.Int(nBytes).Equals(len(buffer))
+	assert.Error(err).IsNil()
+
+	readerBuffer := bytes.NewReader(buffer)
+	writerBuffer := bytes.NewBuffer(make([]byte, 0, size))
+
+	transportChan := make(chan *alloc.Buffer, 1024)
+
+	err = ReaderToChan(transportChan, NewAdaptiveReader(readerBuffer))
+	assert.Error(err).Equals(io.EOF)
+	close(transportChan)
+
+	err = ChanToWriter(writerBuffer, transportChan)
+	assert.Error(err).IsNil()
+
+	assert.Bytes(buffer).Equals(writerBuffer.Bytes())
+}

+ 0 - 96
common/net/transport.go

@@ -1,96 +0,0 @@
-package net
-
-import (
-	"io"
-
-	"github.com/v2ray/v2ray-core/common/alloc"
-	"github.com/v2ray/v2ray-core/common/crypto"
-	"github.com/v2ray/v2ray-core/common/serial"
-	"github.com/v2ray/v2ray-core/transport"
-)
-
-// ReadFrom reads from a reader and put all content to a buffer.
-// If buffer is nil, ReadFrom creates a new normal buffer.
-func ReadFrom(reader io.Reader, buffer *alloc.Buffer) (*alloc.Buffer, error) {
-	if buffer == nil {
-		buffer = alloc.NewBuffer()
-	}
-	nBytes, err := reader.Read(buffer.Value)
-	buffer.Slice(0, nBytes)
-	return buffer, err
-}
-
-func ReadChunk(reader io.Reader, buffer *alloc.Buffer) (*alloc.Buffer, error) {
-	if buffer == nil {
-		buffer = alloc.NewBuffer()
-	}
-	if _, err := io.ReadFull(reader, buffer.Value[:2]); err != nil {
-		alloc.Release(buffer)
-		return nil, err
-	}
-	length := serial.BytesLiteral(buffer.Value[:2]).Uint16Value()
-	if _, err := io.ReadFull(reader, buffer.Value[:length]); err != nil {
-		alloc.Release(buffer)
-		return nil, err
-	}
-	buffer.Slice(0, int(length))
-	return buffer, nil
-}
-
-func ReadAuthenticatedChunk(reader io.Reader, auth crypto.Authenticator, buffer *alloc.Buffer) (*alloc.Buffer, error) {
-	buffer, err := ReadChunk(reader, buffer)
-	if err != nil {
-		alloc.Release(buffer)
-		return nil, err
-	}
-	authSize := auth.AuthBytes()
-
-	authBytes := auth.Authenticate(nil, buffer.Value[authSize:])
-
-	if !serial.BytesLiteral(authBytes).Equals(serial.BytesLiteral(buffer.Value[:authSize])) {
-		alloc.Release(buffer)
-		return nil, transport.CorruptedPacket
-	}
-	buffer.SliceFrom(authSize)
-
-	return buffer, nil
-}
-
-// ReaderToChan dumps all content from a given reader to a chan by constantly reading it until EOF.
-func ReaderToChan(stream chan<- *alloc.Buffer, reader io.Reader) error {
-	allocate := alloc.NewBuffer
-	large := false
-	for {
-		buffer, err := ReadFrom(reader, allocate())
-		if buffer.Len() > 0 {
-			stream <- buffer
-		} else {
-			buffer.Release()
-		}
-		if err != nil {
-			return err
-		}
-		if buffer.IsFull() && !large {
-			allocate = alloc.NewLargeBuffer
-			large = true
-		} else if !buffer.IsFull() {
-			allocate = alloc.NewBuffer
-			large = false
-		}
-	}
-}
-
-// ChanToWriter dumps all content from a given chan to a writer until the chan is closed.
-func ChanToWriter(writer io.Writer, stream <-chan *alloc.Buffer) error {
-	for buffer := range stream {
-		nBytes, err := writer.Write(buffer.Value)
-		if nBytes < buffer.Len() {
-			_, err = writer.Write(buffer.Value[nBytes:])
-		}
-		buffer.Release()
-		if err != nil {
-			return err
-		}
-	}
-	return nil
-}

+ 0 - 153
common/net/transport_test.go

@@ -1,153 +0,0 @@
-package net_test
-
-import (
-	"bytes"
-	"crypto/rand"
-	"io"
-	"io/ioutil"
-	"testing"
-
-	"github.com/v2ray/v2ray-core/common/alloc"
-	v2net "github.com/v2ray/v2ray-core/common/net"
-	v2testing "github.com/v2ray/v2ray-core/testing"
-	"github.com/v2ray/v2ray-core/testing/assert"
-)
-
-func TestReaderAndWrite(t *testing.T) {
-	v2testing.Current(t)
-
-	size := 1024 * 1024
-	buffer := make([]byte, size)
-	nBytes, err := rand.Read(buffer)
-	assert.Int(nBytes).Equals(len(buffer))
-	assert.Error(err).IsNil()
-
-	readerBuffer := bytes.NewReader(buffer)
-	writerBuffer := bytes.NewBuffer(make([]byte, 0, size))
-
-	transportChan := make(chan *alloc.Buffer, 1024)
-
-	err = v2net.ReaderToChan(transportChan, readerBuffer)
-	assert.Error(err).Equals(io.EOF)
-	close(transportChan)
-
-	err = v2net.ChanToWriter(writerBuffer, transportChan)
-	assert.Error(err).IsNil()
-
-	assert.Bytes(buffer).Equals(writerBuffer.Bytes())
-}
-
-type StaticReader struct {
-	total   int
-	current int
-}
-
-func (reader *StaticReader) Read(b []byte) (size int, err error) {
-	size = len(b)
-	if size > reader.total-reader.current {
-		size = reader.total - reader.current
-	}
-	for i := 0; i < size; i++ {
-		b[i] = byte(i)
-	}
-	//rand.Read(b[:size])
-	reader.current += size
-	if reader.current == reader.total {
-		err = io.EOF
-	}
-	return
-}
-
-func BenchmarkTransport1K(b *testing.B) {
-	size := 1 * 1024
-
-	for i := 0; i < b.N; i++ {
-		runBenchmarkTransport(size)
-	}
-}
-
-func BenchmarkTransport2K(b *testing.B) {
-	size := 2 * 1024
-
-	for i := 0; i < b.N; i++ {
-		runBenchmarkTransport(size)
-	}
-}
-
-func BenchmarkTransport4K(b *testing.B) {
-	size := 4 * 1024
-
-	for i := 0; i < b.N; i++ {
-		runBenchmarkTransport(size)
-	}
-}
-
-func BenchmarkTransport10K(b *testing.B) {
-	size := 10 * 1024
-
-	for i := 0; i < b.N; i++ {
-		runBenchmarkTransport(size)
-	}
-}
-
-func BenchmarkTransport100K(b *testing.B) {
-	size := 100 * 1024
-
-	for i := 0; i < b.N; i++ {
-		runBenchmarkTransport(size)
-	}
-}
-
-func BenchmarkTransport1M(b *testing.B) {
-	size := 1024 * 1024
-
-	for i := 0; i < b.N; i++ {
-		runBenchmarkTransport(size)
-	}
-}
-
-func BenchmarkTransport10M(b *testing.B) {
-	size := 10 * 1024 * 1024
-
-	for i := 0; i < b.N; i++ {
-		runBenchmarkTransport(size)
-	}
-}
-
-func runBenchmarkTransport(size int) {
-
-	transportChanA := make(chan *alloc.Buffer, 16)
-	transportChanB := make(chan *alloc.Buffer, 16)
-
-	readerA := &StaticReader{size, 0}
-	readerB := &StaticReader{size, 0}
-
-	writerA := ioutil.Discard
-	writerB := ioutil.Discard
-
-	finishA := make(chan bool)
-	finishB := make(chan bool)
-
-	go func() {
-		v2net.ChanToWriter(writerA, transportChanA)
-		close(finishA)
-	}()
-
-	go func() {
-		v2net.ReaderToChan(transportChanA, readerA)
-		close(transportChanA)
-	}()
-
-	go func() {
-		v2net.ChanToWriter(writerB, transportChanB)
-		close(finishB)
-	}()
-
-	go func() {
-		v2net.ReaderToChan(transportChanB, readerB)
-		close(transportChanB)
-	}()
-
-	<-transportChanA
-	<-transportChanB
-}

+ 2 - 1
proxy/blackhole/blackhole.go

@@ -4,6 +4,7 @@ import (
 	"io/ioutil"
 
 	"github.com/v2ray/v2ray-core/app"
+	v2io "github.com/v2ray/v2ray-core/common/io"
 	v2net "github.com/v2ray/v2ray-core/common/net"
 	"github.com/v2ray/v2ray-core/proxy"
 	"github.com/v2ray/v2ray-core/proxy/internal"
@@ -25,7 +26,7 @@ func (this *BlackHole) Dispatch(firstPacket v2net.Packet, ray ray.OutboundRay) e
 
 	close(ray.OutboundOutput())
 	if firstPacket.MoreChunks() {
-		v2net.ChanToWriter(ioutil.Discard, ray.OutboundInput())
+		v2io.ChanToWriter(ioutil.Discard, ray.OutboundInput())
 	}
 	return nil
 }

+ 3 - 2
proxy/dokodemo/dokodemo.go

@@ -6,6 +6,7 @@ import (
 
 	"github.com/v2ray/v2ray-core/app"
 	"github.com/v2ray/v2ray-core/common/alloc"
+	v2io "github.com/v2ray/v2ray-core/common/io"
 	"github.com/v2ray/v2ray-core/common/log"
 	v2net "github.com/v2ray/v2ray-core/common/net"
 	"github.com/v2ray/v2ray-core/proxy"
@@ -140,12 +141,12 @@ func (this *DokodemoDoor) HandleTCPConnection(conn *hub.TCPConn) {
 }
 
 func dumpInput(reader io.Reader, input chan<- *alloc.Buffer, finish *sync.Mutex) {
-	v2net.ReaderToChan(input, reader)
+	v2io.RawReaderToChan(input, reader)
 	finish.Unlock()
 	close(input)
 }
 
 func dumpOutput(writer io.Writer, output <-chan *alloc.Buffer, finish *sync.Mutex) {
-	v2net.ChanToWriter(writer, output)
+	v2io.ChanToWriter(writer, output)
 	finish.Unlock()
 }

+ 4 - 3
proxy/freedom/freedom.go

@@ -5,6 +5,7 @@ import (
 	"sync"
 
 	"github.com/v2ray/v2ray-core/app"
+	v2io "github.com/v2ray/v2ray-core/common/io"
 	"github.com/v2ray/v2ray-core/common/log"
 	v2net "github.com/v2ray/v2ray-core/common/net"
 	"github.com/v2ray/v2ray-core/common/retry"
@@ -50,7 +51,7 @@ func (this *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray ray.Outbou
 		writeMutex.Unlock()
 	} else {
 		go func() {
-			v2net.ChanToWriter(conn, input)
+			v2io.ChanToWriter(conn, input)
 			writeMutex.Unlock()
 		}()
 	}
@@ -59,7 +60,7 @@ func (this *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray ray.Outbou
 		defer readMutex.Unlock()
 		defer close(output)
 
-		response, err := v2net.ReadFrom(conn, nil)
+		response, err := v2io.ReadFrom(conn, nil)
 		log.Info("Freedom receives ", response.Len(), " bytes from ", conn.RemoteAddr())
 		if response.Len() > 0 {
 			output <- response
@@ -73,7 +74,7 @@ func (this *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray ray.Outbou
 			return
 		}
 
-		v2net.ReaderToChan(output, conn)
+		v2io.RawReaderToChan(output, conn)
 	}()
 
 	if this.space.HasDnsCache() {

+ 2 - 1
proxy/freedom/freedom_test.go

@@ -10,6 +10,7 @@ import (
 
 	"github.com/v2ray/v2ray-core/app"
 	"github.com/v2ray/v2ray-core/common/alloc"
+	v2io "github.com/v2ray/v2ray-core/common/io"
 	v2net "github.com/v2ray/v2ray-core/common/net"
 	v2nettesting "github.com/v2ray/v2ray-core/common/net/testing"
 	v2proxy "github.com/v2ray/v2ray-core/proxy"
@@ -128,7 +129,7 @@ func TestSocksTcpConnect(t *testing.T) {
 		tcpConn.CloseWrite()
 	}
 
-	dataReturned, err := v2net.ReadFrom(conn, nil)
+	dataReturned, err := v2io.ReadFrom(conn, nil)
 	assert.Error(err).IsNil()
 	conn.Close()
 

+ 3 - 2
proxy/http/http.go

@@ -11,6 +11,7 @@ import (
 
 	"github.com/v2ray/v2ray-core/app"
 	"github.com/v2ray/v2ray-core/common/alloc"
+	v2io "github.com/v2ray/v2ray-core/common/io"
 	"github.com/v2ray/v2ray-core/common/log"
 	v2net "github.com/v2ray/v2ray-core/common/net"
 	"github.com/v2ray/v2ray-core/common/serial"
@@ -153,13 +154,13 @@ func (this *HttpProxyServer) transport(input io.Reader, output io.Writer, ray ra
 	defer wg.Wait()
 
 	go func() {
-		v2net.ReaderToChan(ray.InboundInput(), input)
+		v2io.RawReaderToChan(ray.InboundInput(), input)
 		close(ray.InboundInput())
 		wg.Done()
 	}()
 
 	go func() {
-		v2net.ChanToWriter(output, ray.InboundOutput())
+		v2io.ChanToWriter(output, ray.InboundOutput())
 		wg.Done()
 	}()
 }

+ 4 - 3
proxy/shadowsocks/shadowsocks.go

@@ -9,6 +9,7 @@ import (
 
 	"github.com/v2ray/v2ray-core/app"
 	"github.com/v2ray/v2ray-core/common/alloc"
+	v2io "github.com/v2ray/v2ray-core/common/io"
 	"github.com/v2ray/v2ray-core/common/log"
 	v2net "github.com/v2ray/v2ray-core/common/net"
 	"github.com/v2ray/v2ray-core/proxy"
@@ -84,7 +85,7 @@ func (this *Shadowsocks) handlerUDPPayload(payload *alloc.Buffer, dest v2net.Des
 		return
 	}
 
-	buffer, _ := v2net.ReadFrom(reader, nil)
+	buffer, _ := v2io.ReadFrom(reader, nil)
 
 	packet := v2net.NewPacket(v2net.TCPDestination(request.Address, request.Port), buffer, false)
 	ray := this.space.PacketDispatcher().DispatchToOutbound(packet)
@@ -168,12 +169,12 @@ func (this *Shadowsocks) handleConnection(conn *hub.TCPConn) {
 			payload.Release()
 
 			writer.Write(firstChunk.Value)
-			v2net.ChanToWriter(writer, ray.InboundOutput())
+			v2io.ChanToWriter(writer, ray.InboundOutput())
 		}
 		writeFinish.Unlock()
 	}()
 
-	v2net.ReaderToChan(ray.InboundInput(), reader)
+	v2io.RawReaderToChan(ray.InboundInput(), reader)
 	close(ray.InboundInput())
 
 	writeFinish.Lock()

+ 5 - 4
proxy/socks/socks.go

@@ -9,6 +9,7 @@ import (
 
 	"github.com/v2ray/v2ray-core/app"
 	"github.com/v2ray/v2ray-core/common/alloc"
+	v2io "github.com/v2ray/v2ray-core/common/io"
 	"github.com/v2ray/v2ray-core/common/log"
 	v2net "github.com/v2ray/v2ray-core/common/net"
 	"github.com/v2ray/v2ray-core/proxy"
@@ -227,8 +228,8 @@ func (this *SocksServer) handleUDP(reader *v2net.TimeOutReader, writer io.Writer
 		return err
 	}
 
-	reader.SetTimeOut(300)      /* 5 minutes */
-	v2net.ReadFrom(reader, nil) // Just in case of anything left in the socket
+	reader.SetTimeOut(300)     /* 5 minutes */
+	v2io.ReadFrom(reader, nil) // Just in case of anything left in the socket
 	// The TCP connection closes after this method returns. We need to wait until
 	// the client closes it.
 	// TODO: get notified from UDP part
@@ -270,13 +271,13 @@ func (this *SocksServer) transport(reader io.Reader, writer io.Writer, firstPack
 	outputFinish.Lock()
 
 	go func() {
-		v2net.ReaderToChan(input, reader)
+		v2io.RawReaderToChan(input, reader)
 		inputFinish.Unlock()
 		close(input)
 	}()
 
 	go func() {
-		v2net.ChanToWriter(writer, output)
+		v2io.ChanToWriter(writer, output)
 		outputFinish.Unlock()
 	}()
 	outputFinish.Lock()

+ 3 - 2
proxy/testing/mocks/inboundhandler.go

@@ -5,6 +5,7 @@ import (
 	"sync"
 
 	"github.com/v2ray/v2ray-core/app"
+	v2io "github.com/v2ray/v2ray-core/common/io"
 	v2net "github.com/v2ray/v2ray-core/common/net"
 )
 
@@ -41,13 +42,13 @@ func (this *InboundConnectionHandler) Communicate(packet v2net.Packet) error {
 	writeFinish.Lock()
 
 	go func() {
-		v2net.ReaderToChan(input, this.ConnInput)
+		v2io.RawReaderToChan(input, this.ConnInput)
 		close(input)
 		readFinish.Unlock()
 	}()
 
 	go func() {
-		v2net.ChanToWriter(this.ConnOutput, output)
+		v2io.ChanToWriter(this.ConnOutput, output)
 		writeFinish.Unlock()
 	}()
 

+ 3 - 2
proxy/testing/mocks/outboundhandler.go

@@ -5,6 +5,7 @@ import (
 	"sync"
 
 	"github.com/v2ray/v2ray-core/app"
+	v2io "github.com/v2ray/v2ray-core/common/io"
 	v2net "github.com/v2ray/v2ray-core/common/net"
 	"github.com/v2ray/v2ray-core/proxy"
 	"github.com/v2ray/v2ray-core/transport/ray"
@@ -32,14 +33,14 @@ func (this *OutboundConnectionHandler) Dispatch(packet v2net.Packet, ray ray.Out
 		writeFinish.Lock()
 
 		go func() {
-			v2net.ChanToWriter(this.ConnOutput, input)
+			v2io.ChanToWriter(this.ConnOutput, input)
 			writeFinish.Unlock()
 		}()
 
 		writeFinish.Lock()
 	}
 
-	v2net.ReaderToChan(output, this.ConnInput)
+	v2io.RawReaderToChan(output, this.ConnInput)
 	close(output)
 
 	return nil

+ 3 - 2
proxy/vmess/inbound/inbound.go

@@ -8,6 +8,7 @@ import (
 	"github.com/v2ray/v2ray-core/app"
 	"github.com/v2ray/v2ray-core/common/alloc"
 	v2crypto "github.com/v2ray/v2ray-core/common/crypto"
+	v2io "github.com/v2ray/v2ray-core/common/io"
 	"github.com/v2ray/v2ray-core/common/log"
 	v2net "github.com/v2ray/v2ray-core/common/net"
 	"github.com/v2ray/v2ray-core/common/serial"
@@ -136,11 +137,11 @@ func handleInput(request *protocol.VMessRequest, reader io.Reader, input chan<-
 		return
 	}
 	requestReader := v2crypto.NewCryptionReader(aesStream, reader)
-	v2net.ReaderToChan(input, requestReader)
+	v2io.RawReaderToChan(input, requestReader)
 }
 
 func handleOutput(request *protocol.VMessRequest, writer io.Writer, output <-chan *alloc.Buffer, finish *sync.Mutex) {
-	v2net.ChanToWriter(writer, output)
+	v2io.ChanToWriter(writer, output)
 	finish.Unlock()
 }
 

+ 4 - 3
proxy/vmess/outbound/outbound.go

@@ -11,6 +11,7 @@ import (
 	"github.com/v2ray/v2ray-core/app"
 	"github.com/v2ray/v2ray-core/common/alloc"
 	v2crypto "github.com/v2ray/v2ray-core/common/crypto"
+	v2io "github.com/v2ray/v2ray-core/common/io"
 	"github.com/v2ray/v2ray-core/common/log"
 	v2net "github.com/v2ray/v2ray-core/common/net"
 	"github.com/v2ray/v2ray-core/proxy"
@@ -132,7 +133,7 @@ func (this *VMessOutboundHandler) handleRequest(conn net.Conn, request *protocol
 	}
 
 	if moreChunks {
-		v2net.ChanToWriter(encryptRequestWriter, input)
+		v2io.ChanToWriter(encryptRequestWriter, input)
 	}
 	return
 }
@@ -154,7 +155,7 @@ func (this *VMessOutboundHandler) handleResponse(conn net.Conn, request *protoco
 	}
 	decryptResponseReader := v2crypto.NewCryptionReader(aesStream, conn)
 
-	buffer, err := v2net.ReadFrom(decryptResponseReader, nil)
+	buffer, err := v2io.ReadFrom(decryptResponseReader, nil)
 	if err != nil {
 		log.Error("VMessOut: Failed to read VMess response (", buffer.Len(), " bytes): ", err)
 		buffer.Release()
@@ -184,7 +185,7 @@ func (this *VMessOutboundHandler) handleResponse(conn net.Conn, request *protoco
 	output <- buffer
 
 	if !isUDP {
-		v2net.ReaderToChan(output, decryptResponseReader)
+		v2io.RawReaderToChan(output, decryptResponseReader)
 	}
 
 	return

+ 1 - 1
proxy/vmess/protocol/vmess_test.go

@@ -84,7 +84,7 @@ func TestReadSingleByte(t *testing.T) {
 
 	reader := NewVMessRequestReader(nil)
 	_, err := reader.Read(bytes.NewReader(make([]byte, 1)))
-	assert.Error(err).Equals(io.EOF)
+	assert.Error(err).Equals(io.ErrUnexpectedEOF)
 }
 
 func BenchmarkVMessRequestWriting(b *testing.B) {

+ 2 - 1
testing/servers/tcp/tcp.go

@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"net"
 
+	v2io "github.com/v2ray/v2ray-core/common/io"
 	v2net "github.com/v2ray/v2ray-core/common/net"
 )
 
@@ -43,7 +44,7 @@ func (server *Server) acceptConnections(listener *net.TCPListener) {
 
 func (server *Server) handleConnection(conn net.Conn) {
 	for true {
-		request, err := v2net.ReadFrom(conn, nil)
+		request, err := v2io.ReadFrom(conn, nil)
 		if err != nil {
 			break
 		}

+ 1 - 1
transport/hub/udp.go

@@ -44,7 +44,7 @@ func (this *UDPHub) WriteTo(payload []byte, dest v2net.Destination) (int, error)
 }
 
 func (this *UDPHub) start() {
-  this.accepting = true
+	this.accepting = true
 	for this.accepting {
 		buffer := alloc.NewBuffer()
 		nBytes, addr, err := this.conn.ReadFromUDP(buffer.Value)