Browse Source

Add Early Data and Custom Header Support to HTTPUpgrade Transport (#3276)

* Add early reply draining for httpupgrade

* Add early data config for httpupgrade

* Add early data implementation for httpupgrade

* Add send extra header for httpupgrade
Xiaokang Wang (Shelikhoo) 10 months ago
parent
commit
4c1d94863b

+ 48 - 0
testing/scenarios/config/httpupgrade_earlydataShortEarlyData_client.json

@@ -0,0 +1,48 @@
+{
+  "log": {
+    "error": {
+      "level": "Debug",
+      "type": "Console"
+    },
+    "access": {
+      "type": "None"
+    }
+  },
+  "outbounds": [
+    {
+      "protocol": "vmess",
+      "settings": {
+        "address": "127.0.0.1",
+        "port": 17793,
+        "uuid": "bcc71618-e552-42c2-a2a3-d4c17a9df764"
+      },
+      "streamSettings": {
+        "transport": "httpupgrade",
+        "transportSettings": {
+          "path": "b66efc0c7752",
+          "maxEarlyData": 32,
+          "earlyDataHeaderName": "Sec-Websocket-Key"
+        },
+        "security": "tls",
+        "securitySettings": {
+          "pinnedPeerCertificateChainSha256": [
+            "kqHyvea27Pn+JiSqA72lhu9IKAKeGR+3yCyA8JR1mug="
+          ],
+          "allowInsecureIfPinnedPeerCertificate": true
+        }
+      }
+    }
+  ],
+  "inbounds": [
+    {
+      "protocol": "socks",
+      "settings": {
+        "udpEnabled": false,
+        "address": "127.0.0.1",
+        "packetEncoding": "Packet"
+      },
+      "port": 17794,
+      "listen": "127.0.0.1"
+    }
+  ]
+}

+ 46 - 0
testing/scenarios/config/httpupgrade_earlydataShortEarlyData_server.json

@@ -0,0 +1,46 @@
+{
+  "log": {
+    "error": {
+      "level": "Debug",
+      "type": "Console"
+    },
+    "access": {
+      "type": "None"
+    }
+  },
+  "outbounds": [
+    {
+      "protocol": "freedom"
+    }
+  ],
+  "inbounds": [
+    {
+      "listen": "127.0.0.1",
+      "port": 17793,
+      "protocol": "vmess",
+      "settings": {
+        "users": [
+          "bcc71618-e552-42c2-a2a3-d4c17a9df764"
+        ]
+      },
+      "streamSettings": {
+        "transport": "httpupgrade",
+        "transportSettings": {
+          "path": "b66efc0c7752",
+          "maxEarlyData": 32,
+          "earlyDataHeaderName": "Sec-Websocket-Key"
+        },
+        "security": "tls",
+        "securitySettings": {
+          "certificate": [
+            {
+              "usage": "ENCIPHERMENT",
+              "certificateFile": "cert/self-signed_cert.pem",
+              "keyFile": "cert/self-signed_key.pem"
+            }
+          ]
+        }
+      }
+    }
+  ]
+}

+ 48 - 0
testing/scenarios/config/httpupgrade_earlydata_client.json

@@ -0,0 +1,48 @@
+{
+  "log": {
+    "error": {
+      "level": "Debug",
+      "type": "Console"
+    },
+    "access": {
+      "type": "None"
+    }
+  },
+  "outbounds": [
+    {
+      "protocol": "vmess",
+      "settings": {
+        "address": "127.0.0.1",
+        "port": 17793,
+        "uuid": "bcc71618-e552-42c2-a2a3-d4c17a9df764"
+      },
+      "streamSettings": {
+        "transport": "httpupgrade",
+        "transportSettings": {
+          "path": "b66efc0c7752",
+          "maxEarlyData": 2048,
+          "earlyDataHeaderName": "Sec-Websocket-Key"
+        },
+        "security": "tls",
+        "securitySettings": {
+          "pinnedPeerCertificateChainSha256": [
+            "kqHyvea27Pn+JiSqA72lhu9IKAKeGR+3yCyA8JR1mug="
+          ],
+          "allowInsecureIfPinnedPeerCertificate": true
+        }
+      }
+    }
+  ],
+  "inbounds": [
+    {
+      "protocol": "socks",
+      "settings": {
+        "udpEnabled": false,
+        "address": "127.0.0.1",
+        "packetEncoding": "Packet"
+      },
+      "port": 17794,
+      "listen": "127.0.0.1"
+    }
+  ]
+}

+ 46 - 0
testing/scenarios/config/httpupgrade_earlydata_server.json

@@ -0,0 +1,46 @@
+{
+  "log": {
+    "error": {
+      "level": "Debug",
+      "type": "Console"
+    },
+    "access": {
+      "type": "None"
+    }
+  },
+  "outbounds": [
+    {
+      "protocol": "freedom"
+    }
+  ],
+  "inbounds": [
+    {
+      "listen": "127.0.0.1",
+      "port": 17793,
+      "protocol": "vmess",
+      "settings": {
+        "users": [
+          "bcc71618-e552-42c2-a2a3-d4c17a9df764"
+        ]
+      },
+      "streamSettings": {
+        "transport": "httpupgrade",
+        "transportSettings": {
+          "path": "b66efc0c7752",
+          "maxEarlyData": 2048,
+          "earlyDataHeaderName": "Sec-Websocket-Key"
+        },
+        "security": "tls",
+        "securitySettings": {
+          "certificate": [
+            {
+              "usage": "ENCIPHERMENT",
+              "certificateFile": "cert/self-signed_cert.pem",
+              "keyFile": "cert/self-signed_key.pem"
+            }
+          ]
+        }
+      }
+    }
+  ]
+}

+ 78 - 0
testing/scenarios/httpupgrade_test.go

@@ -50,3 +50,81 @@ func TestHTTPUpgrade(t *testing.T) {
 		t.Error(err)
 	}
 }
