Bläddra i källkod

Migrate VMessIn with protocol

v2ray 9 år sedan
förälder
incheckning
547cc75651

+ 7 - 0
common/protocol/headers.go

@@ -32,6 +32,13 @@ type RequestHeader struct {
 	Port    v2net.Port
 }
 
+func (this *RequestHeader) Destination() v2net.Destination {
+	if this.Command == RequestCommandUDP {
+		return v2net.UDPDestination(this.Address, this.Port)
+	}
+	return v2net.TCPDestination(this.Address, this.Port)
+}
+
 type ResponseCommand interface{}
 
 type ResponseHeader struct {

+ 5 - 1
common/protocol/raw/commands.go

@@ -19,6 +19,10 @@ var (
 )
 
 func MarshalCommand(command interface{}, writer io.Writer) error {
+	if command == nil {
+		return ErrorUnknownCommand
+	}
+
 	var cmdId byte
 	var factory CommandFactory
 	switch command.(type) {
@@ -29,7 +33,7 @@ func MarshalCommand(command interface{}, writer io.Writer) error {
 		return ErrorUnknownCommand
 	}
 
-	buffer := alloc.NewSmallBuffer()
+	buffer := alloc.NewSmallBuffer().Clear()
 	err := factory.Marshal(command, buffer)
 	if err != nil {
 		return err

+ 3 - 3
common/protocol/raw/commands_test.go

@@ -1,9 +1,9 @@
 package raw_test
 
 import (
-	"bytes"
 	"testing"
 
+	"github.com/v2ray/v2ray-core/common/alloc"
 	netassert "github.com/v2ray/v2ray-core/common/net/testing/assert"
 	"github.com/v2ray/v2ray-core/common/protocol"
 	. "github.com/v2ray/v2ray-core/common/protocol/raw"
@@ -23,11 +23,11 @@ func TestSwitchAccount(t *testing.T) {
 		ValidMin: 16,
 	}
 
-	buffer := bytes.NewBuffer(make([]byte, 0, 1024))
+	buffer := alloc.NewBuffer().Clear()
 	err := MarshalCommand(sa, buffer)
 	assert.Error(err).IsNil()
 
-	cmd, err := UnmarshalCommand(1, buffer.Bytes())
+	cmd, err := UnmarshalCommand(1, buffer.Value[2:])
 	assert.Error(err).IsNil()
 
 	sa2, ok := cmd.(*protocol.CommandSwitchAccount)

+ 11 - 2
common/protocol/raw/server.go

@@ -24,6 +24,12 @@ type ServerSession struct {
 	responseWriter  io.Writer
 }
 
+func NewServerSession(validator protocol.UserValidator) *ServerSession {
+	return &ServerSession{
+		userValidator: validator,
+	}
+}
+
 func (this *ServerSession) DecodeRequestHeader(reader io.Reader) (*protocol.RequestHeader, error) {
 	buffer := alloc.NewSmallBuffer()
 	defer buffer.Release()
@@ -134,14 +140,17 @@ func (this *ServerSession) EncodeResponseHeader(header *protocol.ResponseHeader,
 	responseBodyKey := md5.Sum(this.requestBodyKey)
 	responseBodyIV := md5.Sum(this.requestBodyIV)
 	this.responseBodyKey = responseBodyKey[:]
-	this.requestBodyIV = responseBodyIV[:]
+	this.responseBodyIV = responseBodyIV[:]
 
 	aesStream := crypto.NewAesEncryptionStream(this.responseBodyKey, this.responseBodyIV)
 	encryptionWriter := crypto.NewCryptionWriter(aesStream, writer)
 	this.responseWriter = encryptionWriter
 
 	encryptionWriter.Write([]byte{this.responseHeader, 0x00})
-	MarshalCommand(header.Command, encryptionWriter)
+	err := MarshalCommand(header.Command, encryptionWriter)
+	if err != nil {
+		encryptionWriter.Write([]byte{0x00, 0x00})
+	}
 }
 
 func (this *ServerSession) EncodeResponseBody(writer io.Writer) io.Writer {

+ 5 - 23
proxy/vmess/inbound/command.go

@@ -1,20 +1,12 @@
 package inbound
 
 import (
-	"hash/fnv"
-
-	"github.com/v2ray/v2ray-core/common/alloc"
 	"github.com/v2ray/v2ray-core/common/log"
+	"github.com/v2ray/v2ray-core/common/protocol"
 	"github.com/v2ray/v2ray-core/common/serial"
-	"github.com/v2ray/v2ray-core/proxy/vmess/command"
-	"github.com/v2ray/v2ray-core/proxy/vmess/protocol"
 )
 
-func (this *VMessInboundHandler) generateCommand(request *protocol.VMessRequest, buffer *alloc.Buffer) {
-	cmd := byte(0)
-	commandBytes := alloc.NewSmallBuffer().Clear()
-	defer commandBytes.Release()
-
+func (this *VMessInboundHandler) generateCommand(request *protocol.RequestHeader) protocol.ResponseCommand {
 	if this.features != nil && this.features.Detour != nil {
 
 		tag := this.features.Detour.ToTag
@@ -25,29 +17,19 @@ func (this *VMessInboundHandler) generateCommand(request *protocol.VMessRequest,
 				if availableMin > 255 {
 					availableMin = 255
 				}
-				cmd = byte(1)
+
 				log.Info("VMessIn: Pick detour handler for port ", inboundHandler.Port(), " for ", availableMin, " minutes.")
 				user := inboundHandler.GetUser(request.User.Email)
-				saCmd := &command.SwitchAccount{
+				return &protocol.CommandSwitchAccount{
 					Port:     inboundHandler.Port(),
 					ID:       user.ID.UUID(),
 					AlterIds: serial.Uint16Literal(len(user.AlterIDs)),
 					Level:    user.Level,
 					ValidMin: byte(availableMin),
 				}
-				saCmd.Marshal(commandBytes)
 			}
 		}
 	}
 
-	if cmd == 0 || commandBytes.Len()+4 > 256 {
-		buffer.AppendBytes(byte(0), byte(0))
-	} else {
-		buffer.AppendBytes(cmd, byte(commandBytes.Len()+4))
-		fnv1hash := fnv.New32a()
-		fnv1hash.Write(commandBytes.Value)
-		hashValue := fnv1hash.Sum32()
-		buffer.AppendBytes(byte(hashValue>>24), byte(hashValue>>16), byte(hashValue>>8), byte(hashValue))
-		buffer.Append(commandBytes.Value)
-	}
+	return nil
 }

+ 32 - 36
proxy/vmess/inbound/inbound.go

@@ -1,19 +1,16 @@
 package inbound
 
 import (
-	"crypto/md5"
-	"io"
 	"sync"
 
 	"github.com/v2ray/v2ray-core/app"
 	"github.com/v2ray/v2ray-core/app/dispatcher"
 	"github.com/v2ray/v2ray-core/app/proxyman"
-	"github.com/v2ray/v2ray-core/common/alloc"
-	v2crypto "github.com/v2ray/v2ray-core/common/crypto"
 	v2io "github.com/v2ray/v2ray-core/common/io"
 	"github.com/v2ray/v2ray-core/common/log"
 	v2net "github.com/v2ray/v2ray-core/common/net"
 	proto "github.com/v2ray/v2ray-core/common/protocol"
+	raw "github.com/v2ray/v2ray-core/common/protocol/raw"
 	"github.com/v2ray/v2ray-core/common/serial"
 	"github.com/v2ray/v2ray-core/common/uuid"
 	"github.com/v2ray/v2ray-core/proxy"
@@ -122,9 +119,11 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.TCPConn) {
 	defer connection.Close()
 
 	connReader := v2net.NewTimeOutReader(16, connection)
-	requestReader := protocol.NewVMessRequestReader(this.clients)
 
-	request, err := requestReader.Read(connReader)
+	reader := v2io.NewBufferedReader(connReader)
+	session := raw.NewServerSession(this.clients)
+
+	request, err := session.DecodeRequestHeader(reader)
 	if err != nil {
 		log.Access(connection.RemoteAddr(), serial.StringLiteral(""), log.AccessRejected, serial.StringLiteral(err.Error()))
 		log.Warning("VMessIn: Invalid request from ", connection.RemoteAddr(), ": ", err)
@@ -142,30 +141,42 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.TCPConn) {
 
 	userSettings := proto.GetUserSettings(request.User.Level)
 	connReader.SetTimeOut(userSettings.PayloadReadTimeout)
-	go handleInput(request, connReader, input, &readFinish)
+	reader.SetCached(false)
+	go func() {
+		defer close(input)
+		defer readFinish.Unlock()
+		bodyReader := session.DecodeRequestBody(reader)
+		var requestReader v2io.Reader
+		if request.Option.IsChunkStream() {
+			requestReader = vmessio.NewAuthChunkReader(bodyReader)
+		} else {
+			requestReader = v2io.NewAdaptiveReader(bodyReader)
+		}
+		v2io.ReaderToChan(input, requestReader)
+	}()
 
-	responseKey := md5.Sum(request.RequestKey)
-	responseIV := md5.Sum(request.RequestIV)
+	writer := v2io.NewBufferedWriter(connection)
 
-	aesStream := v2crypto.NewAesEncryptionStream(responseKey[:], responseIV[:])
-	responseWriter := v2crypto.NewCryptionWriter(aesStream, connection)
+	response := &proto.ResponseHeader{
+		Command: this.generateCommand(request),
+	}
 
-	// Optimize for small response packet
-	buffer := alloc.NewLargeBuffer().Clear()
-	defer buffer.Release()
-	buffer.AppendBytes(request.ResponseHeader, byte(0))
-	this.generateCommand(request, buffer)
+	session.EncodeResponseHeader(response, writer)
+
+	bodyWriter := session.EncodeResponseBody(writer)
 
+	// Optimize for small response packet
 	if data, open := <-output; open {
-		if request.IsChunkStream() {
+		if request.Option.IsChunkStream() {
 			vmessio.Authenticate(data)
 		}
-		buffer.Append(data.Value)
+		bodyWriter.Write(data.Value)
 		data.Release()
-		responseWriter.Write(buffer.Value)
+
+		writer.SetCached(false)
 		go func(finish *sync.Mutex) {
-			var writer v2io.Writer = v2io.NewAdaptiveWriter(responseWriter)
-			if request.IsChunkStream() {
+			var writer v2io.Writer = v2io.NewAdaptiveWriter(bodyWriter)
+			if request.Option.IsChunkStream() {
 				writer = vmessio.NewAuthChunkWriter(writer)
 			}
 			v2io.ChanToWriter(writer, output)
@@ -178,21 +189,6 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.TCPConn) {
 	readFinish.Lock()
 }
 
-func handleInput(request *protocol.VMessRequest, reader io.Reader, input chan<- *alloc.Buffer, finish *sync.Mutex) {
-	defer close(input)
-	defer finish.Unlock()
-
-	aesStream := v2crypto.NewAesDecryptionStream(request.RequestKey, request.RequestIV)
-	descriptionReader := v2crypto.NewCryptionReader(aesStream, reader)
-	var requestReader v2io.Reader
-	if request.IsChunkStream() {
-		requestReader = vmessio.NewAuthChunkReader(descriptionReader)
-	} else {
-		requestReader = v2io.NewAdaptiveReader(descriptionReader)
-	}
-	v2io.ReaderToChan(input, requestReader)
-}
-
 func init() {
 	internal.MustRegisterInboundHandlerCreator("vmess",
 		func(space app.Space, rawConfig interface{}) (proxy.InboundHandler, error) {