فهرست منبع

refine buffer allocation

Darien Raymond 7 سال پیش
والد
کامیت
f97e6fa3d2
7فایلهای تغییر یافته به همراه68 افزوده شده و 70 حذف شده
  1. 1 1
      app/proxyman/mux/reader.go
  2. 41 10
      common/buf/buffer.go
  3. 17 27
      common/buf/buffer_pool.go
  4. 0 27
      common/buf/buffer_test.go
  5. 7 3
      common/buf/reader.go
  6. 1 1
      common/crypto/auth.go
  7. 1 1
      proxy/shadowsocks/ota.go

+ 1 - 1
app/proxyman/mux/reader.go

@@ -55,7 +55,7 @@ func (r *PacketReader) ReadMultiBuffer() (buf.MultiBuffer, error) {
 	if size <= buf.Size {
 		b = buf.New()
 	} else {
-		b = buf.NewLocal(int(size))
+		b = buf.NewLocal(uint32(size))
 	}
 	if err := b.AppendSupplier(buf.ReadFullFrom(r.reader, int(size))); err != nil {
 		b.Release()

+ 41 - 10
common/buf/buffer.go

@@ -12,8 +12,7 @@ type Supplier func([]byte) (int, error)
 // the buffer into an internal buffer pool, in order to recreate a buffer more
 // quickly.
 type Buffer struct {
-	v    []byte
-	pool *Pool
+	v []byte
 
 	start int32
 	end   int32
@@ -24,11 +23,8 @@ func (b *Buffer) Release() {
 	if b == nil || b.v == nil {
 		return
 	}
-	if b.pool != nil {
-		b.pool.Free(b)
-	}
+	FreeBytes(b.v)
 	b.v = nil
-	b.pool = nil
 	b.start = 0
 	b.end = 0
 }
@@ -178,13 +174,48 @@ func (b *Buffer) String() string {
 
 // New creates a Buffer with 0 length and 8K capacity.
 func New() *Buffer {
-	return mediumPool.Allocate()
+	return &Buffer{
+		v: pool2k.Get().([]byte),
+	}
 }
 
 // NewLocal creates and returns a buffer with 0 length and given capacity on current thread.
-func NewLocal(size int) *Buffer {
+func NewLocal(size uint32) *Buffer {
 	return &Buffer{
-		v:    make([]byte, size),
-		pool: nil,
+		v: NewBytes(size),
+	}
+}
+
+func NewBytes(size uint32) []byte {
+	if size > 128*1024 {
+		return make([]byte, size)
+	}
+
+	if size > 64*1024 {
+		return pool128k.Get().([]byte)
+	}
+
+	if size > 8*1024 {
+		return pool64k.Get().([]byte)
+	}
+
+	if size > 2*1024 {
+		return pool8k.Get().([]byte)
+	}
+
+	return pool2k.Get().([]byte)
+}
+
+func FreeBytes(b []byte) {
+	size := len(b)
+	switch {
+	case size >= 128*1024:
+		pool128k.Put(b)
+	case size >= 64*1024:
+		pool64k.Put(b)
+	case size >= 8*1024:
+		pool8k.Put(b)
+	case size >= 2*1024:
+		pool2k.Put(b)
 	}
 }

+ 17 - 27
common/buf/buffer_pool.go

@@ -4,39 +4,29 @@ import (
 	"sync"
 )
 
-// Pool provides functionality to generate and recycle buffers on demand.
-type Pool struct {
-	allocator *sync.Pool
-}
+const (
+	// Size of a regular buffer.
+	Size = 2 * 1024
+)
 
-// NewPool creates a SyncPool with given buffer size.
-func NewPool(bufferSize uint32) *Pool {
-	pool := &Pool{
-		allocator: &sync.Pool{
-			New: func() interface{} { return make([]byte, bufferSize) },
-		},
+func createAllocFunc(size uint32) func() interface{} {
+	return func() interface{} {
+		return make([]byte, size)
 	}
-	return pool
 }
 
-// Allocate either returns a unused buffer from the pool, or generates a new one from system.
-func (p *Pool) Allocate() *Buffer {
-	return &Buffer{
-		v:    p.allocator.Get().([]byte),
-		pool: p,
-	}
+var pool2k = &sync.Pool{
+	New: createAllocFunc(2 * 1024),
 }
 
-// // Free recycles the given buffer.
-func (p *Pool) Free(buffer *Buffer) {
-	if buffer.v != nil {
-		p.allocator.Put(buffer.v)
-	}
+var pool8k = &sync.Pool{
+	New: createAllocFunc(8 * 1024),
 }
 
-const (
-	// Size of a regular buffer.
-	Size = 2 * 1024
-)
+var pool64k = &sync.Pool{
+	New: createAllocFunc(64 * 1024),
+}
 
-var mediumPool = NewPool(Size)
+var pool128k = &sync.Pool{
+	New: createAllocFunc(128 * 1024),
+}

+ 0 - 27
common/buf/buffer_test.go

@@ -1,7 +1,6 @@
 package buf_test
 
 import (
-	"crypto/rand"
 	"testing"
 
 	. "v2ray.com/core/common/buf"
@@ -42,32 +41,6 @@ func TestBufferString(t *testing.T) {
 	assert(buffer.String(), Equals, "Test String")
 }
 
-func TestBufferWrite(t *testing.T) {
-	assert := With(t)
-
-	buffer := NewLocal(8)
-	nBytes, err := buffer.Write([]byte("abcd"))
-	assert(err, IsNil)
-	assert(nBytes, Equals, 4)
-	nBytes, err = buffer.Write([]byte("abcde"))
-	assert(err, IsNil)
-	assert(nBytes, Equals, 4)
-	assert(buffer.String(), Equals, "abcdabcd")
-}
-
-func TestSyncPool(t *testing.T) {
-	assert := With(t)
-
-	p := NewPool(32)
-	b := p.Allocate()
-	assert(b.Len(), Equals, 0)
-
-	assert(b.AppendSupplier(ReadFrom(rand.Reader)), IsNil)
-	assert(b.Len(), Equals, 32)
-
-	b.Release()
-}
-
 func BenchmarkNewBuffer(b *testing.B) {
 	for i := 0; i < b.N; i++ {
 		buffer := New()

+ 7 - 3
common/buf/reader.go

@@ -21,12 +21,13 @@ func NewBytesToBufferReader(reader io.Reader) Reader {
 
 const mediumSize = 8 * 1024
 const largeSize = 64 * 1024
+const xlSize = 128 * 1024
 
 func (r *BytesToBufferReader) readSmall() (MultiBuffer, error) {
 	b := New()
 	err := b.Reset(ReadFrom(r.Reader))
 	if b.IsFull() {
-		r.buffer = make([]byte, mediumSize)
+		r.buffer = NewBytes(mediumSize)
 	}
 	if !b.IsEmpty() {
 		return NewMultiBufferValue(b), nil
@@ -45,11 +46,14 @@ func (r *BytesToBufferReader) ReadMultiBuffer() (MultiBuffer, error) {
 	if nBytes > 0 {
 		mb := NewMultiBufferCap(nBytes/Size + 1)
 		mb.Write(r.buffer[:nBytes])
-		if nBytes == len(r.buffer) && len(r.buffer) == mediumSize {
-			r.buffer = make([]byte, largeSize)
+		if nBytes == len(r.buffer) && nBytes < xlSize {
+			FreeBytes(r.buffer)
+			r.buffer = NewBytes(uint32(nBytes) + 1)
 		}
 		return mb, nil
 	}
+	FreeBytes(r.buffer)
+	r.buffer = nil
 	return nil, err
 }
 

+ 1 - 1
common/crypto/auth.go

@@ -128,7 +128,7 @@ func (r *AuthenticationReader) ReadMultiBuffer() (buf.MultiBuffer, error) {
 	if size <= buf.Size {
 		b = buf.New()
 	} else {
-		b = buf.NewLocal(size)
+		b = buf.NewLocal(uint32(size))
 	}
 	if err := b.Reset(buf.ReadFullFrom(r.reader, size)); err != nil {
 		b.Release()

+ 1 - 1
proxy/shadowsocks/ota.go

@@ -81,7 +81,7 @@ func (v *ChunkReader) ReadMultiBuffer() (buf.MultiBuffer, error) {
 	if length > buf.Size {
 		// Theoretically the size of a chunk is 64K, but most Shadowsocks implementations used <4K buffer.
 		buffer.Release()
-		buffer = buf.NewLocal(int(length) + 128)
+		buffer = buf.NewLocal(uint32(length) + 128)
 	}
 
 	buffer.Clear()