+
+func TestHTTPUpgradeWithEarlyData(t *testing.T) {
+	tcpServer := tcp.Server{
+		MsgProcessor: xor,
+	}
+	dest, err := tcpServer.Start()
+	common.Must(err)
+	defer tcpServer.Close()
+
+	coreInst, InstMgrIfce := NewInstanceManagerCoreInstance()
+	defer coreInst.Close()
+
+	common.Must(InstMgrIfce.AddInstance(
+		context.TODO(),
+		"httpupgrade_client",
+		common.Must2(os.ReadFile("config/httpupgrade_earlydata_client.json")).([]byte),
+		"jsonv5"))
+
+	common.Must(InstMgrIfce.AddInstance(
+		context.TODO(),
+		"httpupgrade_server",
+		common.Must2(os.ReadFile("config/httpupgrade_earlydata_server.json")).([]byte),
+		"jsonv5"))
+
+	common.Must(InstMgrIfce.StartInstance(context.TODO(), "httpupgrade_server"))
+	common.Must(InstMgrIfce.StartInstance(context.TODO(), "httpupgrade_client"))
+
+	defer func() {
+		common.Must(InstMgrIfce.StopInstance(context.TODO(), "httpupgrade_server"))
+		common.Must(InstMgrIfce.StopInstance(context.TODO(), "httpupgrade_client"))
+		common.Must(InstMgrIfce.UntrackInstance(context.TODO(), "httpupgrade_server"))
+		common.Must(InstMgrIfce.UntrackInstance(context.TODO(), "httpupgrade_client"))
+		coreInst.Close()
+	}()
+
+	if err := testTCPConnViaSocks(17794, dest.Port, 1024, time.Second*2)(); err != nil {
+		t.Error(err)
+	}
+}
+
+func TestHTTPUpgradeWithShortEarlyData(t *testing.T) {
+	tcpServer := tcp.Server{
+		MsgProcessor: xor,
+	}
+	dest, err := tcpServer.Start()
+	common.Must(err)
+	defer tcpServer.Close()
+
+	coreInst, InstMgrIfce := NewInstanceManagerCoreInstance()
+	defer coreInst.Close()
+
+	common.Must(InstMgrIfce.AddInstance(
+		context.TODO(),
+		"httpupgrade_client",
+		common.Must2(os.ReadFile("config/httpupgrade_earlydataShortEarlyData_client.json")).([]byte),
+		"jsonv5"))
+
+	common.Must(InstMgrIfce.AddInstance(
+		context.TODO(),
+		"httpupgrade_server",
+		common.Must2(os.ReadFile("config/httpupgrade_earlydataShortEarlyData_server.json")).([]byte),
+		"jsonv5"))
+
+	common.Must(InstMgrIfce.StartInstance(context.TODO(), "httpupgrade_server"))
+	common.Must(InstMgrIfce.StartInstance(context.TODO(), "httpupgrade_client"))
+
+	defer func() {
+		common.Must(InstMgrIfce.StopInstance(context.TODO(), "httpupgrade_server"))
+		common.Must(InstMgrIfce.StopInstance(context.TODO(), "httpupgrade_client"))
+		common.Must(InstMgrIfce.UntrackInstance(context.TODO(), "httpupgrade_server"))
+		common.Must(InstMgrIfce.UntrackInstance(context.TODO(), "httpupgrade_client"))
+		coreInst.Close()
+	}()
+
+	if err := testTCPConnViaSocks(17794, dest.Port, 1024, time.Second*2)(); err != nil {
+		t.Error(err)
+	}
+}

