Przeglądaj źródła

Add size parameter to transport.ReadFrom

V2Ray 10 lat temu
rodzic
commit
e7daa4c21c

+ 4 - 7
common/net/transport.go

@@ -4,12 +4,8 @@ import (
 	"io"
 )
 
-const (
-	bufferSize = 4 * 1024
-)
-
-func ReadFrom(reader io.Reader) ([]byte, error) {
-	buffer := make([]byte, bufferSize)
+func ReadFrom(reader io.Reader, sizeInKilo int) ([]byte, error) {
+	buffer := make([]byte, sizeInKilo<<10)
 	nBytes, err := reader.Read(buffer)
 	if nBytes == 0 {
 		return nil, err
@@ -19,8 +15,9 @@ func ReadFrom(reader io.Reader) ([]byte, error) {
 
 // ReaderToChan dumps all content from a given reader to a chan by constantly reading it until EOF.
 func ReaderToChan(stream chan<- []byte, reader io.Reader) error {
+	bufferSizeKilo := 4
 	for {
-		data, err := ReadFrom(reader)
+		data, err := ReadFrom(reader, bufferSizeKilo)
 		if len(data) > 0 {
 			stream <- data
 		}

+ 82 - 29
common/net/transport_test.go

@@ -22,7 +22,7 @@ func TestReaderAndWrite(t *testing.T) {
 	readerBuffer := bytes.NewReader(buffer)
 	writerBuffer := bytes.NewBuffer(make([]byte, 0, size))
 
-	transportChan := make(chan []byte, size/bufferSize*10)
+	transportChan := make(chan []byte, 1024)
 
 	err = ReaderToChan(transportChan, readerBuffer)
 	assert.Error(err).Equals(io.EOF)
@@ -52,43 +52,96 @@ func (reader *StaticReader) Read(b []byte) (size int, err error) {
 	return
 }
 
-func BenchmarkTransport(b *testing.B) {
-	size := 1024 * 1024
+func BenchmarkTransport1K(b *testing.B) {
+	size := 1 * 1024
 
 	for i := 0; i < b.N; i++ {
-		transportChanA := make(chan []byte, 128)
-		transportChanB := make(chan []byte, 128)
+		runBenchmarkTransport(size)
+	}
+}
 
-		readerA := &StaticReader{size, 0}
-		readerB := &StaticReader{size, 0}
+func BenchmarkTransport2K(b *testing.B) {
+	size := 2 * 1024
 
-		writerA := ioutil.Discard
-		writerB := ioutil.Discard
+	for i := 0; i < b.N; i++ {
+		runBenchmarkTransport(size)
+	}
+}
 
-		finishA := make(chan bool)
-		finishB := make(chan bool)
+func BenchmarkTransport4K(b *testing.B) {
+	size := 4 * 1024
 
-		go func() {
-			ChanToWriter(writerA, transportChanA)
-			close(finishA)
-		}()
+	for i := 0; i < b.N; i++ {
+		runBenchmarkTransport(size)
+	}
+}
 
-		go func() {
-			ReaderToChan(transportChanA, readerA)
-			close(transportChanA)
-		}()
+func BenchmarkTransport10K(b *testing.B) {
+	size := 10 * 1024
 
-		go func() {
-			ChanToWriter(writerB, transportChanB)
-			close(finishB)
-		}()
+	for i := 0; i < b.N; i++ {
+		runBenchmarkTransport(size)
+	}
+}
 
-		go func() {
-			ReaderToChan(transportChanB, readerB)
-			close(transportChanB)
-		}()
+func BenchmarkTransport100K(b *testing.B) {
+	size := 100 * 1024
 
-		<-transportChanA
-		<-transportChanB
+	for i := 0; i < b.N; i++ {
+		runBenchmarkTransport(size)
 	}
 }
+
+func BenchmarkTransport1M(b *testing.B) {
+	size := 1024 * 1024
+
+	for i := 0; i < b.N; i++ {
+		runBenchmarkTransport(size)
+	}
+}
+
+func BenchmarkTransport10M(b *testing.B) {
+	size := 10 * 1024 * 1024
+
+	for i := 0; i < b.N; i++ {
+		runBenchmarkTransport(size)
+	}
+}
+
+func runBenchmarkTransport(size int) {
+
+	transportChanA := make(chan []byte, 128)
+	transportChanB := make(chan []byte, 128)
+
+	readerA := &StaticReader{size, 0}
+	readerB := &StaticReader{size, 0}
+
+	writerA := ioutil.Discard
+	writerB := ioutil.Discard
+
+	finishA := make(chan bool)
+	finishB := make(chan bool)
+
+	go func() {
+		ChanToWriter(writerA, transportChanA)
+		close(finishA)
+	}()
+
+	go func() {
+		ReaderToChan(transportChanA, readerA)
+		close(transportChanA)
+	}()
+
+	go func() {
+		ChanToWriter(writerB, transportChanB)
+		close(finishB)
+	}()
+
+	go func() {
+		ReaderToChan(transportChanB, readerB)
+		close(transportChanB)
+	}()
+
+	<-transportChanA
+	<-transportChanB
+}

+ 5 - 1
proxy/freedom/freedom.go

@@ -65,7 +65,11 @@ func dumpOutput(conn net.Conn, output chan<- []byte, finish *sync.Mutex, udp boo
 	defer finish.Unlock()
 	defer close(output)
 
-	response, err := v2net.ReadFrom(conn)
+	bufferSize := 4 /* KB */
+	if udp {
+		bufferSize = 2
+	}
+	response, err := v2net.ReadFrom(conn, bufferSize)
 	log.Info("Freedom receives %d bytes from %s", len(response), conn.RemoteAddr().String())
 	if len(response) > 0 {
 		output <- response

+ 4 - 4
proxy/socks/socks.go

@@ -158,7 +158,7 @@ func (server *SocksServer) handleSocks5(reader *v2net.TimeOutReader, writer io.W
 	}
 
 	dest := request.Destination()
-	data, err := v2net.ReadFrom(reader)
+	data, err := v2net.ReadFrom(reader, 4)
 	if err != nil {
 		return err
 	}
@@ -192,8 +192,8 @@ func (server *SocksServer) handleUDP(reader *v2net.TimeOutReader, writer io.Writ
 		return err
 	}
 
-	reader.SetTimeOut(300) /* 5 minutes */
-	v2net.ReadFrom(reader) // Just in case of anything left in the socket
+	reader.SetTimeOut(300)    /* 5 minutes */
+	v2net.ReadFrom(reader, 1) // Just in case of anything left in the socket
 	// The TCP connection closes after this method returns. We need to wait until
 	// the client closes it.
 	// TODO: get notified from UDP part
@@ -215,7 +215,7 @@ func (server *SocksServer) handleSocks4(reader io.Reader, writer io.Writer, auth
 	}
 
 	dest := v2net.NewTCPDestination(v2net.IPAddress(auth.IP[:], auth.Port))
-	data, err := v2net.ReadFrom(reader)
+	data, err := v2net.ReadFrom(reader, 4)
 	if err != nil {
 		return err
 	}

+ 1 - 1
proxy/vmess/vmessout.go

@@ -173,7 +173,7 @@ func handleResponse(conn net.Conn, request *protocol.VMessRequest, output chan<-
 		return
 	}
 
-	buffer, err := v2net.ReadFrom(decryptResponseReader)
+	buffer, err := v2net.ReadFrom(decryptResponseReader, 4)
 	if err != nil {
 		log.Error("VMessOut: Failed to read VMess response (%d bytes): %v", len(buffer), err)
 		return