Darien Raymond 7 lat temu
rodzic
commit
a941ef5392
3 zmienionych plików z 128 dodań i 3 usunięć
  1. 31 0
      common/buf/io.go
  2. 91 0
      common/buf/readv_reader.go
  3. 6 3
      testing/scenarios/vmess_test.go

+ 31 - 0
common/buf/io.go

@@ -2,7 +2,11 @@ package buf
 
 import (
 	"io"
+	"runtime"
+	"syscall"
 	"time"
+
+	"v2ray.com/core/common/platform"
 )
 
 // Reader extends io.Reader with MultiBuffer.
@@ -46,6 +50,22 @@ func ReadAtLeastFrom(reader io.Reader, size int) Supplier {
 	}
 }
 
+var useReadv = false
+
+func init() {
+	const defaultFlagValue = "NOT_DEFINED_AT_ALL"
+	value := platform.NewEnvFlag("v2ray.buf.readv.disable").GetValue(func() string { return defaultFlagValue })
+	if value != defaultFlagValue {
+		useReadv = false
+		return
+	}
+
+	if runtime.GOOS == "linux" || runtime.GOOS == "darwin" {
+		newError("ReadV enabled").WriteToLog()
+		useReadv = true
+	}
+}
+
 // NewReader creates a new Reader.
 // The Reader instance doesn't take the ownership of reader.
 func NewReader(reader io.Reader) Reader {
@@ -53,6 +73,17 @@ func NewReader(reader io.Reader) Reader {
 		return mr
 	}
 
+	if useReadv {
+		if sc, ok := reader.(syscall.Conn); ok {
+			rawConn, err := sc.SyscallConn()
+			if err != nil {
+				newError("failed to get sysconn").Base(err).WriteToLog()
+			} else {
+				return NewReadVReader(reader, rawConn)
+			}
+		}
+	}
+
 	return NewBytesToBufferReader(reader)
 }
 

+ 91 - 0
common/buf/readv_reader.go

@@ -0,0 +1,91 @@
+package buf
+
+import (
+	"io"
+	"syscall"
+	"unsafe"
+)
+
+type ReadVReader struct {
+	io.Reader
+	rawConn syscall.RawConn
+	nBuf    int32
+}
+
+func NewReadVReader(reader io.Reader, rawConn syscall.RawConn) *ReadVReader {
+	return &ReadVReader{
+		Reader:  reader,
+		rawConn: rawConn,
+		nBuf:    1,
+	}
+}
+
+func allocN(n int32) []*Buffer {
+	bs := make([]*Buffer, 0, n)
+	for i := int32(0); i < n; i++ {
+		bs = append(bs, New())
+	}
+	return bs
+}
+
+func (r *ReadVReader) ReadMultiBuffer() (MultiBuffer, error) {
+	bs := allocN(r.nBuf)
+
+	iovecs := make([]syscall.Iovec, r.nBuf)
+	for idx, b := range bs {
+		iovecs[idx] = syscall.Iovec{
+			Base: &(b.v[0]),
+		}
+		iovecs[idx].SetLen(int(Size))
+	}
+
+	var nBytes int
+
+	err := r.rawConn.Read(func(fd uintptr) bool {
+		n, _, e := syscall.Syscall(syscall.SYS_READV, fd, uintptr(unsafe.Pointer(&iovecs[0])), uintptr(len(iovecs)))
+		if e != 0 {
+			return false
+		}
+		nBytes = int(n)
+		return true
+	})
+
+	if err != nil {
+		mb := MultiBuffer(bs)
+		mb.Release()
+		return nil, err
+	}
+
+	if nBytes == 0 {
+		mb := MultiBuffer(bs)
+		mb.Release()
+		return nil, io.EOF
+	}
+
+	nBuf := 0
+	for nBuf < len(bs) {
+		if nBytes <= 0 {
+			break
+		}
+		end := int32(nBytes)
+		if end > Size {
+			end = Size
+		}
+		bs[nBuf].end = end
+		nBytes -= int(end)
+		nBuf++
+	}
+
+	for i := nBuf; i < len(bs); i++ {
+		bs[i].Release()
+		bs[i] = nil
+	}
+
+	if int32(nBuf) == r.nBuf && nBuf < 128 {
+		r.nBuf *= 4
+	} else {
+		r.nBuf = int32(nBuf)
+	}
+
+	return MultiBuffer(bs[:nBuf]), nil
+}

+ 6 - 3
testing/scenarios/vmess_test.go

@@ -9,6 +9,7 @@ import (
 	"v2ray.com/core"
 	"v2ray.com/core/app/log"
 	"v2ray.com/core/app/proxyman"
+	"v2ray.com/core/common/compare"
 	clog "v2ray.com/core/common/log"
 	"v2ray.com/core/common/net"
 	"v2ray.com/core/common/protocol"
@@ -272,6 +273,7 @@ func TestVMessGCM(t *testing.T) {
 				Port: int(clientPort),
 			})
 			assert(err, IsNil)
+			defer conn.Close() // nolint: errcheck
 
 			payload := make([]byte, 10240*1024)
 			rand.Read(payload)
@@ -280,9 +282,10 @@ func TestVMessGCM(t *testing.T) {
 			assert(err, IsNil)
 			assert(nBytes, Equals, len(payload))
 
-			response := readFrom(conn, time.Second*20, 10240*1024)
-			assert(response, Equals, xor([]byte(payload)))
-			assert(conn.Close(), IsNil)
+			response := readFrom(conn, time.Second*40, 10240*1024)
+			if err := compare.BytesEqualWithDetail(response, xor([]byte(payload))); err != nil {
+				t.Error(err)
+			}
 			wg.Done()
 		}()
 	}