+ 127 - 52
transport/internet/httpupgrade/config.pb.go

@@ -15,22 +15,74 @@ const (
 	_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
 )
 
-type Config struct {
-	state         protoimpl.MessageState
-	sizeCache     protoimpl.SizeCache
+type Header struct {
+	state         protoimpl.MessageState `protogen:"open.v1"`
+	Key           string                 `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
+	Value         string                 `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
 	unknownFields protoimpl.UnknownFields
+	sizeCache     protoimpl.SizeCache
+}
 
-	Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"`
-	Host string `protobuf:"bytes,2,opt,name=host,proto3" json:"host,omitempty"`
+func (x *Header) Reset() {
+	*x = Header{}
+	mi := &file_transport_internet_httpupgrade_config_proto_msgTypes[0]
+	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+	ms.StoreMessageInfo(mi)
 }
 
-func (x *Config) Reset() {
-	*x = Config{}
-	if protoimpl.UnsafeEnabled {
-		mi := &file_transport_internet_httpupgrade_config_proto_msgTypes[0]
+func (x *Header) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Header) ProtoMessage() {}
+
+func (x *Header) ProtoReflect() protoreflect.Message {
+	mi := &file_transport_internet_httpupgrade_config_proto_msgTypes[0]
+	if x != nil {
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
-		ms.StoreMessageInfo(mi)
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use Header.ProtoReflect.Descriptor instead.
+func (*Header) Descriptor() ([]byte, []int) {
+	return file_transport_internet_httpupgrade_config_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *Header) GetKey() string {
+	if x != nil {
+		return x.Key
 	}
+	return ""
+}
+
+func (x *Header) GetValue() string {
+	if x != nil {
+		return x.Value
+	}
+	return ""
+}
+
+type Config struct {
+	state               protoimpl.MessageState `protogen:"open.v1"`
+	Path                string                 `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"`
+	Host                string                 `protobuf:"bytes,2,opt,name=host,proto3" json:"host,omitempty"`
+	MaxEarlyData        int32                  `protobuf:"varint,3,opt,name=max_early_data,json=maxEarlyData,proto3" json:"max_early_data,omitempty"`
+	EarlyDataHeaderName string                 `protobuf:"bytes,4,opt,name=early_data_header_name,json=earlyDataHeaderName,proto3" json:"early_data_header_name,omitempty"`
+	Header              []*Header              `protobuf:"bytes,5,rep,name=header,proto3" json:"header,omitempty"`
+	unknownFields       protoimpl.UnknownFields
+	sizeCache           protoimpl.SizeCache
+}
+
+func (x *Config) Reset() {
+	*x = Config{}
+	mi := &file_transport_internet_httpupgrade_config_proto_msgTypes[1]
+	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+	ms.StoreMessageInfo(mi)
 }
 
 func (x *Config) String() string {
@@ -40,8 +92,8 @@ func (x *Config) String() string {
 func (*Config) ProtoMessage() {}
 
 func (x *Config) ProtoReflect() protoreflect.Message {
-	mi := &file_transport_internet_httpupgrade_config_proto_msgTypes[0]
-	if protoimpl.UnsafeEnabled && x != nil {
+	mi := &file_transport_internet_httpupgrade_config_proto_msgTypes[1]
+	if x != nil {
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 		if ms.LoadMessageInfo() == nil {
 			ms.StoreMessageInfo(mi)
@@ -53,7 +105,7 @@ func (x *Config) ProtoReflect() protoreflect.Message {
 
 // Deprecated: Use Config.ProtoReflect.Descriptor instead.
 func (*Config) Descriptor() ([]byte, []int) {
-	return file_transport_internet_httpupgrade_config_proto_rawDescGZIP(), []int{0}
+	return file_transport_internet_httpupgrade_config_proto_rawDescGZIP(), []int{1}
 }
 
 func (x *Config) GetPath() string {
@@ -70,6 +122,27 @@ func (x *Config) GetHost() string {
 	return ""
 }
 
+func (x *Config) GetMaxEarlyData() int32 {
+	if x != nil {
+		return x.MaxEarlyData
+	}
+	return 0
+}
+
+func (x *Config) GetEarlyDataHeaderName() string {
+	if x != nil {
+		return x.EarlyDataHeaderName
+	}
+	return ""
+}
+
+func (x *Config) GetHeader() []*Header {
+	if x != nil {
+		return x.Header
+	}
+	return nil
+}
+
 var File_transport_internet_httpupgrade_config_proto protoreflect.FileDescriptor
 
 var file_transport_internet_httpupgrade_config_proto_rawDesc = []byte{
@@ -81,22 +154,36 @@ var file_transport_internet_httpupgrade_config_proto_rawDesc = []byte{
 	0x75, 0x65, 0x73, 0x74, 0x2e, 0x68, 0x74, 0x74, 0x70, 0x75, 0x70, 0x67, 0x72, 0x61, 0x64, 0x65,
 	0x1a, 0x20, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x65, 0x78,
 	0x74, 0x2f, 0x65, 0x78, 0x74, 0x65, 0x6e, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72, 0x6f,
-	0x74, 0x6f, 0x22, 0x52, 0x0a, 0x06, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x12, 0x0a, 0x04,
-	0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x70, 0x61, 0x74, 0x68,
-	0x12, 0x12, 0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04,
-	0x68, 0x6f, 0x73, 0x74, 0x3a, 0x20, 0x82, 0xb5, 0x18, 0x1c, 0x0a, 0x09, 0x74, 0x72, 0x61, 0x6e,
-	0x73, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x0b, 0x68, 0x74, 0x74, 0x70, 0x75, 0x70, 0x67, 0x72, 0x61,
-	0x64, 0x65, 0x90, 0xff, 0x29, 0x01, 0x42, 0x9c, 0x01, 0x0a, 0x2d, 0x63, 0x6f, 0x6d, 0x2e, 0x76,
-	0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70,
-	0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x68, 0x74, 0x74,
-	0x70, 0x75, 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0x50, 0x01, 0x5a, 0x3d, 0x67, 0x69, 0x74, 0x68,
-	0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x76, 0x32, 0x66, 0x6c, 0x79, 0x2f, 0x76, 0x32, 0x72,
-	0x61, 0x79, 0x2d, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x76, 0x35, 0x2f, 0x74, 0x72, 0x61, 0x6e, 0x73,
-	0x70, 0x6f, 0x72, 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2f, 0x68, 0x74,
-	0x74, 0x70, 0x75, 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0xaa, 0x02, 0x29, 0x56, 0x32, 0x52, 0x61,
-	0x79, 0x2e, 0x43, 0x6f, 0x72, 0x65, 0x2e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74,
-	0x2e, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x48, 0x74, 0x74, 0x70, 0x55, 0x70,
-	0x67, 0x72, 0x61, 0x64, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+	0x74, 0x6f, 0x22, 0x30, 0x0a, 0x06, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x10, 0x0a, 0x03,
+	0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14,
+	0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76,
+	0x61, 0x6c, 0x75, 0x65, 0x22, 0x80, 0x02, 0x0a, 0x06, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12,
+	0x12, 0x0a, 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x70,
+	0x61, 0x74, 0x68, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28,
+	0x09, 0x52, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x12, 0x24, 0x0a, 0x0e, 0x6d, 0x61, 0x78, 0x5f, 0x65,
+	0x61, 0x72, 0x6c, 0x79, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52,
+	0x0c, 0x6d, 0x61, 0x78, 0x45, 0x61, 0x72, 0x6c, 0x79, 0x44, 0x61, 0x74, 0x61, 0x12, 0x33, 0x0a,
+	0x16, 0x65, 0x61, 0x72, 0x6c, 0x79, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x68, 0x65, 0x61, 0x64,
+	0x65, 0x72, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x13, 0x65,
+	0x61, 0x72, 0x6c, 0x79, 0x44, 0x61, 0x74, 0x61, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x4e, 0x61,
+	0x6d, 0x65, 0x12, 0x51, 0x0a, 0x06, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x05, 0x20, 0x03,
+	0x28, 0x0b, 0x32, 0x39, 0x2e, 0x76, 0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e,
+	0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e,
+	0x65, 0x74, 0x2e, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x68, 0x74, 0x74, 0x70, 0x75,
+	0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x06, 0x68,
+	0x65, 0x61, 0x64, 0x65, 0x72, 0x3a, 0x20, 0x82, 0xb5, 0x18, 0x1c, 0x0a, 0x09, 0x74, 0x72, 0x61,
+	0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x0b, 0x68, 0x74, 0x74, 0x70, 0x75, 0x70, 0x67, 0x72,
+	0x61, 0x64, 0x65, 0x90, 0xff, 0x29, 0x01, 0x42, 0x9c, 0x01, 0x0a, 0x2d, 0x63, 0x6f, 0x6d, 0x2e,
+	0x76, 0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73,
+	0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x68, 0x74,
+	0x74, 0x70, 0x75, 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0x50, 0x01, 0x5a, 0x3d, 0x67, 0x69, 0x74,
+	0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x76, 0x32, 0x66, 0x6c, 0x79, 0x2f, 0x76, 0x32,
+	0x72, 0x61, 0x79, 0x2d, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x76, 0x35, 0x2f, 0x74, 0x72, 0x61, 0x6e,
+	0x73, 0x70, 0x6f, 0x72, 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2f, 0x68,
+	0x74, 0x74, 0x70, 0x75, 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0xaa, 0x02, 0x29, 0x56, 0x32, 0x52,
+	0x61, 0x79, 0x2e, 0x43, 0x6f, 0x72, 0x65, 0x2e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72,
+	0x74, 0x2e, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x48, 0x74, 0x74, 0x70, 0x55,
+	0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
 }
 
 var (
@@ -111,16 +198,18 @@ func file_transport_internet_httpupgrade_config_proto_rawDescGZIP() []byte {
 	return file_transport_internet_httpupgrade_config_proto_rawDescData
 }
 
-var file_transport_internet_httpupgrade_config_proto_msgTypes = make([]protoimpl.MessageInfo, 1)
-var file_transport_internet_httpupgrade_config_proto_goTypes = []interface{}{
-	(*Config)(nil), // 0: v2ray.core.transport.internet.request.httpupgrade.Config
+var file_transport_internet_httpupgrade_config_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
+var file_transport_internet_httpupgrade_config_proto_goTypes = []any{
+	(*Header)(nil), // 0: v2ray.core.transport.internet.request.httpupgrade.Header
+	(*Config)(nil), // 1: v2ray.core.transport.internet.request.httpupgrade.Config
 }
 var file_transport_internet_httpupgrade_config_proto_depIdxs = []int32{
-	0, // [0:0] is the sub-list for method output_type
-	0, // [0:0] is the sub-list for method input_type
-	0, // [0:0] is the sub-list for extension type_name
-	0, // [0:0] is the sub-list for extension extendee
-	0, // [0:0] is the sub-list for field type_name
+	0, // 0: v2ray.core.transport.internet.request.httpupgrade.Config.header:type_name -> v2ray.core.transport.internet.request.httpupgrade.Header
+	1, // [1:1] is the sub-list for method output_type
+	1, // [1:1] is the sub-list for method input_type
+	1, // [1:1] is the sub-list for extension type_name
+	1, // [1:1] is the sub-list for extension extendee
+	0, // [0:1] is the sub-list for field type_name
 }
 
 func init() { file_transport_internet_httpupgrade_config_proto_init() }
@@ -128,27 +217,13 @@ func file_transport_internet_httpupgrade_config_proto_init() {
 	if File_transport_internet_httpupgrade_config_proto != nil {
 		return
 	}
-	if !protoimpl.UnsafeEnabled {
-		file_transport_internet_httpupgrade_config_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
-			switch v := v.(*Config); i {
-			case 0:
-				return &v.state
-			case 1:
-				return &v.sizeCache
-			case 2:
-				return &v.unknownFields
-			default:
-				return nil
-			}
-		}
-	}
 	type x struct{}
 	out := protoimpl.TypeBuilder{
 		File: protoimpl.DescBuilder{
 			GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
 			RawDescriptor: file_transport_internet_httpupgrade_config_proto_rawDesc,
 			NumEnums:      0,
-			NumMessages:   1,
+			NumMessages:   2,
 			NumExtensions: 0,
 			NumServices:   0,
 		},

+ 8 - 0
transport/internet/httpupgrade/config.proto

@@ -8,6 +8,11 @@ option java_multiple_files = true;
 
 import "common/protoext/extensions.proto";
 
+message Header {
+  string key = 1;
+  string value = 2;
+}
+
 message Config {
   option (v2ray.core.common.protoext.message_opt).type = "transport";
   option (v2ray.core.common.protoext.message_opt).short_name = "httpupgrade";
@@ -15,4 +20,7 @@ message Config {
 
   string path = 1;
   string host = 2;
+  int32 max_early_data = 3;
+  string early_data_header_name = 4;
+  repeated Header header = 5;
 }

+ 152 - 0
transport/internet/httpupgrade/connection.go

@@ -0,0 +1,152 @@
+package httpupgrade
+
+import (
+	"context"
+	"github.com/v2fly/v2ray-core/v5/common/buf"
+	"github.com/v2fly/v2ray-core/v5/common/net"
+	"github.com/v2fly/v2ray-core/v5/common/serial"
+	"io"
+	"time"
+)
+
+type connection struct {
+	conn       net.Conn
+	reader     io.Reader
+	remoteAddr net.Addr
+
+	shouldWait        bool
+	delayedDialFinish context.Context
+	finishedDial      context.CancelFunc
+	dialer            delayedDialer
+}
+
+type delayedDialer func(earlyData []byte) (conn net.Conn, earlyReply io.Reader, err error)
+
+func newConnectionWithPendingRead(conn net.Conn, remoteAddr net.Addr, earlyReplyReader io.Reader) *connection {
+	return &connection{
+		conn:       conn,
+		remoteAddr: remoteAddr,
+		reader:     earlyReplyReader,
+	}
+}
+
+func newConnectionWithDelayedDial(dialer delayedDialer) *connection {
+	ctx, cancel := context.WithCancel(context.Background())
+	return &connection{
+		shouldWait:        true,
+		delayedDialFinish: ctx,
+		finishedDial:      cancel,
+		dialer:            dialer,
+	}
+}
+
+// Read implements net.Conn.Read()
+func (c *connection) Read(b []byte) (int, error) {
+	if c.shouldWait {
+		<-c.delayedDialFinish.Done()
+		if c.conn == nil {
+			return 0, newError("unable to read delayed dial websocket connection as it do not exist")
+		}
+	}
+
+	if c.reader != nil {
+		n, err := c.reader.Read(b)
+		if err == io.EOF {
+			c.reader = nil
+			return c.conn.Read(b)
+		}
+		return n, err
+	}
+	return c.conn.Read(b)
+}
+
+// Write implements io.Writer.
+func (c *connection) Write(b []byte) (int, error) {
+	if c.shouldWait {
+		var err error
+		var earlyReply io.Reader
+		c.conn, earlyReply, err = c.dialer(b)
+		if earlyReply != nil {
+			c.reader = earlyReply
+		}
+		c.finishedDial()
+		if err != nil {
+			return 0, newError("Unable to proceed with delayed write").Base(err)
+		}
+		c.remoteAddr = c.conn.RemoteAddr()
+		c.shouldWait = false
+		return len(b), nil
+	}
+	return c.conn.Write(b)
+}
+
+func (c *connection) WriteMultiBuffer(mb buf.MultiBuffer) error {
+	mb = buf.Compact(mb)
+	mb, err := buf.WriteMultiBuffer(c, mb)
+	buf.ReleaseMulti(mb)
+	return err
+}
+
+func (c *connection) Close() error {
+	if c.shouldWait {
+		<-c.delayedDialFinish.Done()
+		if c.conn == nil {
+			return newError("unable to close delayed dial websocket connection as it do not exist")
+		}
+	}
+	var closeErrors []interface{}
+	if err := c.conn.Close(); err != nil {
+		closeErrors = append(closeErrors, err)
+	}
+	if len(closeErrors) > 0 {
+		return newError("failed to close connection").Base(newError(serial.Concat(closeErrors...)))
+	}
+	return nil
+}
+
+func (c *connection) LocalAddr() net.Addr {
+	if c.shouldWait {
+		<-c.delayedDialFinish.Done()
+		if c.conn == nil {
+			newError("websocket transport is not materialized when LocalAddr() is called").AtWarning().WriteToLog()
+			return &net.UnixAddr{
+				Name: "@placeholder",
+				Net:  "unix",
+			}
+		}
+	}
+	return c.conn.LocalAddr()
+}
+
+func (c *connection) RemoteAddr() net.Addr {
+	return c.remoteAddr
+}
+
+func (c *connection) SetDeadline(t time.Time) error {
+	if err := c.SetReadDeadline(t); err != nil {
+		return err
+	}
+	return c.SetWriteDeadline(t)
+}
+
+func (c *connection) SetReadDeadline(t time.Time) error {
+	if c.shouldWait {
+		<-c.delayedDialFinish.Done()
+		if c.conn == nil {
+			newError("httpupgrade transport is not materialized when SetReadDeadline() is called").AtWarning().WriteToLog()
+			return nil
+		}
+	}
+	return c.conn.SetReadDeadline(t)
+}
+
+func (c *connection) SetWriteDeadline(t time.Time) error {
+	if c.shouldWait {
+		<-c.delayedDialFinish.Done()
+		if c.conn == nil {
+			newError("httpupgrade transport is not materialized when SetWriteDeadline() is called").AtWarning().WriteToLog()
+			return nil
+		}
+	}
+	return c.conn.SetWriteDeadline(t)
+}

+ 68 - 24
transport/internet/httpupgrade/dialer.go

@@ -3,6 +3,8 @@ package httpupgrade
 import (
 	"bufio"
 	"context"
+	"encoding/base64"
+	"io"
 	"net/http"
 	"strings"
 
@@ -16,36 +18,78 @@ import (
 func dialhttpUpgrade(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (net.Conn, error) {
 	transportConfiguration := streamSettings.ProtocolSettings.(*Config)
 
-	conn, err := transportcommon.DialWithSecuritySettings(ctx, dest, streamSettings)
-	if err != nil {
-		return nil, newError("failed to dial request to ", dest).Base(err)
-	}
-	req, err := http.NewRequest("GET", transportConfiguration.GetNormalizedPath(), nil)
-	if err != nil {
-		return nil, err
-	}
+	dialer := func(earlyData []byte) (net.Conn, io.Reader, error) {
+		conn, err := transportcommon.DialWithSecuritySettings(ctx, dest, streamSettings)
+		if err != nil {
+			return nil, nil, newError("failed to dial request to ", dest).Base(err)
+		}
+		req, err := http.NewRequest("GET", transportConfiguration.GetNormalizedPath(), nil)
+		if err != nil {
+			return nil, nil, err
+		}
 
-	req.Header.Set("Connection", "upgrade")
-	req.Header.Set("Upgrade", "websocket")
-	req.Host = transportConfiguration.Host
+		req.Header.Set("Connection", "upgrade")
+		req.Header.Set("Upgrade", "websocket")
+		req.Host = transportConfiguration.Host
 
-	err = req.Write(conn)
-	if err != nil {
-		return nil, err
-	}
+		if transportConfiguration.Header != nil {
+			for _, value := range transportConfiguration.Header {
+				req.Header.Set(value.Key, value.Value)
+			}
+		}
 
-	// TODO The bufio usage here is unreliable
-	resp, err := http.ReadResponse(bufio.NewReader(conn), req) // nolint:bodyclose
-	if err != nil {
-		return nil, err
+		earlyDataSize := len(earlyData)
+		if earlyDataSize > int(transportConfiguration.MaxEarlyData) {
+			earlyDataSize = int(transportConfiguration.MaxEarlyData)
+		}
+
+		if earlyData != nil && len(earlyData) > 0 {
+			if transportConfiguration.EarlyDataHeaderName == "" {
+				return nil, nil, newError("EarlyDataHeaderName is not set")
+			}
+			req.Header.Set(transportConfiguration.EarlyDataHeaderName, base64.URLEncoding.EncodeToString(earlyData))
+		}
+
+		err = req.Write(conn)
+		if err != nil {
+			return nil, nil, err
+		}
+
+		if earlyData != nil && len(earlyData[earlyDataSize:]) > 0 {
+			_, err = conn.Write(earlyData[earlyDataSize:])
+			if err != nil {
+				return nil, nil, newError("failed to finish write early data").Base(err)
+			}
+		}
+
+		bufferedConn := bufio.NewReader(conn)
+		resp, err := http.ReadResponse(bufferedConn, req) // nolint:bodyclose
+		if err != nil {
+			return nil, nil, err
+		}
+
+		if resp.Status == "101 Switching Protocols" &&
+			strings.ToLower(resp.Header.Get("Upgrade")) == "websocket" &&
+			strings.ToLower(resp.Header.Get("Connection")) == "upgrade" {
+
+			earlyReplyReader := io.LimitReader(bufferedConn, int64(bufferedConn.Buffered()))
+			return conn, earlyReplyReader, nil
+		}
+
+		return nil, nil, newError("unrecognized reply")
 	}
 
-	if resp.Status == "101 Switching Protocols" &&
-		strings.ToLower(resp.Header.Get("Upgrade")) == "websocket" &&
-		strings.ToLower(resp.Header.Get("Connection")) == "upgrade" {
-		return conn, nil
+	if transportConfiguration.MaxEarlyData == 0 {
+		conn, earlyReplyReader, err := dialer(nil)
+		if err != nil {
+			return nil, err
+		}
+		remoteAddr := conn.RemoteAddr()
+
+		return newConnectionWithPendingRead(conn, remoteAddr, earlyReplyReader), nil
 	}
-	return nil, newError("unrecognized reply")
+
+	return newConnectionWithDelayedDial(dialer), nil
 }
 
 func dial(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (internet.Connection, error) {

+ 18 - 2
transport/internet/httpupgrade/hub.go

@@ -2,7 +2,9 @@ package httpupgrade
 
 import (
 	"bufio"
+	"bytes"
 	"context"
+	"encoding/base64"
 	"net/http"
 	"strings"
 
@@ -13,6 +15,8 @@ import (
 )
 
 type server struct {
+	config *Config
+
 	addConn        internet.ConnHandler
 	innnerListener net.Listener
 }
@@ -52,6 +56,19 @@ func (s *server) Handle(conn net.Conn) (internet.Connection, error) {
 		_ = conn.Close()
 		return nil, err
 	}
+	if s.config.MaxEarlyData != 0 {
+		if s.config.EarlyDataHeaderName == "" {
+			return nil, newError("EarlyDataHeaderName is not set")
+		}
+		earlyData := req.Header.Get(s.config.EarlyDataHeaderName)
+		if earlyData != "" {
+			earlyDataBytes, err := base64.URLEncoding.DecodeString(earlyData)
+			if err != nil {
+				return nil, err
+			}
+			return newConnectionWithPendingRead(conn, conn.RemoteAddr(), bytes.NewReader(earlyDataBytes)), nil
+		}
+	}
 	return internet.Connection(conn), nil
 }
 
@@ -72,8 +89,7 @@ func (s *server) keepAccepting() {
 
 func listenHTTPUpgrade(ctx context.Context, address net.Address, port net.Port, streamSettings *internet.MemoryStreamConfig, addConn internet.ConnHandler) (internet.Listener, error) {
 	transportConfiguration := streamSettings.ProtocolSettings.(*Config)
-	_ = transportConfiguration
-	serverInstance := &server{addConn: addConn}
+	serverInstance := &server{config: transportConfiguration, addConn: addConn}
 
 	listener, err := transportcommon.ListenWithSecuritySettings(ctx, address, port, streamSettings)
 	if err != nil {