Browse Source

Merge pull request #224 from rprx/master

VLESS PREVIEW 2
RPRX 5 years ago
parent
commit
d77b88c7c2

+ 10 - 4
infra/conf/vless.go

@@ -48,9 +48,12 @@ func (c *VLessInboundConfig) Build() (proto.Message, error) {
 			return nil, newError(`VLESS clients: invalid user`).Base(err)
 		}
 
-		if account.Flow != "" {
-			return nil, newError(`VLESS clients: "flow" is not available in this version`)
+		switch account.Flow {
+		case "", "xtls-rprx-origin":
+		default:
+			return nil, newError(`VLESS clients: "flow" only accepts "", "xtls-rprx-origin" in this version`)
 		}
+
 		if account.Encryption != "" {
 			return nil, newError(`VLESS clients: "encryption" should not in inbound settings`)
 		}
@@ -161,9 +164,12 @@ func (c *VLessOutboundConfig) Build() (proto.Message, error) {
 				return nil, newError(`VLESS users: invalid user`).Base(err)
 			}
 
-			if account.Flow != "" {
-				return nil, newError(`VLESS users: "flow" is not available in this version`)
+			switch account.Flow {
+			case "", "xtls-rprx-origin", "xtls-rprx-origin-udp443":
+			default:
+				return nil, newError(`VLESS users: "flow" only accepts "", "xtls-rprx-origin", "xtls-rprx-origin-udp443" in this version`)
 			}
+
 			if account.Encryption != "none" {
 				return nil, newError(`VLESS users: please add/set "encryption":"none" for every user`)
 			}

+ 5 - 1
infra/conf/vless_test.go

@@ -26,6 +26,7 @@ func TestVLessOutbound(t *testing.T) {
 					"users": [
 						{
 							"id": "27848739-7e62-4138-9fd3-098a63964b6b",
+							"flow": "xtls-rprx-origin-udp443",
 							"encryption": "none",
 							"level": 0
 						}
@@ -46,6 +47,7 @@ func TestVLessOutbound(t *testing.T) {
 							{
 								Account: serial.ToTypedMessage(&vless.Account{
 									Id:         "27848739-7e62-4138-9fd3-098a63964b6b",
+									Flow:       "xtls-rprx-origin-udp443",
 									Encryption: "none",
 								}),
 								Level: 0,
@@ -69,6 +71,7 @@ func TestVLessInbound(t *testing.T) {
 				"clients": [
 					{
 						"id": "27848739-7e62-4138-9fd3-098a63964b6b",
+						"flow": "xtls-rprx-origin",
 						"level": 0,
 						"email": "love@v2fly.org"
 					}
@@ -94,7 +97,8 @@ func TestVLessInbound(t *testing.T) {
 				Clients: []*protocol.User{
 					{
 						Account: serial.ToTypedMessage(&vless.Account{
-							Id: "27848739-7e62-4138-9fd3-098a63964b6b",
+							Id:   "27848739-7e62-4138-9fd3-098a63964b6b",
+							Flow: "xtls-rprx-origin",
 						}),
 						Level: 0,
 						Email: "love@v2fly.org",

+ 131 - 4
proxy/vless/encoding/addons.go

@@ -9,11 +9,24 @@ import (
 
 	"v2ray.com/core/common/buf"
 	"v2ray.com/core/common/protocol"
+	"v2ray.com/core/proxy/vless"
 )
 
 func EncodeHeaderAddons(buffer *buf.Buffer, addons *Addons) error {
 
 	switch addons.Flow {
+	case vless.XRO:
+
+		if bytes, err := proto.Marshal(addons); err != nil {
+			return newError("failed to marshal addons protobuf value").Base(err)
+		}
+		if err := buffer.WriteByte(byte(len(bytes))); err != nil {
+			return newError("failed to write addons protobuf length").Base(err)
+		}
+		if _, err := buffer.Write(bytes); err != nil {
+			return newError("failed to write addons protobuf value").Base(err)
+		}
+
 	default:
 
 		if err := buffer.WriteByte(0); err != nil {
@@ -62,22 +75,136 @@ func DecodeHeaderAddons(buffer *buf.Buffer, reader io.Reader) (*Addons, error) {
 func EncodeBodyAddons(writer io.Writer, request *protocol.RequestHeader, addons *Addons) buf.Writer {
 
 	switch addons.Flow {
-	default:
+	case vless.XRO:
 
-		return buf.NewWriter(writer)
+		if request.Command == protocol.RequestCommandUDP {
+			return NewMultiLengthPacketWriter(writer.(buf.Writer))
+		}
 
 	}
 
+	return buf.NewWriter(writer)
+
 }
 
 // DecodeBodyAddons returns a Reader from which caller can fetch decrypted body.
 func DecodeBodyAddons(reader io.Reader, request *protocol.RequestHeader, addons *Addons) buf.Reader {
 
 	switch addons.Flow {
-	default:
+	case vless.XRO:
+
+		if request.Command == protocol.RequestCommandUDP {
+			return NewLengthPacketReader(reader)
+		}
 
-		return buf.NewReader(reader)
+	}
+
+	return buf.NewReader(reader)
+
+}
 
+func NewMultiLengthPacketWriter(writer buf.Writer) *MultiLengthPacketWriter {
+	return &MultiLengthPacketWriter{
+		Writer: writer,
 	}
+}
+
+type MultiLengthPacketWriter struct {
+	buf.Writer
+}
 
+func (w *MultiLengthPacketWriter) WriteMultiBuffer(mb buf.MultiBuffer) error {
+	defer buf.ReleaseMulti(mb)
+	mb2Write := make(buf.MultiBuffer, 0, len(mb)+1)
+	for _, b := range mb {
+		length := b.Len()
+		if length == 0 || length+2 > buf.Size {
+			continue
+		}
+		eb := buf.New()
+		if err := eb.WriteByte(byte(length >> 8)); err != nil {
+			eb.Release()
+			continue
+		}
+		if err := eb.WriteByte(byte(length)); err != nil {
+			eb.Release()
+			continue
+		}
+		if _, err := eb.Write(b.Bytes()); err != nil {
+			eb.Release()
+			continue
+		}
+		mb2Write = append(mb2Write, eb)
+	}
+	if mb2Write.IsEmpty() {
+		return nil
+	}
+	return w.Writer.WriteMultiBuffer(mb2Write)
+}
+
+func NewLengthPacketWriter(writer io.Writer) *LengthPacketWriter {
+	return &LengthPacketWriter{
+		Writer: writer,
+		cache:  make([]byte, 0, 65536),
+	}
+}
+
+type LengthPacketWriter struct {
+	io.Writer
+	cache []byte
+}
+
+func (w *LengthPacketWriter) WriteMultiBuffer(mb buf.MultiBuffer) error {
+	length := mb.Len() // none of mb is nil
+	//fmt.Println("Write", length)
+	if length == 0 {
+		return nil
+	}
+	defer func() {
+		w.cache = w.cache[:0]
+	}()
+	w.cache = append(w.cache, byte(length>>8), byte(length))
+	for i, b := range mb {
+		w.cache = append(w.cache, b.Bytes()...)
+		b.Release()
+		mb[i] = nil
+	}
+	if _, err := w.Write(w.cache); err != nil {
+		return newError("failed to write a packet").Base(err)
+	}
+	return nil
+}
+
+func NewLengthPacketReader(reader io.Reader) *LengthPacketReader {
+	return &LengthPacketReader{
+		Reader: reader,
+		cache:  make([]byte, 2),
+	}
+}
+
+type LengthPacketReader struct {
+	io.Reader
+	cache []byte
+}
+
+func (r *LengthPacketReader) ReadMultiBuffer() (buf.MultiBuffer, error) {
+	if _, err := io.ReadFull(r.Reader, r.cache); err != nil { // maybe EOF
+		return nil, newError("failed to read packet length").Base(err)
+	}
+	length := int(r.cache[0])<<8 | int(r.cache[1])
+	//fmt.Println("Read", length)
+	mb := make(buf.MultiBuffer, 0, length/buf.Size+1)
+	for length > 0 {
+		size := length
+		if length > buf.Size {
+			size = buf.Size
+		}
+		length -= size
+		b := buf.New()
+		if _, err := b.ReadFullFrom(r.Reader, int32(size)); err != nil {
+			return nil, newError("failed to read packet payload").Base(err)
+		}
+		mb = append(mb, b)
+	}
+	return mb, nil
 }

+ 5 - 5
proxy/vless/encoding/encoding.go

@@ -153,23 +153,23 @@ func EncodeResponseHeader(writer io.Writer, request *protocol.RequestHeader, res
 }
 
 // DecodeResponseHeader decodes and returns (if successful) a ResponseHeader from an input stream.
-func DecodeResponseHeader(reader io.Reader, request *protocol.RequestHeader, responseAddons *Addons) error {
+func DecodeResponseHeader(reader io.Reader, request *protocol.RequestHeader) (*Addons, error) {
 
 	buffer := buf.StackNew()
 	defer buffer.Release()
 
 	if _, err := buffer.ReadFullFrom(reader, 1); err != nil {
-		return newError("failed to read response version").Base(err)
+		return nil, newError("failed to read response version").Base(err)
 	}
 
 	if buffer.Byte(0) != request.Version {
-		return newError("unexpected response version. Expecting ", int(request.Version), " but actually ", int(buffer.Byte(0)))
+		return nil, newError("unexpected response version. Expecting ", int(request.Version), " but actually ", int(buffer.Byte(0)))
 	}
 
 	responseAddons, err := DecodeHeaderAddons(&buffer, reader)
 	if err != nil {
-		return newError("failed to decode response header addons").Base(err)
+		return nil, newError("failed to decode response header addons").Base(err)
 	}
 
-	return nil
+	return responseAddons, nil
 }

+ 57 - 26
proxy/vless/inbound/inbound.go

@@ -6,7 +6,6 @@ package inbound
 
 import (
 	"context"
-	"encoding/hex"
 	"io"
 	"strconv"
 	"time"
@@ -17,6 +16,7 @@ import (
 	"v2ray.com/core/common/errors"
 	"v2ray.com/core/common/log"
 	"v2ray.com/core/common/net"
+	"v2ray.com/core/common/platform"
 	"v2ray.com/core/common/protocol"
 	"v2ray.com/core/common/retry"
 	"v2ray.com/core/common/session"
@@ -30,6 +30,11 @@ import (
 	"v2ray.com/core/proxy/vless/encoding"
 	"v2ray.com/core/transport/internet"
 	"v2ray.com/core/transport/internet/tls"
+	"v2ray.com/core/transport/internet/xtls"
+)
+
+var (
+	xtls_show = false
 )
 
 func init() {
@@ -43,6 +48,13 @@ func init() {
 		}
 		return New(ctx, config.(*Config), dc)
 	}))
+
+	const defaultFlagValue = "NOT_DEFINED_AT_ALL"
+
+	xtlsShow := platform.NewEnvFlag("v2ray.vless.xtls.show").GetValue(func() string { return defaultFlagValue })
+	if xtlsShow == "true" {
+		xtls_show = true
+	}
 }
 
 // Handler is an inbound connection handler that handles messages in VLess protocol.
@@ -135,6 +147,11 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection i
 
 	sid := session.ExportIDToError(ctx)
 
+	iConn := connection
+	if statConn, ok := iConn.(*internet.StatCouterConnection); ok {
+		iConn = statConn.Connection
+	}
+
 	sessionPolicy := h.policyManager.ForLevel(0)
 	if err := connection.SetReadDeadline(time.Now().Add(sessionPolicy.Timeouts.Handshake)); err != nil {
 		return newError("unable to set read deadline").Base(err).AtWarning()
@@ -183,16 +200,15 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection i
 
 			alpn := ""
 			if len(apfb) > 1 || apfb[""] == nil {
-				iConn := connection
-				if statConn, ok := iConn.(*internet.StatCouterConnection); ok {
-					iConn = statConn.Connection
-				}
 				if tlsConn, ok := iConn.(*tls.Conn); ok {
 					alpn = tlsConn.ConnectionState().NegotiatedProtocol
 					newError("realAlpn = " + alpn).AtInfo().WriteToLog(sid)
-					if apfb[alpn] == nil {
-						alpn = ""
-					}
+				} else if xtlsConn, ok := iConn.(*xtls.Conn); ok {
+					alpn = xtlsConn.ConnectionState().NegotiatedProtocol
+					newError("realAlpn = " + alpn).AtInfo().WriteToLog(sid)
+				}
+				if apfb[alpn] == nil {
+					alpn = ""
 				}
 			}
 			pfb := apfb[alpn]
@@ -307,18 +323,9 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection i
 							pro.Write(net.ParseIP(remoteAddr).To16())
 							pro.Write(net.ParseIP(localAddr).To16())
 						}
-						p1, _ := strconv.ParseInt(remotePort, 10, 64)
-						b1, _ := hex.DecodeString(strconv.FormatInt(p1, 16))
-						p2, _ := strconv.ParseInt(localPort, 10, 64)
-						b2, _ := hex.DecodeString(strconv.FormatInt(p2, 16))
-						if len(b1) == 1 {
-							pro.WriteByte(0)
-						}
-						pro.Write(b1)
-						if len(b2) == 1 {
-							pro.WriteByte(0)
-						}
-						pro.Write(b2)
+						p1, _ := strconv.ParseUint(remotePort, 10, 16)
+						p2, _ := strconv.ParseUint(localPort, 10, 16)
+						pro.Write([]byte{byte(p1 >> 8), byte(p1), byte(p2 >> 8), byte(p2)})
 					}
 					if err := serverWriter.WriteMultiBuffer(buf.MultiBuffer{pro}); err != nil {
 						return newError("failed to set PROXY protocol v", fb.Xver).Base(err).AtWarning()
@@ -376,6 +383,34 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection i
 	}
 	inbound.User = request.User
 
+	account := request.User.Account.(*vless.MemoryAccount)
+
+	responseAddons := &encoding.Addons{
+		Flow: requestAddons.Flow,
+	}
+
+	switch requestAddons.Flow {
+	case vless.XRO:
+		if account.Flow == vless.XRO {
+			switch request.Command {
+			case protocol.RequestCommandMux:
+				return newError(vless.XRO + " doesn't support Mux").AtWarning()
+			case protocol.RequestCommandUDP:
+				//return newError(vless.XRO + " doesn't support UDP").AtWarning()
+			case protocol.RequestCommandTCP:
+				if xtlsConn, ok := iConn.(*xtls.Conn); ok {
+					xtlsConn.RPRX = true
+					xtlsConn.SHOW = xtls_show
+					xtlsConn.MARK = "XTLS"
+				} else {
+					return newError(`failed to use ` + vless.XRO + `, maybe "security" is not "xtls"`).AtWarning()
+				}
+			}
+		} else {
+			return newError(account.ID.String() + " is not able to use " + vless.XRO).AtWarning()
+		}
+	}
+
 	if request.Command != protocol.RequestCommandMux {
 		ctx = log.ContextWithAccessMessage(ctx, &log.AccessMessage{
 			From:   connection.RemoteAddr(),
@@ -396,8 +431,8 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection i
 		return newError("failed to dispatch request to ", request.Destination()).Base(err).AtWarning()
 	}
 
-	serverReader := link.Reader
-	serverWriter := link.Writer
+	serverReader := link.Reader // .(*pipe.Reader)
+	serverWriter := link.Writer // .(*pipe.Writer)
 
 	postRequest := func() error {
 		defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly)
@@ -416,10 +451,6 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection i
 	getResponse := func() error {
 		defer timer.SetTimeout(sessionPolicy.Timeouts.UplinkOnly)
 
-		responseAddons := &encoding.Addons{
-			Flow: requestAddons.Flow,
-		}
-
 		bufferWriter := buf.NewBufferedWriter(buf.NewWriter(connection))
 		if err := encoding.EncodeResponseHeader(bufferWriter, request, responseAddons); err != nil {
 			return newError("failed to encode response header").Base(err).AtWarning()

+ 51 - 8
proxy/vless/outbound/outbound.go

@@ -12,6 +12,7 @@ import (
 	"v2ray.com/core/common"
 	"v2ray.com/core/common/buf"
 	"v2ray.com/core/common/net"
+	"v2ray.com/core/common/platform"
 	"v2ray.com/core/common/protocol"
 	"v2ray.com/core/common/retry"
 	"v2ray.com/core/common/session"
@@ -22,12 +23,24 @@ import (
 	"v2ray.com/core/proxy/vless/encoding"
 	"v2ray.com/core/transport"
 	"v2ray.com/core/transport/internet"
+	"v2ray.com/core/transport/internet/xtls"
+)
+
+var (
+	xtls_show = false
 )
 
 func init() {
 	common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
 		return New(ctx, config.(*Config))
 	}))
+
+	const defaultFlagValue = "NOT_DEFINED_AT_ALL"
+
+	xtlsShow := platform.NewEnvFlag("v2ray.vless.xtls.show").GetValue(func() string { return defaultFlagValue })
+	if xtlsShow == "true" {
+		xtls_show = true
+	}
 }
 
 // Handler is an outbound connection handler for VLess protocol.
@@ -60,13 +73,13 @@ func New(ctx context.Context, config *Config) (*Handler, error) {
 }
 
 // Process implements proxy.Outbound.Process().
-func (v *Handler) Process(ctx context.Context, link *transport.Link, dialer internet.Dialer) error {
+func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer internet.Dialer) error {
 
 	var rec *protocol.ServerSpec
 	var conn internet.Connection
 
 	if err := retry.ExponentialBackoff(5, 200).On(func() error {
-		rec = v.serverPicker.PickServer()
+		rec = h.serverPicker.PickServer()
 		var err error
 		conn, err = dialer.Dial(ctx, rec.Destination())
 		if err != nil {
@@ -78,6 +91,11 @@ func (v *Handler) Process(ctx context.Context, link *transport.Link, dialer inte
 	}
 	defer conn.Close() // nolint: errcheck
 
+	iConn := conn
+	if statConn, ok := iConn.(*internet.StatCouterConnection); ok {
+		iConn = statConn.Connection
+	}
+
 	outbound := session.OutboundFromContext(ctx)
 	if outbound == nil || !outbound.Target.IsValid() {
 		return newError("target not specified").AtError()
@@ -108,12 +126,38 @@ func (v *Handler) Process(ctx context.Context, link *transport.Link, dialer inte
 		Flow: account.Flow,
 	}
 
-	sessionPolicy := v.policyManager.ForLevel(request.User.Level)
+	switch requestAddons.Flow {
+	case vless.XRO, vless.XRO + "-udp443":
+		switch request.Command {
+		case protocol.RequestCommandMux:
+			return newError(vless.XRO + " doesn't support Mux").AtWarning()
+		case protocol.RequestCommandUDP:
+			if requestAddons.Flow == vless.XRO && request.Port == 443 {
+				return newError(vless.XRO + " stopped UDP/443").AtWarning()
+			}
+			requestAddons.Flow = vless.XRO
+		case protocol.RequestCommandTCP:
+			if xtlsConn, ok := iConn.(*xtls.Conn); ok {
+				xtlsConn.RPRX = true
+				xtlsConn.SHOW = xtls_show
+				xtlsConn.MARK = "XTLS"
+			} else {
+				return newError(`failed to use ` + vless.XRO + `, maybe "security" is not "xtls"`).AtWarning()
+			}
+			requestAddons.Flow = vless.XRO
+		}
+	default:
+		if _, ok := iConn.(*xtls.Conn); ok {
+			panic(`To avoid misunderstanding, you must fill in VLESS "flow" when using XTLS.`)
+		}
+	}
+
+	sessionPolicy := h.policyManager.ForLevel(request.User.Level)
 	ctx, cancel := context.WithCancel(ctx)
 	timer := signal.CancelAfterInactivity(ctx, cancel, sessionPolicy.Timeouts.ConnectionIdle)
 
-	clientReader := link.Reader
-	clientWriter := link.Writer
+	clientReader := link.Reader // .(*pipe.Reader)
+	clientWriter := link.Writer // .(*pipe.Writer)
 
 	postRequest := func() error {
 		defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly)
@@ -151,9 +195,8 @@ func (v *Handler) Process(ctx context.Context, link *transport.Link, dialer inte
 	getResponse := func() error {
 		defer timer.SetTimeout(sessionPolicy.Timeouts.UplinkOnly)
 
-		responseAddons := new(encoding.Addons)
-
-		if err := encoding.DecodeResponseHeader(conn, request, responseAddons); err != nil {
+		responseAddons, err := encoding.DecodeResponseHeader(conn, request)
+		if err != nil {
 			return newError("failed to decode response header").Base(err).AtWarning()
 		}
 

+ 4 - 0
proxy/vless/vless.go

@@ -6,3 +6,7 @@
 package vless
 
 //go:generate errorgen
+
+const (
+	XRO = "xtls-rprx-origin"
+)