Browse Source

reuse buffers in transport, benchmark shows it improves performance by 10 times

V2Ray 10 years ago
parent
commit
a766c61dcc
2 changed files with 117 additions and 3 deletions
  1. 23 3
      net/transport.go
  2. 94 0
      net/transport_test.go

+ 23 - 3
net/transport.go

@@ -2,17 +2,36 @@ package net
 
 import (
 	"io"
-
-	_ "github.com/v2ray/v2ray-core/log"
 )
 
 const (
 	bufferSize = 32 * 1024
 )
 
+var (
+	dirtyBuffers = make(chan []byte, 1024)
+)
+
+func getBuffer() []byte {
+	var buffer []byte
+	select {
+	case buffer = <-dirtyBuffers:
+	default:
+		buffer = make([]byte, bufferSize)
+	}
+	return buffer
+}
+
+func putBuffer(buffer []byte) {
+	select {
+	case dirtyBuffers <- buffer:
+	default:
+	}
+}
+
 func ReaderToChan(stream chan<- []byte, reader io.Reader) error {
 	for {
-		buffer := make([]byte, bufferSize)
+		buffer := getBuffer()
 		nBytes, err := reader.Read(buffer)
 		if nBytes > 0 {
 			stream <- buffer[:nBytes]
@@ -27,6 +46,7 @@ func ReaderToChan(stream chan<- []byte, reader io.Reader) error {
 func ChanToWriter(writer io.Writer, stream <-chan []byte) error {
 	for buffer := range stream {
 		_, err := writer.Write(buffer)
+		putBuffer(buffer)
 		if err != nil {
 			return err
 		}

+ 94 - 0
net/transport_test.go

@@ -0,0 +1,94 @@
+package net
+
+import (
+	"bytes"
+	"crypto/rand"
+	"io"
+	"io/ioutil"
+	"testing"
+
+	"github.com/v2ray/v2ray-core/testing/unit"
+)
+
+func TestReaderAndWrite(t *testing.T) {
+	assert := unit.Assert(t)
+
+	size := 1024 * 1024
+	buffer := make([]byte, size)
+	nBytes, err := rand.Read(buffer)
+	assert.Int(nBytes).Equals(len(buffer))
+	assert.Error(err).IsNil()
+
+	readerBuffer := bytes.NewReader(buffer)
+	writerBuffer := bytes.NewBuffer(make([]byte, 0, size))
+
+	transportChan := make(chan []byte, size/bufferSize*10)
+
+	err = ReaderToChan(transportChan, readerBuffer)
+	assert.Error(err).Equals(io.EOF)
+	close(transportChan)
+
+	err = ChanToWriter(writerBuffer, transportChan)
+	assert.Error(err).IsNil()
+
+	assert.Bytes(buffer).Equals(writerBuffer.Bytes())
+}
+
+type StaticReader struct {
+	total   int
+	current int
+}
+
+func (reader *StaticReader) Read(b []byte) (size int, err error) {
+	size = len(b)
+	if size > reader.total-reader.current {
+		size = reader.total - reader.current
+	}
+	//rand.Read(b[:size])
+	reader.current += size
+	if reader.current == reader.total {
+		err = io.EOF
+	}
+	return
+}
+
+func BenchmarkTransport(b *testing.B) {
+	size := 1024 * 1024
+
+	for i := 0; i < b.N; i++ {
+		transportChanA := make(chan []byte, 128)
+		transportChanB := make(chan []byte, 128)
+
+		readerA := &StaticReader{size, 0}
+		readerB := &StaticReader{size, 0}
+
+		writerA := ioutil.Discard
+		writerB := ioutil.Discard
+
+		finishA := make(chan bool)
+		finishB := make(chan bool)
+
+		go func() {
+			ChanToWriter(writerA, transportChanA)
+			close(finishA)
+		}()
+
+		go func() {
+			ReaderToChan(transportChanA, readerA)
+			close(transportChanA)
+		}()
+
+		go func() {
+			ChanToWriter(writerB, transportChanB)
+			close(finishB)
+		}()
+
+		go func() {
+			ReaderToChan(transportChanB, readerB)
+			close(transportChanB)
+		}()
+
+		<-transportChanA
+		<-transportChanB
+	}
+}