Selaa lähdekoodia

send reuse option in header

v2ray 9 vuotta sitten
vanhempi
commit
72fb5a256c

+ 31 - 3
common/protocol/headers.go

@@ -13,13 +13,22 @@ const (
 )
 
 const (
-	RequestOptionChunkStream = RequestOption(0x01)
+	RequestOptionChunkStream     = RequestOption(0x01)
+	RequestOptionConnectionReuse = RequestOption(0x02)
 )
 
 type RequestOption byte
 
-func (this RequestOption) IsChunkStream() bool {
-	return (this & RequestOptionChunkStream) == RequestOptionChunkStream
+func (this RequestOption) Has(option RequestOption) bool {
+	return (this & option) == option
+}
+
+func (this *RequestOption) Set(option RequestOption) {
+	*this = (*this | option)
+}
+
+func (this *RequestOption) Clear(option RequestOption) {
+	*this = (*this & (^option))
 }
 
 type RequestHeader struct {
@@ -38,9 +47,28 @@ func (this *RequestHeader) Destination() v2net.Destination {
 	return v2net.TCPDestination(this.Address, this.Port)
 }
 
+type ResponseOption byte
+
+var (
+	ResponseOptionConnectionReuse = ResponseOption(1)
+)
+
+func (this *ResponseOption) Set(option ResponseOption) {
+	*this = (*this | option)
+}
+
+func (this ResponseOption) Has(option ResponseOption) bool {
+	return (this | option) == option
+}
+
+func (this *ResponseOption) Clear(option ResponseOption) {
+	*this = (*this & (^option))
+}
+
 type ResponseCommand interface{}
 
 type ResponseHeader struct {
+	Option  ResponseOption
 	Command ResponseCommand
 }
 

+ 34 - 0
common/protocol/headers_test.go

@@ -0,0 +1,34 @@
+package protocol_test
+
+import (
+	"testing"
+
+	. "github.com/v2ray/v2ray-core/common/protocol"
+	"github.com/v2ray/v2ray-core/testing/assert"
+)
+
+func TestRequestOptionSet(t *testing.T) {
+	assert := assert.On(t)
+
+	option := new(RequestOption)
+	assert.Bool(option.Has(RequestOptionChunkStream)).IsFalse()
+
+	option.Set(RequestOptionChunkStream)
+	assert.Bool(option.Has(RequestOptionChunkStream)).IsTrue()
+
+	option.Set(RequestOptionConnectionReuse)
+	assert.Bool(option.Has(RequestOptionConnectionReuse)).IsTrue()
+	assert.Bool(option.Has(RequestOptionChunkStream)).IsTrue()
+}
+
+func TestRequestOptionClear(t *testing.T) {
+	assert := assert.On(t)
+
+	option := new(RequestOption)
+	option.Set(RequestOptionChunkStream)
+	option.Set(RequestOptionConnectionReuse)
+
+	option.Clear(RequestOptionChunkStream)
+	assert.Bool(option.Has(RequestOptionChunkStream)).IsFalse()
+	assert.Bool(option.Has(RequestOptionConnectionReuse)).IsTrue()
+}

+ 3 - 1
common/protocol/raw/client.go

@@ -118,7 +118,9 @@ func (this *ClientSession) DecodeResponseHeader(reader io.Reader) (*protocol.Res
 		return nil, transport.ErrorCorruptedPacket
 	}
 
-	header := new(protocol.ResponseHeader)
+	header := &protocol.ResponseHeader{
+		Option: protocol.ResponseOption(buffer.Value[1]),
+	}
 
 	if buffer.Value[2] != 0 {
 		cmdId := buffer.Value[2]

+ 1 - 1
common/protocol/raw/server.go

@@ -159,7 +159,7 @@ func (this *ServerSession) EncodeResponseHeader(header *protocol.ResponseHeader,
 	encryptionWriter := crypto.NewCryptionWriter(aesStream, writer)
 	this.responseWriter = encryptionWriter
 
-	encryptionWriter.Write([]byte{this.responseHeader, 0x00})
+	encryptionWriter.Write([]byte{this.responseHeader, byte(header.Option)})
 	err := MarshalCommand(header.Command, encryptionWriter)
 	if err != nil {
 		encryptionWriter.Write([]byte{0x00, 0x00})

+ 10 - 4
proxy/vmess/inbound/inbound.go

@@ -17,6 +17,7 @@ import (
 	"github.com/v2ray/v2ray-core/proxy"
 	"github.com/v2ray/v2ray-core/proxy/internal"
 	vmessio "github.com/v2ray/v2ray-core/proxy/vmess/io"
+	"github.com/v2ray/v2ray-core/transport"
 	"github.com/v2ray/v2ray-core/transport/hub"
 )
 
@@ -145,7 +146,7 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.Connection) {
 	log.Access(connection.RemoteAddr(), request.Destination(), log.AccessAccepted, "")
 	log.Debug("VMessIn: Received request for ", request.Destination())
 
-	if request.Option.IsChunkStream() {
+	if request.Option.Has(protocol.RequestOptionConnectionReuse) {
 		connection.SetReusable(true)
 	}
 
@@ -161,10 +162,11 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.Connection) {
 	userSettings := protocol.GetUserSettings(request.User.Level)
 	connReader.SetTimeOut(userSettings.PayloadReadTimeout)
 	reader.SetCached(false)
+
 	go func() {
 		bodyReader := session.DecodeRequestBody(reader)
 		var requestReader v2io.Reader
-		if request.Option.IsChunkStream() {
+		if request.Option.Has(protocol.RequestOptionChunkStream) {
 			requestReader = vmessio.NewAuthChunkReader(bodyReader)
 		} else {
 			requestReader = v2io.NewAdaptiveReader(bodyReader)
@@ -186,6 +188,10 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.Connection) {
 		Command: this.generateCommand(request),
 	}
 
+	if request.Option.Has(protocol.RequestOptionConnectionReuse) && transport.IsConnectionReusable() {
+		response.Option.Set(protocol.ResponseOptionConnectionReuse)
+	}
+
 	session.EncodeResponseHeader(response, writer)
 
 	bodyWriter := session.EncodeResponseBody(writer)
@@ -193,7 +199,7 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.Connection) {
 	// Optimize for small response packet
 	if data, err := output.Read(); err == nil {
 		var v2writer v2io.Writer = v2io.NewAdaptiveWriter(bodyWriter)
-		if request.Option.IsChunkStream() {
+		if request.Option.Has(protocol.RequestOptionChunkStream) {
 			v2writer = vmessio.NewAuthChunkWriter(v2writer)
 		}
 
@@ -207,7 +213,7 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.Connection) {
 		}
 
 		output.Release()
-		if request.Option.IsChunkStream() {
+		if request.Option.Has(protocol.RequestOptionChunkStream) {
 			v2writer.Write(alloc.NewSmallBuffer().Clear())
 		}
 		v2writer.Release()

+ 15 - 5
proxy/vmess/outbound/outbound.go

@@ -14,6 +14,7 @@ import (
 	"github.com/v2ray/v2ray-core/proxy"
 	"github.com/v2ray/v2ray-core/proxy/internal"
 	vmessio "github.com/v2ray/v2ray-core/proxy/vmess/io"
+	"github.com/v2ray/v2ray-core/transport"
 	"github.com/v2ray/v2ray-core/transport/hub"
 	"github.com/v2ray/v2ray-core/transport/ray"
 )
@@ -49,7 +50,9 @@ func (this *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *al
 	log.Info("VMessOut: Tunneling request to ", request.Address, " via ", destination)
 
 	defer conn.Close()
-	if request.Option.IsChunkStream() {
+
+	if transport.IsConnectionReusable() {
+		request.Option.Set(protocol.RequestOptionConnectionReuse)
 		conn.SetReusable(true)
 	}
 
@@ -79,7 +82,7 @@ func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn
 
 	bodyWriter := session.EncodeRequestBody(writer)
 	var streamWriter v2io.Writer = v2io.NewAdaptiveWriter(bodyWriter)
-	if request.Option.IsChunkStream() {
+	if request.Option.Has(protocol.RequestOptionChunkStream) {
 		streamWriter = vmessio.NewAuthChunkWriter(streamWriter)
 	}
 	streamWriter.Write(payload)
@@ -90,8 +93,11 @@ func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn
 		conn.SetReusable(false)
 	}
 
-	if request.Option.IsChunkStream() {
-		streamWriter.Write(alloc.NewSmallBuffer().Clear())
+	if request.Option.Has(protocol.RequestOptionChunkStream) {
+		err := streamWriter.Write(alloc.NewSmallBuffer().Clear())
+		if err != nil {
+			conn.SetReusable(false)
+		}
 	}
 	streamWriter.Release()
 	return
@@ -110,11 +116,15 @@ func (this *VMessOutboundHandler) handleResponse(session *raw.ClientSession, con
 	}
 	go this.handleCommand(dest, header.Command)
 
+	if !header.Option.Has(protocol.ResponseOptionConnectionReuse) {
+		conn.SetReusable(false)
+	}
+
 	reader.SetCached(false)
 	decryptReader := session.DecodeResponseBody(reader)
 
 	var bodyReader v2io.Reader
-	if request.Option.IsChunkStream() {
+	if request.Option.Has(protocol.RequestOptionChunkStream) {
 		bodyReader = vmessio.NewAuthChunkReader(decryptReader)
 	} else {
 		bodyReader = v2io.NewAdaptiveReader(decryptReader)