Ver Fonte

massive refactoring for kcp

v2ray há 9 anos atrás
pai
commit
9b6dc6bcea
55 ficheiros alterados com 1041 adições e 1051 exclusões
  1. 3 3
      app/dns/nameserver.go
  2. 11 1
      app/dns/server_test.go
  3. 3 0
      common/net/network.go
  4. 12 4
      proxy/blackhole/blackhole.go
  5. 22 12
      proxy/dokodemo/dokodemo.go
  6. 28 4
      proxy/dokodemo/dokodemo_test.go
  7. 5 3
      proxy/dokodemo/sockopt_linux.go
  8. 2 2
      proxy/dokodemo/sockopt_other.go
  9. 16 9
      proxy/freedom/freedom.go
  10. 25 3
      proxy/freedom/freedom_test.go
  11. 21 19
      proxy/http/server.go
  12. 12 1
      proxy/http/server_test.go
  13. 10 2
      proxy/internal/creator.go
  14. 26 10
      proxy/internal/handler_cache.go
  15. 8 9
      proxy/proxy.go
  16. 26 18
      proxy/shadowsocks/server.go
  17. 25 17
      proxy/socks/server.go
  18. 0 266
      proxy/socks/server_test.go
  19. 3 3
      proxy/socks/server_udp.go
  20. 2 2
      proxy/testing/proxy.go
  21. 36 36
      proxy/vmess/inbound/inbound.go
  22. 23 21
      proxy/vmess/outbound/outbound.go
  23. 0 129
      proxy/vmess/vmess_test.go
  24. 4 0
      release/server/main.go
  25. 22 17
      shell/point/config.go
  26. 35 17
      shell/point/config_json.go
  27. 4 3
      shell/point/inbound_detour_always.go
  28. 5 4
      shell/point/inbound_detour_dynamic.go
  29. 12 7
      shell/point/point.go
  30. 10 17
      transport/config.go
  31. 7 21
      transport/config_json.go
  32. 0 102
      transport/hub/dialer.go
  33. 0 31
      transport/hub/kcp_test.go
  34. 0 3
      transport/hub/kcpv/config_json.go
  35. 0 21
      transport/hub/kcpv/crypto.go
  36. 0 116
      transport/hub/tcp.go
  37. 33 0
      transport/internet/connection.go
  38. 27 0
      transport/internet/connection_json.go
  39. 63 0
      transport/internet/dialer.go
  40. 4 17
      transport/internet/dialer_test.go
  41. 26 23
      transport/internet/kcp/config.go
  42. 27 0
      transport/internet/kcp/config_json.go
  43. 26 0
      transport/internet/kcp/dialer.go
  44. 33 47
      transport/internet/kcp/session.go
  45. 15 0
      transport/internet/tcp/config.go
  46. 20 0
      transport/internet/tcp/config_json.go
  47. 31 10
      transport/internet/tcp/connection.go
  48. 1 1
      transport/internet/tcp/connection_cache.go
  49. 49 0
      transport/internet/tcp/dialer.go
  50. 159 0
      transport/internet/tcp/hub.go
  51. 77 0
      transport/internet/tcp_hub.go
  52. 30 0
      transport/internet/udp/connection.go
  53. 1 1
      transport/internet/udp/udp.go
  54. 1 1
      transport/internet/udp/udp_server.go
  55. 0 18
      transport/transport.go

+ 3 - 3
app/dns/nameserver.go

@@ -10,7 +10,7 @@ import (
 	"github.com/v2ray/v2ray-core/common/alloc"
 	"github.com/v2ray/v2ray-core/common/log"
 	v2net "github.com/v2ray/v2ray-core/common/net"
-	"github.com/v2ray/v2ray-core/transport/hub"
+	"github.com/v2ray/v2ray-core/transport/internet/udp"
 
 	"github.com/miekg/dns"
 )
@@ -43,7 +43,7 @@ type UDPNameServer struct {
 	sync.Mutex
 	address     v2net.Destination
 	requests    map[uint16]*PendingRequest
-	udpServer   *hub.UDPServer
+	udpServer   *udp.UDPServer
 	nextCleanup time.Time
 }
 
@@ -51,7 +51,7 @@ func NewUDPNameServer(address v2net.Destination, dispatcher dispatcher.PacketDis
 	s := &UDPNameServer{
 		address:   address,
 		requests:  make(map[uint16]*PendingRequest),
-		udpServer: hub.NewUDPServer(dispatcher),
+		udpServer: udp.NewUDPServer(dispatcher),
 	}
 	return s
 }

+ 11 - 1
app/dns/server_test.go

@@ -13,6 +13,7 @@ import (
 	"github.com/v2ray/v2ray-core/proxy"
 	"github.com/v2ray/v2ray-core/proxy/freedom"
 	"github.com/v2ray/v2ray-core/testing/assert"
+	"github.com/v2ray/v2ray-core/transport/internet"
 )
 
 func TestDnsAdd(t *testing.T) {
@@ -21,7 +22,16 @@ func TestDnsAdd(t *testing.T) {
 	space := app.NewSpace()
 
 	outboundHandlerManager := proxyman.NewDefaultOutboundHandlerManager()
-	outboundHandlerManager.SetDefaultHandler(freedom.NewFreedomConnection(&freedom.Config{}, space, &proxy.OutboundHandlerMeta{Address: v2net.AnyIP}))
+	outboundHandlerManager.SetDefaultHandler(
+		freedom.NewFreedomConnection(
+			&freedom.Config{},
+			space,
+			&proxy.OutboundHandlerMeta{
+				Address: v2net.AnyIP,
+				StreamSettings: &internet.StreamSettings{
+					Type: internet.StreamConnectionTypeRawTCP,
+				},
+			}))
 	space.BindApp(proxyman.APP_ID_OUTBOUND_MANAGER, outboundHandlerManager)
 	space.BindApp(dispatcher.APP_ID, dispatchers.NewDefaultDispatcher(space))
 

+ 3 - 0
common/net/network.go

@@ -12,6 +12,9 @@ const (
 
 	// UDPNetwork represents the UDP network.
 	UDPNetwork = Network("udp")
+
+	// KCPNetwork represents the KCP network.
+	KCPNetwork = Network("kcp")
 )
 
 // Network represents a communication network on internet.

+ 12 - 4
proxy/blackhole/blackhole.go

@@ -6,6 +6,7 @@ import (
 	v2net "github.com/v2ray/v2ray-core/common/net"
 	"github.com/v2ray/v2ray-core/proxy"
 	"github.com/v2ray/v2ray-core/proxy/internal"
+	"github.com/v2ray/v2ray-core/transport/internet"
 	"github.com/v2ray/v2ray-core/transport/ray"
 )
 
@@ -33,9 +34,16 @@ func (this *BlackHole) Dispatch(destination v2net.Destination, payload *alloc.Bu
 	return nil
 }
 
+type Factory struct{}
+
+func (this *Factory) StreamCapability() internet.StreamConnectionType {
+	return internet.StreamConnectionTypeRawTCP
+}
+
+func (this *Factory) Create(space app.Space, config interface{}, meta *proxy.OutboundHandlerMeta) (proxy.OutboundHandler, error) {
+	return NewBlackHole(space, config.(*Config), meta), nil
+}
+
 func init() {
-	internal.MustRegisterOutboundHandlerCreator("blackhole",
-		func(space app.Space, config interface{}, meta *proxy.OutboundHandlerMeta) (proxy.OutboundHandler, error) {
-			return NewBlackHole(space, config.(*Config), meta), nil
-		})
+	internal.MustRegisterOutboundHandlerCreator("blackhole", new(Factory))
 }

+ 22 - 12
proxy/dokodemo/dokodemo.go

@@ -11,7 +11,8 @@ import (
 	v2net "github.com/v2ray/v2ray-core/common/net"
 	"github.com/v2ray/v2ray-core/proxy"
 	"github.com/v2ray/v2ray-core/proxy/internal"
-	"github.com/v2ray/v2ray-core/transport/hub"
+	"github.com/v2ray/v2ray-core/transport/internet"
+	"github.com/v2ray/v2ray-core/transport/internet/udp"
 )
 
 type DokodemoDoor struct {
@@ -22,9 +23,9 @@ type DokodemoDoor struct {
 	address          v2net.Address
 	port             v2net.Port
 	packetDispatcher dispatcher.PacketDispatcher
-	tcpListener      *hub.TCPHub
-	udpHub           *hub.UDPHub
-	udpServer        *hub.UDPServer
+	tcpListener      *internet.TCPHub
+	udpHub           *udp.UDPHub
+	udpServer        *udp.UDPServer
 	meta             *proxy.InboundHandlerMeta
 }
 
@@ -88,8 +89,8 @@ func (this *DokodemoDoor) Start() error {
 }
 
 func (this *DokodemoDoor) ListenUDP() error {
-	this.udpServer = hub.NewUDPServer(this.packetDispatcher)
-	udpHub, err := hub.ListenUDP(this.meta.Address, this.meta.Port, this.handleUDPPackets)
+	this.udpServer = udp.NewUDPServer(this.packetDispatcher)
+	udpHub, err := udp.ListenUDP(this.meta.Address, this.meta.Port, this.handleUDPPackets)
 	if err != nil {
 		log.Error("Dokodemo failed to listen on ", this.meta.Address, ":", this.meta.Port, ": ", err)
 		return err
@@ -115,7 +116,8 @@ func (this *DokodemoDoor) handleUDPResponse(dest v2net.Destination, payload *all
 }
 
 func (this *DokodemoDoor) ListenTCP() error {
-	tcpListener, err := hub.ListenTCP(this.meta.Address, this.meta.Port, this.HandleTCPConnection, nil)
+	log.Info("Dokodemo: Stream settings: ", this.meta.StreamSettings)
+	tcpListener, err := internet.ListenTCP(this.meta.Address, this.meta.Port, this.HandleTCPConnection, this.meta.StreamSettings)
 	if err != nil {
 		log.Error("Dokodemo: Failed to listen on ", this.meta.Address, ":", this.meta.Port, ": ", err)
 		return err
@@ -126,7 +128,7 @@ func (this *DokodemoDoor) ListenTCP() error {
 	return nil
 }
 
-func (this *DokodemoDoor) HandleTCPConnection(conn *hub.Connection) {
+func (this *DokodemoDoor) HandleTCPConnection(conn internet.Connection) {
 	defer conn.Close()
 
 	var dest v2net.Destination
@@ -145,6 +147,7 @@ func (this *DokodemoDoor) HandleTCPConnection(conn *hub.Connection) {
 		log.Info("Dokodemo: Unknown destination, stop forwarding...")
 		return
 	}
+	log.Info("Dokodemo: Handling request to ", dest)
 
 	ray := this.packetDispatcher.DispatchToOutbound(dest)
 	defer ray.InboundOutput().Release()
@@ -177,9 +180,16 @@ func (this *DokodemoDoor) HandleTCPConnection(conn *hub.Connection) {
 	inputFinish.Lock()
 }
 
+type Factory struct{}
+
+func (this *Factory) StreamCapability() internet.StreamConnectionType {
+	return internet.StreamConnectionTypeRawTCP
+}
+
+func (this *Factory) Create(space app.Space, rawConfig interface{}, meta *proxy.InboundHandlerMeta) (proxy.InboundHandler, error) {
+	return NewDokodemoDoor(rawConfig.(*Config), space, meta), nil
+}
+
 func init() {
-	internal.MustRegisterInboundHandlerCreator("dokodemo-door",
-		func(space app.Space, rawConfig interface{}, meta *proxy.InboundHandlerMeta) (proxy.InboundHandler, error) {
-			return NewDokodemoDoor(rawConfig.(*Config), space, meta), nil
-		})
+	internal.MustRegisterInboundHandlerCreator("dokodemo-door", new(Factory))
 }

+ 28 - 4
proxy/dokodemo/dokodemo_test.go

@@ -16,6 +16,7 @@ import (
 	"github.com/v2ray/v2ray-core/testing/assert"
 	"github.com/v2ray/v2ray-core/testing/servers/tcp"
 	"github.com/v2ray/v2ray-core/testing/servers/udp"
+	"github.com/v2ray/v2ray-core/transport/internet"
 )
 
 func TestDokodemoTCP(t *testing.T) {
@@ -40,7 +41,14 @@ func TestDokodemoTCP(t *testing.T) {
 	ohm := proxyman.NewDefaultOutboundHandlerManager()
 	ohm.SetDefaultHandler(
 		freedom.NewFreedomConnection(
-			&freedom.Config{}, space, &proxy.OutboundHandlerMeta{Address: v2net.LocalHostIP}))
+			&freedom.Config{},
+			space,
+			&proxy.OutboundHandlerMeta{
+				Address: v2net.LocalHostIP,
+				StreamSettings: &internet.StreamSettings{
+					Type: internet.StreamConnectionTypeRawTCP,
+				},
+			}))
 	space.BindApp(proxyman.APP_ID_OUTBOUND_MANAGER, ohm)
 
 	data2Send := "Data to be sent to remote."
@@ -51,7 +59,12 @@ func TestDokodemoTCP(t *testing.T) {
 		Port:    tcpServer.Port,
 		Network: v2net.TCPNetwork.AsList(),
 		Timeout: 600,
-	}, space, &proxy.InboundHandlerMeta{Address: v2net.LocalHostIP, Port: port})
+	}, space, &proxy.InboundHandlerMeta{
+		Address: v2net.LocalHostIP,
+		Port:    port,
+		StreamSettings: &internet.StreamSettings{
+			Type: internet.StreamConnectionTypeRawTCP,
+		}})
 	defer dokodemo.Close()
 
 	assert.Error(space.Initialize()).IsNil()
@@ -100,7 +113,13 @@ func TestDokodemoUDP(t *testing.T) {
 	ohm := proxyman.NewDefaultOutboundHandlerManager()
 	ohm.SetDefaultHandler(
 		freedom.NewFreedomConnection(
-			&freedom.Config{}, space, &proxy.OutboundHandlerMeta{Address: v2net.AnyIP}))
+			&freedom.Config{},
+			space,
+			&proxy.OutboundHandlerMeta{
+				Address: v2net.AnyIP,
+				StreamSettings: &internet.StreamSettings{
+					Type: internet.StreamConnectionTypeRawTCP,
+				}}))
 	space.BindApp(proxyman.APP_ID_OUTBOUND_MANAGER, ohm)
 
 	data2Send := "Data to be sent to remote."
@@ -111,7 +130,12 @@ func TestDokodemoUDP(t *testing.T) {
 		Port:    udpServer.Port,
 		Network: v2net.UDPNetwork.AsList(),
 		Timeout: 600,
-	}, space, &proxy.InboundHandlerMeta{Address: v2net.LocalHostIP, Port: port})
+	}, space, &proxy.InboundHandlerMeta{
+		Address: v2net.LocalHostIP,
+		Port:    port,
+		StreamSettings: &internet.StreamSettings{
+			Type: internet.StreamConnectionTypeRawTCP,
+		}})
 	defer dokodemo.Close()
 
 	assert.Error(space.Initialize()).IsNil()

+ 5 - 3
proxy/dokodemo/sockopt_linux.go

@@ -7,13 +7,15 @@ import (
 
 	"github.com/v2ray/v2ray-core/common/log"
 	v2net "github.com/v2ray/v2ray-core/common/net"
-	"github.com/v2ray/v2ray-core/transport/hub"
+	"github.com/v2ray/v2ray-core/transport/internet"
+	"github.com/v2ray/v2ray-core/transport/internet/tcp"
 )
 
 const SO_ORIGINAL_DST = 80
 
-func GetOriginalDestination(conn *hub.Connection) v2net.Destination {
-	fd, err := conn.SysFd()
+func GetOriginalDestination(conn internet.Connection) v2net.Destination {
+	tcpConn := conn.(*tcp.Connection)
+	fd, err := tcpConn.SysFd()
 	if err != nil {
 		log.Info("Dokodemo: Failed to get original destination: ", err)
 		return nil

+ 2 - 2
proxy/dokodemo/sockopt_other.go

@@ -4,9 +4,9 @@ package dokodemo
 
 import (
 	v2net "github.com/v2ray/v2ray-core/common/net"
-	"github.com/v2ray/v2ray-core/transport/hub"
+	"github.com/v2ray/v2ray-core/transport/internet"
 )
 
-func GetOriginalDestination(conn *hub.Connection) v2net.Destination {
+func GetOriginalDestination(conn internet.Connection) v2net.Destination {
 	return nil
 }

+ 16 - 9
proxy/freedom/freedom.go

@@ -2,7 +2,6 @@ package freedom
 
 import (
 	"io"
-	"net"
 	"sync"
 
 	"github.com/v2ray/v2ray-core/app"
@@ -15,7 +14,8 @@ import (
 	"github.com/v2ray/v2ray-core/common/retry"
 	"github.com/v2ray/v2ray-core/proxy"
 	"github.com/v2ray/v2ray-core/proxy/internal"
-	"github.com/v2ray/v2ray-core/transport/hub"
+	"github.com/v2ray/v2ray-core/transport/internet"
+	"github.com/v2ray/v2ray-core/transport/internet/tcp"
 	"github.com/v2ray/v2ray-core/transport/ray"
 )
 
@@ -75,12 +75,12 @@ func (this *FreedomConnection) Dispatch(destination v2net.Destination, payload *
 	defer ray.OutboundInput().Release()
 	defer ray.OutboundOutput().Close()
 
-	var conn net.Conn
+	var conn internet.Connection
 	if this.domainStrategy == DomainStrategyUseIP && destination.Address().IsDomain() {
 		destination = this.ResolveIP(destination)
 	}
 	err := retry.Timed(5, 100).On(func() error {
-		rawConn, err := hub.DialWithoutCache(this.meta.Address, destination)
+		rawConn, err := internet.Dial(this.meta.Address, destination, this.meta.StreamSettings)
 		if err != nil {
 			return err
 		}
@@ -130,7 +130,7 @@ func (this *FreedomConnection) Dispatch(destination v2net.Destination, payload *
 	}()
 
 	writeMutex.Lock()
-	if tcpConn, ok := conn.(*net.TCPConn); ok {
+	if tcpConn, ok := conn.(*tcp.RawConnection); ok {
 		tcpConn.CloseWrite()
 	}
 	readMutex.Lock()
@@ -138,9 +138,16 @@ func (this *FreedomConnection) Dispatch(destination v2net.Destination, payload *
 	return nil
 }
 
+type FreedomFactory struct{}
+
+func (this *FreedomFactory) StreamCapability() internet.StreamConnectionType {
+	return internet.StreamConnectionTypeRawTCP
+}
+
+func (this *FreedomFactory) Create(space app.Space, config interface{}, meta *proxy.OutboundHandlerMeta) (proxy.OutboundHandler, error) {
+	return NewFreedomConnection(config.(*Config), space, meta), nil
+}
+
 func init() {
-	internal.MustRegisterOutboundHandlerCreator("freedom",
-		func(space app.Space, config interface{}, meta *proxy.OutboundHandlerMeta) (proxy.OutboundHandler, error) {
-			return NewFreedomConnection(config.(*Config), space, meta), nil
-		})
+	internal.MustRegisterOutboundHandlerCreator("freedom", new(FreedomFactory))
 }

+ 25 - 3
proxy/freedom/freedom_test.go

@@ -18,6 +18,7 @@ import (
 	. "github.com/v2ray/v2ray-core/proxy/freedom"
 	"github.com/v2ray/v2ray-core/testing/assert"
 	"github.com/v2ray/v2ray-core/testing/servers/tcp"
+	"github.com/v2ray/v2ray-core/transport/internet"
 	"github.com/v2ray/v2ray-core/transport/ray"
 )
 
@@ -38,7 +39,15 @@ func TestSinglePacket(t *testing.T) {
 	assert.Error(err).IsNil()
 
 	space := app.NewSpace()
-	freedom := NewFreedomConnection(&Config{}, space, &proxy.OutboundHandlerMeta{Address: v2net.AnyIP})
+	freedom := NewFreedomConnection(
+		&Config{},
+		space,
+		&proxy.OutboundHandlerMeta{
+			Address: v2net.AnyIP,
+			StreamSettings: &internet.StreamSettings{
+				Type: internet.StreamConnectionTypeRawTCP,
+			},
+		})
 	space.Initialize()
 
 	traffic := ray.NewRay()
@@ -58,7 +67,15 @@ func TestSinglePacket(t *testing.T) {
 func TestUnreachableDestination(t *testing.T) {
 	assert := assert.On(t)
 
-	freedom := NewFreedomConnection(&Config{}, app.NewSpace(), &proxy.OutboundHandlerMeta{Address: v2net.AnyIP})
+	freedom := NewFreedomConnection(
+		&Config{},
+		app.NewSpace(),
+		&proxy.OutboundHandlerMeta{
+			Address: v2net.AnyIP,
+			StreamSettings: &internet.StreamSettings{
+				Type: internet.StreamConnectionTypeRawTCP,
+			},
+		})
 	traffic := ray.NewRay()
 	data2Send := "Data to be sent to remote"
 	payload := alloc.NewSmallBuffer().Clear().Append([]byte(data2Send))
@@ -85,7 +102,12 @@ func TestIPResolution(t *testing.T) {
 	freedom := NewFreedomConnection(
 		&Config{DomainStrategy: DomainStrategyUseIP},
 		space,
-		&proxy.OutboundHandlerMeta{Address: v2net.AnyIP})
+		&proxy.OutboundHandlerMeta{
+			Address: v2net.AnyIP,
+			StreamSettings: &internet.StreamSettings{
+				Type: internet.StreamConnectionTypeRawTCP,
+			},
+		})
 
 	space.Initialize()
 

+ 21 - 19
proxy/http/server.go

@@ -2,7 +2,6 @@ package http
 
 import (
 	"bufio"
-	"crypto/tls"
 	"io"
 	"net"
 	"net/http"
@@ -18,7 +17,7 @@ import (
 	v2net "github.com/v2ray/v2ray-core/common/net"
 	"github.com/v2ray/v2ray-core/proxy"
 	"github.com/v2ray/v2ray-core/proxy/internal"
-	"github.com/v2ray/v2ray-core/transport/hub"
+	"github.com/v2ray/v2ray-core/transport/internet"
 	"github.com/v2ray/v2ray-core/transport/ray"
 )
 
@@ -28,7 +27,7 @@ type Server struct {
 	accepting        bool
 	packetDispatcher dispatcher.PacketDispatcher
 	config           *Config
-	tcpListener      *hub.TCPHub
+	tcpListener      *internet.TCPHub
 	meta             *proxy.InboundHandlerMeta
 }
 
@@ -59,11 +58,7 @@ func (this *Server) Start() error {
 		return nil
 	}
 
-	var tlsConfig *tls.Config
-	if this.config.TLSConfig != nil {
-		tlsConfig = this.config.TLSConfig.GetConfig()
-	}
-	tcpListener, err := hub.ListenTCP(this.meta.Address, this.meta.Port, this.handleConnection, tlsConfig)
+	tcpListener, err := internet.ListenTCP(this.meta.Address, this.meta.Port, this.handleConnection, this.meta.StreamSettings)
 	if err != nil {
 		log.Error("HTTP: Failed listen on ", this.meta.Address, ":", this.meta.Port, ": ", err)
 		return err
@@ -98,7 +93,7 @@ func parseHost(rawHost string, defaultPort v2net.Port) (v2net.Destination, error
 	return v2net.TCPDestination(v2net.DomainAddress(host), port), nil
 }
 
-func (this *Server) handleConnection(conn *hub.Connection) {
+func (this *Server) handleConnection(conn internet.Connection) {
 	defer conn.Close()
 	reader := bufio.NewReader(conn)
 
@@ -269,15 +264,22 @@ func (this *Server) handlePlainHTTP(request *http.Request, dest v2net.Destinatio
 	finish.Wait()
 }
 
+type ServerFactory struct{}
+
+func (this *ServerFactory) StreamCapability() internet.StreamConnectionType {
+	return internet.StreamConnectionTypeRawTCP
+}
+
+func (this *ServerFactory) Create(space app.Space, rawConfig interface{}, meta *proxy.InboundHandlerMeta) (proxy.InboundHandler, error) {
+	if !space.HasApp(dispatcher.APP_ID) {
+		return nil, internal.ErrorBadConfiguration
+	}
+	return NewServer(
+		rawConfig.(*Config),
+		space.GetApp(dispatcher.APP_ID).(dispatcher.PacketDispatcher),
+		meta), nil
+}
+
 func init() {
-	internal.MustRegisterInboundHandlerCreator("http",
-		func(space app.Space, rawConfig interface{}, meta *proxy.InboundHandlerMeta) (proxy.InboundHandler, error) {
-			if !space.HasApp(dispatcher.APP_ID) {
-				return nil, internal.ErrorBadConfiguration
-			}
-			return NewServer(
-				rawConfig.(*Config),
-				space.GetApp(dispatcher.APP_ID).(dispatcher.PacketDispatcher),
-				meta), nil
-		})
+	internal.MustRegisterInboundHandlerCreator("http", new(ServerFactory))
 }

+ 12 - 1
proxy/http/server_test.go

@@ -12,6 +12,9 @@ import (
 	"github.com/v2ray/v2ray-core/proxy"
 	. "github.com/v2ray/v2ray-core/proxy/http"
 	"github.com/v2ray/v2ray-core/testing/assert"
+	"github.com/v2ray/v2ray-core/transport/internet"
+
+	_ "github.com/v2ray/v2ray-core/transport/internet/tcp"
 )
 
 func TestHopByHopHeadersStrip(t *testing.T) {
@@ -54,7 +57,15 @@ func TestNormalGetRequest(t *testing.T) {
 	testPacketDispatcher := testdispatcher.NewTestPacketDispatcher(nil)
 
 	port := v2nettesting.PickPort()
-	httpProxy := NewServer(&Config{}, testPacketDispatcher, &proxy.InboundHandlerMeta{Address: v2net.LocalHostIP, Port: port})
+	httpProxy := NewServer(
+		&Config{},
+		testPacketDispatcher,
+		&proxy.InboundHandlerMeta{
+			Address: v2net.LocalHostIP,
+			Port:    port,
+			StreamSettings: &internet.StreamSettings{
+				Type: internet.StreamConnectionTypeRawTCP,
+			}})
 	defer httpProxy.Close()
 
 	err := httpProxy.Start()

+ 10 - 2
proxy/internal/creator.go

@@ -3,7 +3,15 @@ package internal
 import (
 	"github.com/v2ray/v2ray-core/app"
 	"github.com/v2ray/v2ray-core/proxy"
+	"github.com/v2ray/v2ray-core/transport/internet"
 )
 
-type InboundHandlerCreator func(space app.Space, config interface{}, meta *proxy.InboundHandlerMeta) (proxy.InboundHandler, error)
-type OutboundHandlerCreator func(space app.Space, config interface{}, meta *proxy.OutboundHandlerMeta) (proxy.OutboundHandler, error)
+type InboundHandlerFactory interface {
+	StreamCapability() internet.StreamConnectionType
+	Create(space app.Space, config interface{}, meta *proxy.InboundHandlerMeta) (proxy.InboundHandler, error)
+}
+
+type OutboundHandlerFactory interface {
+	StreamCapability() internet.StreamConnectionType
+	Create(space app.Space, config interface{}, meta *proxy.OutboundHandlerMeta) (proxy.OutboundHandler, error)
+}

+ 26 - 10
proxy/internal/handler_cache.go

@@ -5,18 +5,19 @@ import (
 
 	"github.com/v2ray/v2ray-core/app"
 	"github.com/v2ray/v2ray-core/proxy"
+	"github.com/v2ray/v2ray-core/transport/internet"
 )
 
 var (
-	inboundFactories  = make(map[string]InboundHandlerCreator)
-	outboundFactories = make(map[string]OutboundHandlerCreator)
+	inboundFactories  = make(map[string]InboundHandlerFactory)
+	outboundFactories = make(map[string]OutboundHandlerFactory)
 
 	ErrorProxyNotFound    = errors.New("Proxy not found.")
 	ErrorNameExists       = errors.New("Proxy with the same name already exists.")
 	ErrorBadConfiguration = errors.New("Bad proxy configuration.")
 )
 
-func RegisterInboundHandlerCreator(name string, creator InboundHandlerCreator) error {
+func RegisterInboundHandlerCreator(name string, creator InboundHandlerFactory) error {
 	if _, found := inboundFactories[name]; found {
 		return ErrorNameExists
 	}
@@ -24,13 +25,13 @@ func RegisterInboundHandlerCreator(name string, creator InboundHandlerCreator) e
 	return nil
 }
 
-func MustRegisterInboundHandlerCreator(name string, creator InboundHandlerCreator) {
+func MustRegisterInboundHandlerCreator(name string, creator InboundHandlerFactory) {
 	if err := RegisterInboundHandlerCreator(name, creator); err != nil {
 		panic(err)
 	}
 }
 
-func RegisterOutboundHandlerCreator(name string, creator OutboundHandlerCreator) error {
+func RegisterOutboundHandlerCreator(name string, creator OutboundHandlerFactory) error {
 	if _, found := outboundFactories[name]; found {
 		return ErrorNameExists
 	}
@@ -38,7 +39,7 @@ func RegisterOutboundHandlerCreator(name string, creator OutboundHandlerCreator)
 	return nil
 }
 
-func MustRegisterOutboundHandlerCreator(name string, creator OutboundHandlerCreator) {
+func MustRegisterOutboundHandlerCreator(name string, creator OutboundHandlerFactory) {
 	if err := RegisterOutboundHandlerCreator(name, creator); err != nil {
 		panic(err)
 	}
@@ -49,14 +50,22 @@ func CreateInboundHandler(name string, space app.Space, rawConfig []byte, meta *
 	if !found {
 		return nil, ErrorProxyNotFound
 	}
+	if meta.StreamSettings == nil {
+		meta.StreamSettings = &internet.StreamSettings{
+			Type: creator.StreamCapability(),
+		}
+	} else {
+		meta.StreamSettings.Type &= creator.StreamCapability()
+	}
+
 	if len(rawConfig) > 0 {
 		proxyConfig, err := CreateInboundConfig(name, rawConfig)
 		if err != nil {
 			return nil, err
 		}
-		return creator(space, proxyConfig, meta)
+		return creator.Create(space, proxyConfig, meta)
 	}
-	return creator(space, nil, meta)
+	return creator.Create(space, nil, meta)
 }
 
 func CreateOutboundHandler(name string, space app.Space, rawConfig []byte, meta *proxy.OutboundHandlerMeta) (proxy.OutboundHandler, error) {
@@ -64,14 +73,21 @@ func CreateOutboundHandler(name string, space app.Space, rawConfig []byte, meta
 	if !found {
 		return nil, ErrorProxyNotFound
 	}
+	if meta.StreamSettings == nil {
+		meta.StreamSettings = &internet.StreamSettings{
+			Type: creator.StreamCapability(),
+		}
+	} else {
+		meta.StreamSettings.Type &= creator.StreamCapability()
+	}
 
 	if len(rawConfig) > 0 {
 		proxyConfig, err := CreateOutboundConfig(name, rawConfig)
 		if err != nil {
 			return nil, err
 		}
-		return creator(space, proxyConfig, meta)
+		return creator.Create(space, proxyConfig, meta)
 	}
 
-	return creator(space, nil, meta)
+	return creator.Create(space, nil, meta)
 }

+ 8 - 9
proxy/proxy.go

@@ -4,6 +4,7 @@ package proxy // import "github.com/v2ray/v2ray-core/proxy"
 import (
 	"github.com/v2ray/v2ray-core/common/alloc"
 	v2net "github.com/v2ray/v2ray-core/common/net"
+	"github.com/v2ray/v2ray-core/transport/internet"
 	"github.com/v2ray/v2ray-core/transport/ray"
 )
 
@@ -15,18 +16,16 @@ const (
 )
 
 type InboundHandlerMeta struct {
-	Tag     string
-	Address v2net.Address
-	Port    v2net.Port
-	//Whether this proxy support KCP connections
-	KcpSupported bool
+	Tag            string
+	Address        v2net.Address
+	Port           v2net.Port
+	StreamSettings *internet.StreamSettings
 }
 
 type OutboundHandlerMeta struct {
-	Tag     string
-	Address v2net.Address
-	//Whether this proxy support KCP connections
-	KcpSupported bool
+	Tag            string
+	Address        v2net.Address
+	StreamSettings *internet.StreamSettings
 }
 
 // An InboundHandler handles inbound network connections to V2Ray.

+ 26 - 18
proxy/shadowsocks/server.go

@@ -16,7 +16,8 @@ import (
 	"github.com/v2ray/v2ray-core/common/protocol"
 	"github.com/v2ray/v2ray-core/proxy"
 	"github.com/v2ray/v2ray-core/proxy/internal"
-	"github.com/v2ray/v2ray-core/transport/hub"
+	"github.com/v2ray/v2ray-core/transport/internet"
+	"github.com/v2ray/v2ray-core/transport/internet/udp"
 )
 
 type Server struct {
@@ -24,9 +25,9 @@ type Server struct {
 	config           *Config
 	meta             *proxy.InboundHandlerMeta
 	accepting        bool
-	tcpHub           *hub.TCPHub
-	udpHub           *hub.UDPHub
-	udpServer        *hub.UDPServer
+	tcpHub           *internet.TCPHub
+	udpHub           *udp.UDPHub
+	udpServer        *udp.UDPServer
 }
 
 func NewServer(config *Config, packetDispatcher dispatcher.PacketDispatcher, meta *proxy.InboundHandlerMeta) *Server {
@@ -61,7 +62,7 @@ func (this *Server) Start() error {
 		return nil
 	}
 
-	tcpHub, err := hub.ListenTCP(this.meta.Address, this.meta.Port, this.handleConnection, nil)
+	tcpHub, err := internet.ListenTCP(this.meta.Address, this.meta.Port, this.handleConnection, this.meta.StreamSettings)
 	if err != nil {
 		log.Error("Shadowsocks: Failed to listen TCP on ", this.meta.Address, ":", this.meta.Port, ": ", err)
 		return err
@@ -69,8 +70,8 @@ func (this *Server) Start() error {
 	this.tcpHub = tcpHub
 
 	if this.config.UDP {
-		this.udpServer = hub.NewUDPServer(this.packetDispatcher)
-		udpHub, err := hub.ListenUDP(this.meta.Address, this.meta.Port, this.handlerUDPPayload)
+		this.udpServer = udp.NewUDPServer(this.packetDispatcher)
+		udpHub, err := udp.ListenUDP(this.meta.Address, this.meta.Port, this.handlerUDPPayload)
 		if err != nil {
 			log.Error("Shadowsocks: Failed to listen UDP on ", this.meta.Address, ":", this.meta.Port, ": ", err)
 			return err
@@ -154,7 +155,7 @@ func (this *Server) handlerUDPPayload(payload *alloc.Buffer, source v2net.Destin
 	})
 }
 
-func (this *Server) handleConnection(conn *hub.Connection) {
+func (this *Server) handleConnection(conn internet.Connection) {
 	defer conn.Close()
 
 	buffer := alloc.NewSmallBuffer()
@@ -248,15 +249,22 @@ func (this *Server) handleConnection(conn *hub.Connection) {
 	writeFinish.Lock()
 }
 
+type ServerFactory struct{}
+
+func (this *ServerFactory) StreamCapability() internet.StreamConnectionType {
+	return internet.StreamConnectionTypeRawTCP
+}
+
+func (this *ServerFactory) Create(space app.Space, rawConfig interface{}, meta *proxy.InboundHandlerMeta) (proxy.InboundHandler, error) {
+	if !space.HasApp(dispatcher.APP_ID) {
+		return nil, internal.ErrorBadConfiguration
+	}
+	return NewServer(
+		rawConfig.(*Config),
+		space.GetApp(dispatcher.APP_ID).(dispatcher.PacketDispatcher),
+		meta), nil
+}
+
 func init() {
-	internal.MustRegisterInboundHandlerCreator("shadowsocks",
-		func(space app.Space, rawConfig interface{}, meta *proxy.InboundHandlerMeta) (proxy.InboundHandler, error) {
-			if !space.HasApp(dispatcher.APP_ID) {
-				return nil, internal.ErrorBadConfiguration
-			}
-			return NewServer(
-				rawConfig.(*Config),
-				space.GetApp(dispatcher.APP_ID).(dispatcher.PacketDispatcher),
-				meta), nil
-		})
+	internal.MustRegisterInboundHandlerCreator("shadowsocks", new(ServerFactory))
 }

+ 25 - 17
proxy/socks/server.go

@@ -14,7 +14,8 @@ import (
 	"github.com/v2ray/v2ray-core/proxy"
 	"github.com/v2ray/v2ray-core/proxy/internal"
 	"github.com/v2ray/v2ray-core/proxy/socks/protocol"
-	"github.com/v2ray/v2ray-core/transport/hub"
+	"github.com/v2ray/v2ray-core/transport/internet"
+	"github.com/v2ray/v2ray-core/transport/internet/udp"
 )
 
 var (
@@ -29,10 +30,10 @@ type Server struct {
 	accepting        bool
 	packetDispatcher dispatcher.PacketDispatcher
 	config           *Config
-	tcpListener      *hub.TCPHub
-	udpHub           *hub.UDPHub
+	tcpListener      *internet.TCPHub
+	udpHub           *udp.UDPHub
 	udpAddress       v2net.Destination
-	udpServer        *hub.UDPServer
+	udpServer        *udp.UDPServer
 	meta             *proxy.InboundHandlerMeta
 }
 
@@ -73,11 +74,11 @@ func (this *Server) Start() error {
 		return nil
 	}
 
-	listener, err := hub.ListenTCP(
+	listener, err := internet.ListenTCP(
 		this.meta.Address,
 		this.meta.Port,
 		this.handleConnection,
-		nil)
+		this.meta.StreamSettings)
 	if err != nil {
 		log.Error("Socks: failed to listen on ", this.meta.Address, ":", this.meta.Port, ": ", err)
 		return err
@@ -92,7 +93,7 @@ func (this *Server) Start() error {
 	return nil
 }
 
-func (this *Server) handleConnection(connection *hub.Connection) {
+func (this *Server) handleConnection(connection internet.Connection) {
 	defer connection.Close()
 
 	timedReader := v2net.NewTimeOutReader(120, connection)
@@ -302,15 +303,22 @@ func (this *Server) transport(reader io.Reader, writer io.Writer, destination v2
 	outputFinish.Lock()
 }
 
+type ServerFactory struct{}
+
+func (this *ServerFactory) StreamCapability() internet.StreamConnectionType {
+	return internet.StreamConnectionTypeRawTCP
+}
+
+func (this *ServerFactory) Create(space app.Space, rawConfig interface{}, meta *proxy.InboundHandlerMeta) (proxy.InboundHandler, error) {
+	if !space.HasApp(dispatcher.APP_ID) {
+		return nil, internal.ErrorBadConfiguration
+	}
+	return NewServer(
+		rawConfig.(*Config),
+		space.GetApp(dispatcher.APP_ID).(dispatcher.PacketDispatcher),
+		meta), nil
+}
+
 func init() {
-	internal.MustRegisterInboundHandlerCreator("socks",
-		func(space app.Space, rawConfig interface{}, meta *proxy.InboundHandlerMeta) (proxy.InboundHandler, error) {
-			if !space.HasApp(dispatcher.APP_ID) {
-				return nil, internal.ErrorBadConfiguration
-			}
-			return NewServer(
-				rawConfig.(*Config),
-				space.GetApp(dispatcher.APP_ID).(dispatcher.PacketDispatcher),
-				meta), nil
-		})
+	internal.MustRegisterInboundHandlerCreator("socks", new(ServerFactory))
 }

+ 0 - 266
proxy/socks/server_test.go

@@ -1,266 +0,0 @@
-package socks_test
-
-import (
-	"bytes"
-	"fmt"
-	"io/ioutil"
-	"net"
-	"testing"
-
-	"golang.org/x/net/proxy"
-
-	"github.com/v2ray/v2ray-core/app"
-	"github.com/v2ray/v2ray-core/app/dns"
-	v2net "github.com/v2ray/v2ray-core/common/net"
-	v2nettesting "github.com/v2ray/v2ray-core/common/net/testing"
-	v2proxy "github.com/v2ray/v2ray-core/proxy"
-	proxytesting "github.com/v2ray/v2ray-core/proxy/testing"
-	proxymocks "github.com/v2ray/v2ray-core/proxy/testing/mocks"
-	"github.com/v2ray/v2ray-core/shell/point"
-	"github.com/v2ray/v2ray-core/testing/assert"
-)
-
-func TestSocksTcpConnect(t *testing.T) {
-	assert := assert.On(t)
-	port := v2nettesting.PickPort()
-
-	connInput := []byte("The data to be returned to socks server.")
-	connOutput := bytes.NewBuffer(make([]byte, 0, 1024))
-	och := &proxymocks.OutboundConnectionHandler{
-		ConnOutput: connOutput,
-		ConnInput:  bytes.NewReader(connInput),
-	}
-
-	protocol, err := proxytesting.RegisterOutboundConnectionHandlerCreator("mock_och",
-		func(space app.Space, config interface{}, meta *v2proxy.OutboundHandlerMeta) (v2proxy.OutboundHandler, error) {
-			return och, nil
-		})
-	assert.Error(err).IsNil()
-
-	config := &point.Config{
-		Port: port,
-		InboundConfig: &point.InboundConnectionConfig{
-			Protocol: "socks",
-			ListenOn: v2net.LocalHostIP,
-			Settings: []byte(`
-      {
-        "auth": "noauth"
-      }`),
-		},
-		DNSConfig: &dns.Config{
-			NameServers: []v2net.Destination{
-				v2net.UDPDestination(v2net.DomainAddress("localhost"), v2net.Port(53)),
-			},
-		},
-		OutboundConfig: &point.OutboundConnectionConfig{
-			Protocol: protocol,
-			Settings: nil,
-		},
-	}
-
-	point, err := point.NewPoint(config)
-	assert.Error(err).IsNil()
-
-	err = point.Start()
-	assert.Error(err).IsNil()
-
-	socks5Client, err := proxy.SOCKS5("tcp", fmt.Sprintf("127.0.0.1:%d", port), nil, proxy.Direct)
-	assert.Error(err).IsNil()
-
-	targetServer := "google.com:80"
-	conn, err := socks5Client.Dial("tcp", targetServer)
-	assert.Error(err).IsNil()
-
-	data2Send := "The data to be sent to remote server."
-	conn.Write([]byte(data2Send))
-	if tcpConn, ok := conn.(*net.TCPConn); ok {
-		tcpConn.CloseWrite()
-	}
-
-	dataReturned, err := ioutil.ReadAll(conn)
-	assert.Error(err).IsNil()
-	conn.Close()
-
-	assert.Bytes([]byte(data2Send)).Equals(connOutput.Bytes())
-	assert.Bytes(dataReturned).Equals(connInput)
-	assert.String(targetServer).Equals(och.Destination.NetAddr())
-}
-
-func TestSocksTcpConnectWithUserPass(t *testing.T) {
-	assert := assert.On(t)
-	port := v2nettesting.PickPort()
-
-	connInput := []byte("The data to be returned to socks server.")
-	connOutput := bytes.NewBuffer(make([]byte, 0, 1024))
-	och := &proxymocks.OutboundConnectionHandler{
-		ConnInput:  bytes.NewReader(connInput),
-		ConnOutput: connOutput,
-	}
-
-	protocol, err := proxytesting.RegisterOutboundConnectionHandlerCreator("mock_och",
-		func(space app.Space, config interface{}, meta *v2proxy.OutboundHandlerMeta) (v2proxy.OutboundHandler, error) {
-			return och, nil
-		})
-	assert.Error(err).IsNil()
-
-	config := &point.Config{
-		Port: port,
-		InboundConfig: &point.InboundConnectionConfig{
-			Protocol: "socks",
-			ListenOn: v2net.LocalHostIP,
-			Settings: []byte(`
-      {
-        "auth": "password",
-        "accounts": [
-          {"user": "userx", "pass": "passy"}
-        ]
-      }`),
-		},
-		DNSConfig: &dns.Config{
-			NameServers: []v2net.Destination{
-				v2net.UDPDestination(v2net.DomainAddress("localhost"), v2net.Port(53)),
-			},
-		},
-		OutboundConfig: &point.OutboundConnectionConfig{
-			Protocol: protocol,
-			Settings: nil,
-		},
-	}
-
-	point, err := point.NewPoint(config)
-	assert.Error(err).IsNil()
-
-	err = point.Start()
-	assert.Error(err).IsNil()
-
-	socks5Client, err := proxy.SOCKS5("tcp", fmt.Sprintf("127.0.0.1:%d", port), &proxy.Auth{User: "userx", Password: "passy"}, proxy.Direct)
-	assert.Error(err).IsNil()
-
-	targetServer := "1.2.3.4:443"
-	conn, err := socks5Client.Dial("tcp", targetServer)
-	assert.Error(err).IsNil()
-
-	data2Send := "The data to be sent to remote server."
-	conn.Write([]byte(data2Send))
-	if tcpConn, ok := conn.(*net.TCPConn); ok {
-		tcpConn.CloseWrite()
-	}
-
-	dataReturned, err := ioutil.ReadAll(conn)
-	assert.Error(err).IsNil()
-	conn.Close()
-
-	assert.Bytes([]byte(data2Send)).Equals(connOutput.Bytes())
-	assert.Bytes(dataReturned).Equals(connInput)
-	assert.String(targetServer).Equals(och.Destination.NetAddr())
-}
-
-func TestSocksTcpConnectWithWrongUserPass(t *testing.T) {
-	assert := assert.On(t)
-	port := v2nettesting.PickPort()
-
-	connInput := []byte("The data to be returned to socks server.")
-	connOutput := bytes.NewBuffer(make([]byte, 0, 1024))
-	och := &proxymocks.OutboundConnectionHandler{
-		ConnInput:  bytes.NewReader(connInput),
-		ConnOutput: connOutput,
-	}
-
-	protocol, err := proxytesting.RegisterOutboundConnectionHandlerCreator("mock_och",
-		func(space app.Space, config interface{}, meta *v2proxy.OutboundHandlerMeta) (v2proxy.OutboundHandler, error) {
-			return och, nil
-		})
-	assert.Error(err).IsNil()
-
-	config := &point.Config{
-		Port: port,
-		InboundConfig: &point.InboundConnectionConfig{
-			Protocol: "socks",
-			ListenOn: v2net.LocalHostIP,
-			Settings: []byte(`
-      {
-        "auth": "password",
-        "accounts": [
-          {"user": "userx", "pass": "passy"}
-        ]
-      }`),
-		},
-		DNSConfig: &dns.Config{
-			NameServers: []v2net.Destination{
-				v2net.UDPDestination(v2net.DomainAddress("localhost"), v2net.Port(53)),
-			},
-		},
-		OutboundConfig: &point.OutboundConnectionConfig{
-			Protocol: protocol,
-			Settings: nil,
-		},
-	}
-
-	point, err := point.NewPoint(config)
-	assert.Error(err).IsNil()
-
-	err = point.Start()
-	assert.Error(err).IsNil()
-
-	socks5Client, err := proxy.SOCKS5("tcp", fmt.Sprintf("127.0.0.1:%d", port), &proxy.Auth{User: "userx", Password: "passz"}, proxy.Direct)
-	assert.Error(err).IsNil()
-
-	targetServer := "1.2.3.4:443"
-	_, err = socks5Client.Dial("tcp", targetServer)
-	assert.Error(err).IsNotNil()
-}
-
-func TestSocksTcpConnectWithWrongAuthMethod(t *testing.T) {
-	assert := assert.On(t)
-	port := v2nettesting.PickPort()
-
-	connInput := []byte("The data to be returned to socks server.")
-	connOutput := bytes.NewBuffer(make([]byte, 0, 1024))
-	och := &proxymocks.OutboundConnectionHandler{
-		ConnInput:  bytes.NewReader(connInput),
-		ConnOutput: connOutput,
-	}
-
-	protocol, err := proxytesting.RegisterOutboundConnectionHandlerCreator("mock_och",
-		func(space app.Space, config interface{}, meta *v2proxy.OutboundHandlerMeta) (v2proxy.OutboundHandler, error) {
-			return och, nil
-		})
-	assert.Error(err).IsNil()
-
-	config := &point.Config{
-		Port: port,
-		InboundConfig: &point.InboundConnectionConfig{
-			ListenOn: v2net.LocalHostIP,
-			Protocol: "socks",
-			Settings: []byte(`
-      {
-        "auth": "password",
-        "accounts": [
-          {"user": "userx", "pass": "passy"}
-        ]
-      }`),
-		},
-		DNSConfig: &dns.Config{
-			NameServers: []v2net.Destination{
-				v2net.UDPDestination(v2net.DomainAddress("localhost"), v2net.Port(53)),
-			},
-		},
-		OutboundConfig: &point.OutboundConnectionConfig{
-			Protocol: protocol,
-			Settings: nil,
-		},
-	}
-
-	point, err := point.NewPoint(config)
-	assert.Error(err).IsNil()
-
-	err = point.Start()
-	assert.Error(err).IsNil()
-
-	socks5Client, err := proxy.SOCKS5("tcp", fmt.Sprintf("127.0.0.1:%d", port), nil, proxy.Direct)
-	assert.Error(err).IsNil()
-
-	targetServer := "1.2.3.4:443"
-	_, err = socks5Client.Dial("tcp", targetServer)
-	assert.Error(err).IsNotNil()
-}

+ 3 - 3
proxy/socks/server_udp.go

@@ -5,12 +5,12 @@ import (
 	"github.com/v2ray/v2ray-core/common/log"
 	v2net "github.com/v2ray/v2ray-core/common/net"
 	"github.com/v2ray/v2ray-core/proxy/socks/protocol"
-	"github.com/v2ray/v2ray-core/transport/hub"
+	"github.com/v2ray/v2ray-core/transport/internet/udp"
 )
 
 func (this *Server) listenUDP() error {
-	this.udpServer = hub.NewUDPServer(this.packetDispatcher)
-	udpHub, err := hub.ListenUDP(this.meta.Address, this.meta.Port, this.handleUDPPayload)
+	this.udpServer = udp.NewUDPServer(this.packetDispatcher)
+	udpHub, err := udp.ListenUDP(this.meta.Address, this.meta.Port, this.handleUDPPayload)
 	if err != nil {
 		log.Error("Socks: Failed to listen on udp ", this.meta.Address, ":", this.meta.Port)
 		return err

+ 2 - 2
proxy/testing/proxy.go

@@ -13,7 +13,7 @@ func randomString() string {
 	return fmt.Sprintf("-%d", count)
 }
 
-func RegisterInboundConnectionHandlerCreator(prefix string, creator internal.InboundHandlerCreator) (string, error) {
+func RegisterInboundConnectionHandlerCreator(prefix string, creator internal.InboundHandlerFactory) (string, error) {
 	for {
 		name := prefix + randomString()
 		err := internal.RegisterInboundHandlerCreator(name, creator)
@@ -23,7 +23,7 @@ func RegisterInboundConnectionHandlerCreator(prefix string, creator internal.Inb
 	}
 }
 
-func RegisterOutboundConnectionHandlerCreator(prefix string, creator internal.OutboundHandlerCreator) (string, error) {
+func RegisterOutboundConnectionHandlerCreator(prefix string, creator internal.OutboundHandlerFactory) (string, error) {
 	for {
 		name := prefix + randomString()
 		err := internal.RegisterOutboundHandlerCreator(name, creator)

+ 36 - 36
proxy/vmess/inbound/inbound.go

@@ -17,8 +17,7 @@ import (
 	"github.com/v2ray/v2ray-core/proxy"
 	"github.com/v2ray/v2ray-core/proxy/internal"
 	vmessio "github.com/v2ray/v2ray-core/proxy/vmess/io"
-	"github.com/v2ray/v2ray-core/transport"
-	"github.com/v2ray/v2ray-core/transport/hub"
+	"github.com/v2ray/v2ray-core/transport/internet"
 )
 
 type userByEmail struct {
@@ -72,7 +71,7 @@ type VMessInboundHandler struct {
 	clients               protocol.UserValidator
 	usersByEmail          *userByEmail
 	accepting             bool
-	listener              *hub.TCPHub
+	listener              *internet.TCPHub
 	detours               *DetourConfig
 	meta                  *proxy.InboundHandlerMeta
 }
@@ -106,7 +105,7 @@ func (this *VMessInboundHandler) Start() error {
 		return nil
 	}
 
-	tcpListener, err := hub.ListenTCP6(this.meta.Address, this.meta.Port, this.HandleConnection, this.meta, nil)
+	tcpListener, err := internet.ListenTCP(this.meta.Address, this.meta.Port, this.HandleConnection, this.meta.StreamSettings)
 	if err != nil {
 		log.Error("Unable to listen tcp ", this.meta.Address, ":", this.meta.Port, ": ", err)
 		return err
@@ -118,7 +117,7 @@ func (this *VMessInboundHandler) Start() error {
 	return nil
 }
 
-func (this *VMessInboundHandler) HandleConnection(connection *hub.Connection) {
+func (this *VMessInboundHandler) HandleConnection(connection internet.Connection) {
 	defer connection.Close()
 
 	connReader := v2net.NewTimeOutReader(8, connection)
@@ -140,11 +139,9 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.Connection) {
 		return
 	}
 	log.Access(connection.RemoteAddr(), request.Destination(), log.AccessAccepted, "")
-	log.Debug("VMessIn: Received request for ", request.Destination())
+	log.Info("VMessIn: Received request for ", request.Destination())
 
-	if request.Option.Has(protocol.RequestOptionConnectionReuse) {
-		connection.SetReusable(true)
-	}
+	connection.SetReusable(request.Option.Has(protocol.RequestOptionConnectionReuse))
 
 	ray := this.packetDispatcher.DispatchToOutbound(request.Destination())
 	input := ray.InboundInput()
@@ -184,7 +181,7 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.Connection) {
 		Command: this.generateCommand(request),
 	}
 
-	if request.Option.Has(protocol.RequestOptionConnectionReuse) && transport.IsConnectionReusable() {
+	if connection.Reusable() {
 		response.Option.Set(protocol.ResponseOptionConnectionReuse)
 	}
 
@@ -220,36 +217,39 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.Connection) {
 
 	readFinish.Lock()
 }
-func (this *VMessInboundHandler) setProxyCap() {
-	this.meta.KcpSupported = true
+
+type Factory struct{}
+
+func (this *Factory) StreamCapability() internet.StreamConnectionType {
+	return internet.StreamConnectionTypeRawTCP | internet.StreamConnectionTypeTCP
 }
-func init() {
-	internal.MustRegisterInboundHandlerCreator("vmess",
-		func(space app.Space, rawConfig interface{}, meta *proxy.InboundHandlerMeta) (proxy.InboundHandler, error) {
-			if !space.HasApp(dispatcher.APP_ID) {
-				return nil, internal.ErrorBadConfiguration
-			}
-			config := rawConfig.(*Config)
 
-			allowedClients := protocol.NewTimedUserValidator(protocol.DefaultIDHash)
-			for _, user := range config.AllowedUsers {
-				allowedClients.Add(user)
-			}
+func (this *Factory) Create(space app.Space, rawConfig interface{}, meta *proxy.InboundHandlerMeta) (proxy.InboundHandler, error) {
+	if !space.HasApp(dispatcher.APP_ID) {
+		return nil, internal.ErrorBadConfiguration
+	}
+	config := rawConfig.(*Config)
 
-			handler := &VMessInboundHandler{
-				packetDispatcher: space.GetApp(dispatcher.APP_ID).(dispatcher.PacketDispatcher),
-				clients:          allowedClients,
-				detours:          config.DetourConfig,
-				usersByEmail:     NewUserByEmail(config.AllowedUsers, config.Defaults),
-				meta:             meta,
-			}
+	allowedClients := protocol.NewTimedUserValidator(protocol.DefaultIDHash)
+	for _, user := range config.AllowedUsers {
+		allowedClients.Add(user)
+	}
 
-			if space.HasApp(proxyman.APP_ID_INBOUND_MANAGER) {
-				handler.inboundHandlerManager = space.GetApp(proxyman.APP_ID_INBOUND_MANAGER).(proxyman.InboundHandlerManager)
-			}
+	handler := &VMessInboundHandler{
+		packetDispatcher: space.GetApp(dispatcher.APP_ID).(dispatcher.PacketDispatcher),
+		clients:          allowedClients,
+		detours:          config.DetourConfig,
+		usersByEmail:     NewUserByEmail(config.AllowedUsers, config.Defaults),
+		meta:             meta,
+	}
 
-			handler.setProxyCap()
+	if space.HasApp(proxyman.APP_ID_INBOUND_MANAGER) {
+		handler.inboundHandlerManager = space.GetApp(proxyman.APP_ID_INBOUND_MANAGER).(proxyman.InboundHandlerManager)
+	}
 
-			return handler, nil
-		})
+	return handler, nil
+}
+
+func init() {
+	internal.MustRegisterInboundHandlerCreator("vmess", new(Factory))
 }

+ 23 - 21
proxy/vmess/outbound/outbound.go

@@ -15,8 +15,7 @@ import (
 	"github.com/v2ray/v2ray-core/proxy"
 	"github.com/v2ray/v2ray-core/proxy/internal"
 	vmessio "github.com/v2ray/v2ray-core/proxy/vmess/io"
-	"github.com/v2ray/v2ray-core/transport"
-	"github.com/v2ray/v2ray-core/transport/hub"
+	"github.com/v2ray/v2ray-core/transport/internet"
 	"github.com/v2ray/v2ray-core/transport/ray"
 )
 
@@ -30,11 +29,11 @@ func (this *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *al
 	defer ray.OutboundOutput().Close()
 
 	var rec *Receiver
-	var conn *hub.Connection
+	var conn internet.Connection
 
 	err := retry.Timed(5, 100).On(func() error {
 		rec = this.receiverManager.PickReceiver()
-		rawConn, err := hub.Dial3(this.meta.Address, rec.Destination, this.meta)
+		rawConn, err := internet.Dial(this.meta.Address, rec.Destination, this.meta.StreamSettings)
 		if err != nil {
 			return err
 		}
@@ -63,9 +62,9 @@ func (this *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *al
 
 	defer conn.Close()
 
-	if transport.IsConnectionReusable() {
+	conn.SetReusable(true)
+	if conn.Reusable() { // Conn reuse may be disabled on transportation layer
 		request.Option.Set(protocol.RequestOptionConnectionReuse)
-		conn.SetReusable(true)
 	}
 
 	input := ray.OutboundInput()
@@ -85,7 +84,7 @@ func (this *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *al
 	return nil
 }
 
-func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn *hub.Connection, request *protocol.RequestHeader, payload *alloc.Buffer, input v2io.Reader, finish *sync.Mutex) {
+func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn internet.Connection, request *protocol.RequestHeader, payload *alloc.Buffer, input v2io.Reader, finish *sync.Mutex) {
 	defer finish.Unlock()
 
 	writer := v2io.NewBufferedWriter(conn)
@@ -117,7 +116,7 @@ func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn
 	return
 }
 
-func (this *VMessOutboundHandler) handleResponse(session *raw.ClientSession, conn *hub.Connection, request *protocol.RequestHeader, dest v2net.Destination, output v2io.Writer, finish *sync.Mutex) {
+func (this *VMessOutboundHandler) handleResponse(session *raw.ClientSession, conn internet.Connection, request *protocol.RequestHeader, dest v2net.Destination, output v2io.Writer, finish *sync.Mutex) {
 	defer finish.Unlock()
 
 	reader := v2io.NewBufferedReader(conn)
@@ -154,21 +153,24 @@ func (this *VMessOutboundHandler) handleResponse(session *raw.ClientSession, con
 
 	return
 }
-func (this *VMessOutboundHandler) setProxyCap() {
-	this.meta.KcpSupported = true
+
+type Factory struct{}
+
+func (this *Factory) StreamCapability() internet.StreamConnectionType {
+	return internet.StreamConnectionTypeRawTCP | internet.StreamConnectionTypeTCP
 }
-func init() {
-	internal.MustRegisterOutboundHandlerCreator("vmess",
-		func(space app.Space, rawConfig interface{}, meta *proxy.OutboundHandlerMeta) (proxy.OutboundHandler, error) {
-			vOutConfig := rawConfig.(*Config)
 
-			handler := &VMessOutboundHandler{
-				receiverManager: NewReceiverManager(vOutConfig.Receivers),
-				meta:            meta,
-			}
+func (this *Factory) Create(space app.Space, rawConfig interface{}, meta *proxy.OutboundHandlerMeta) (proxy.OutboundHandler, error) {
+	vOutConfig := rawConfig.(*Config)
 
-			handler.setProxyCap()
+	handler := &VMessOutboundHandler{
+		receiverManager: NewReceiverManager(vOutConfig.Receivers),
+		meta:            meta,
+	}
 
-			return handler, nil
-		})
+	return handler, nil
+}
+
+func init() {
+	internal.MustRegisterOutboundHandlerCreator("vmess", new(Factory))
 }

+ 0 - 129
proxy/vmess/vmess_test.go

@@ -1,129 +0,0 @@
-package vmess_test
-
-import (
-	"bytes"
-	"testing"
-
-	"github.com/v2ray/v2ray-core/app"
-	"github.com/v2ray/v2ray-core/app/dispatcher"
-	"github.com/v2ray/v2ray-core/app/dns"
-	v2net "github.com/v2ray/v2ray-core/common/net"
-	v2nettesting "github.com/v2ray/v2ray-core/common/net/testing"
-	"github.com/v2ray/v2ray-core/common/protocol"
-	"github.com/v2ray/v2ray-core/common/uuid"
-	"github.com/v2ray/v2ray-core/proxy"
-	proxytesting "github.com/v2ray/v2ray-core/proxy/testing"
-	proxymocks "github.com/v2ray/v2ray-core/proxy/testing/mocks"
-	_ "github.com/v2ray/v2ray-core/proxy/vmess/inbound"
-	_ "github.com/v2ray/v2ray-core/proxy/vmess/outbound"
-	"github.com/v2ray/v2ray-core/shell/point"
-	"github.com/v2ray/v2ray-core/testing/assert"
-)
-
-func TestVMessInAndOut(t *testing.T) {
-	assert := assert.On(t)
-
-	id, err := uuid.ParseString("ad937d9d-6e23-4a5a-ba23-bce5092a7c51")
-	assert.Error(err).IsNil()
-
-	testAccount := protocol.NewID(id)
-
-	portA := v2nettesting.PickPort()
-	portB := v2nettesting.PickPort()
-
-	ichConnInput := []byte("The data to be send to outbound server.")
-	ichConnOutput := bytes.NewBuffer(make([]byte, 0, 1024))
-	ich := &proxymocks.InboundConnectionHandler{
-		ConnInput:  bytes.NewReader(ichConnInput),
-		ConnOutput: ichConnOutput,
-	}
-
-	protocol, err := proxytesting.RegisterInboundConnectionHandlerCreator("mock_ich",
-		func(space app.Space, config interface{}, meta *proxy.InboundHandlerMeta) (proxy.InboundHandler, error) {
-			ich.ListeningAddress = meta.Address
-			ich.ListeningPort = meta.Port
-			ich.PacketDispatcher = space.GetApp(dispatcher.APP_ID).(dispatcher.PacketDispatcher)
-			return ich, nil
-		})
-	assert.Error(err).IsNil()
-
-	configA := &point.Config{
-		Port: portA,
-		DNSConfig: &dns.Config{
-			NameServers: []v2net.Destination{
-				v2net.UDPDestination(v2net.DomainAddress("localhost"), v2net.Port(53)),
-			},
-		},
-		InboundConfig: &point.InboundConnectionConfig{
-			Protocol: protocol,
-			ListenOn: v2net.LocalHostIP,
-			Settings: nil,
-		},
-		OutboundConfig: &point.OutboundConnectionConfig{
-			Protocol: "vmess",
-			Settings: []byte(`{
-        "vnext": [
-          {
-            "address": "127.0.0.1",
-            "port": ` + portB.String() + `,
-            "users": [
-              {"id": "` + testAccount.String() + `"}
-            ]
-          }
-        ]
-      }`),
-		},
-	}
-
-	pointA, err := point.NewPoint(configA)
-	assert.Error(err).IsNil()
-
-	err = pointA.Start()
-	assert.Error(err).IsNil()
-
-	ochConnInput := []byte("The data to be returned to inbound server.")
-	ochConnOutput := bytes.NewBuffer(make([]byte, 0, 1024))
-	och := &proxymocks.OutboundConnectionHandler{
-		ConnInput:  bytes.NewReader(ochConnInput),
-		ConnOutput: ochConnOutput,
-	}
-
-	protocol, err = proxytesting.RegisterOutboundConnectionHandlerCreator("mock_och",
-		func(space app.Space, config interface{}, meta *proxy.OutboundHandlerMeta) (proxy.OutboundHandler, error) {
-			return och, nil
-		})
-	assert.Error(err).IsNil()
-
-	configB := &point.Config{
-		Port: portB,
-		DNSConfig: &dns.Config{
-			NameServers: []v2net.Destination{
-				v2net.UDPDestination(v2net.DomainAddress("localhost"), v2net.Port(53)),
-			},
-		},
-		InboundConfig: &point.InboundConnectionConfig{
-			Protocol: "vmess",
-			ListenOn: v2net.LocalHostIP,
-			Settings: []byte(`{
-        "clients": [
-          {"id": "` + testAccount.String() + `"}
-        ]
-      }`),
-		},
-		OutboundConfig: &point.OutboundConnectionConfig{
-			Protocol: protocol,
-			Settings: nil,
-		},
-	}
-
-	pointB, err := point.NewPoint(configB)
-	assert.Error(err).IsNil()
-
-	err = pointB.Start()
-	assert.Error(err).IsNil()
-
-	dest := v2net.TCPDestination(v2net.IPAddress([]byte{1, 2, 3, 4}), 80)
-	ich.Communicate(dest)
-	assert.Bytes(ichConnInput).Equals(ochConnOutput.Bytes())
-	assert.Bytes(ichConnOutput.Bytes()).Equals(ochConnInput)
-}

+ 4 - 0
release/server/main.go

@@ -21,6 +21,10 @@ import (
 	_ "github.com/v2ray/v2ray-core/proxy/socks"
 	_ "github.com/v2ray/v2ray-core/proxy/vmess/inbound"
 	_ "github.com/v2ray/v2ray-core/proxy/vmess/outbound"
+
+	_ "github.com/v2ray/v2ray-core/transport/internet/kcp"
+	_ "github.com/v2ray/v2ray-core/transport/internet/tcp"
+	_ "github.com/v2ray/v2ray-core/transport/internet/udp"
 )
 
 var (

+ 22 - 17
shell/point/config.go

@@ -6,19 +6,22 @@ import (
 	"github.com/v2ray/v2ray-core/common/log"
 	v2net "github.com/v2ray/v2ray-core/common/net"
 	"github.com/v2ray/v2ray-core/transport"
+	"github.com/v2ray/v2ray-core/transport/internet"
 )
 
 type InboundConnectionConfig struct {
-	Port     v2net.Port
-	ListenOn v2net.Address
-	Protocol string
-	Settings []byte
+	Port           v2net.Port
+	ListenOn       v2net.Address
+	StreamSettings *internet.StreamSettings
+	Protocol       string
+	Settings       []byte
 }
 
 type OutboundConnectionConfig struct {
-	Protocol    string
-	SendThrough v2net.Address
-	Settings    []byte
+	Protocol       string
+	SendThrough    v2net.Address
+	StreamSettings *internet.StreamSettings
+	Settings       []byte
 }
 
 type LogConfig struct {
@@ -40,19 +43,21 @@ type InboundDetourAllocationConfig struct {
 }
 
 type InboundDetourConfig struct {
-	Protocol   string
-	PortRange  v2net.PortRange
-	ListenOn   v2net.Address
-	Tag        string
-	Allocation *InboundDetourAllocationConfig
-	Settings   []byte
+	Protocol       string
+	PortRange      v2net.PortRange
+	ListenOn       v2net.Address
+	Tag            string
+	Allocation     *InboundDetourAllocationConfig
+	StreamSettings *internet.StreamSettings
+	Settings       []byte
 }
 
 type OutboundDetourConfig struct {
-	Protocol    string
-	SendThrough v2net.Address
-	Tag         string
-	Settings    []byte
+	Protocol       string
+	SendThrough    v2net.Address
+	StreamSettings *internet.StreamSettings
+	Tag            string
+	Settings       []byte
 }
 
 type Config struct {

+ 35 - 17
shell/point/config_json.go

@@ -14,6 +14,7 @@ import (
 	"github.com/v2ray/v2ray-core/common/log"
 	v2net "github.com/v2ray/v2ray-core/common/net"
 	"github.com/v2ray/v2ray-core/transport"
+	"github.com/v2ray/v2ray-core/transport/internet"
 )
 
 const (
@@ -57,10 +58,11 @@ func (this *Config) UnmarshalJSON(data []byte) error {
 
 func (this *InboundConnectionConfig) UnmarshalJSON(data []byte) error {
 	type JsonConfig struct {
-		Port     uint16             `json:"port"`
-		Listen   *v2net.AddressJson `json:"listen"`
-		Protocol string             `json:"protocol"`
-		Settings json.RawMessage    `json:"settings"`
+		Port          uint16                   `json:"port"`
+		Listen        *v2net.AddressJson       `json:"listen"`
+		Protocol      string                   `json:"protocol"`
+		StreamSetting *internet.StreamSettings `json:"streamSettings"`
+		Settings      json.RawMessage          `json:"settings"`
 	}
 
 	jsonConfig := new(JsonConfig)
@@ -75,6 +77,9 @@ func (this *InboundConnectionConfig) UnmarshalJSON(data []byte) error {
 		}
 		this.ListenOn = jsonConfig.Listen.Address
 	}
+	if jsonConfig.StreamSetting != nil {
+		this.StreamSettings = jsonConfig.StreamSetting
+	}
 
 	this.Protocol = jsonConfig.Protocol
 	this.Settings = jsonConfig.Settings
@@ -83,9 +88,10 @@ func (this *InboundConnectionConfig) UnmarshalJSON(data []byte) error {
 
 func (this *OutboundConnectionConfig) UnmarshalJSON(data []byte) error {
 	type JsonConnectionConfig struct {
-		Protocol    string             `json:"protocol"`
-		SendThrough *v2net.AddressJson `json:"sendThrough"`
-		Settings    json.RawMessage    `json:"settings"`
+		Protocol      string                   `json:"protocol"`
+		SendThrough   *v2net.AddressJson       `json:"sendThrough"`
+		StreamSetting *internet.StreamSettings `json:"streamSettings"`
+		Settings      json.RawMessage          `json:"settings"`
 	}
 	jsonConfig := new(JsonConnectionConfig)
 	if err := json.Unmarshal(data, jsonConfig); err != nil {
@@ -101,6 +107,9 @@ func (this *OutboundConnectionConfig) UnmarshalJSON(data []byte) error {
 		}
 		this.SendThrough = address
 	}
+	if jsonConfig.StreamSetting != nil {
+		this.StreamSettings = jsonConfig.StreamSetting
+	}
 	return nil
 }
 
@@ -162,12 +171,13 @@ func (this *InboundDetourAllocationConfig) UnmarshalJSON(data []byte) error {
 
 func (this *InboundDetourConfig) UnmarshalJSON(data []byte) error {
 	type JsonInboundDetourConfig struct {
-		Protocol   string                         `json:"protocol"`
-		PortRange  *v2net.PortRange               `json:"port"`
-		ListenOn   *v2net.AddressJson             `json:"listen"`
-		Settings   json.RawMessage                `json:"settings"`
-		Tag        string                         `json:"tag"`
-		Allocation *InboundDetourAllocationConfig `json:"allocate"`
+		Protocol      string                         `json:"protocol"`
+		PortRange     *v2net.PortRange               `json:"port"`
+		ListenOn      *v2net.AddressJson             `json:"listen"`
+		Settings      json.RawMessage                `json:"settings"`
+		Tag           string                         `json:"tag"`
+		Allocation    *InboundDetourAllocationConfig `json:"allocate"`
+		StreamSetting *internet.StreamSettings       `json:"streamSettings"`
 	}
 	jsonConfig := new(JsonInboundDetourConfig)
 	if err := json.Unmarshal(data, jsonConfig); err != nil {
@@ -195,15 +205,19 @@ func (this *InboundDetourConfig) UnmarshalJSON(data []byte) error {
 			Refresh:  DefaultRefreshMinute,
 		}
 	}
+	if jsonConfig.StreamSetting != nil {
+		this.StreamSettings = jsonConfig.StreamSetting
+	}
 	return nil
 }
 
 func (this *OutboundDetourConfig) UnmarshalJSON(data []byte) error {
 	type JsonOutboundDetourConfig struct {
-		Protocol    string             `json:"protocol"`
-		SendThrough *v2net.AddressJson `json:"sendThrough"`
-		Tag         string             `json:"tag"`
-		Settings    json.RawMessage    `json:"settings"`
+		Protocol      string                   `json:"protocol"`
+		SendThrough   *v2net.AddressJson       `json:"sendThrough"`
+		Tag           string                   `json:"tag"`
+		Settings      json.RawMessage          `json:"settings"`
+		StreamSetting *internet.StreamSettings `json:"streamSettings"`
 	}
 	jsonConfig := new(JsonOutboundDetourConfig)
 	if err := json.Unmarshal(data, jsonConfig); err != nil {
@@ -220,6 +234,10 @@ func (this *OutboundDetourConfig) UnmarshalJSON(data []byte) error {
 		}
 		this.SendThrough = address
 	}
+
+	if jsonConfig.StreamSetting != nil {
+		this.StreamSettings = jsonConfig.StreamSetting
+	}
 	return nil
 }
 

+ 4 - 3
shell/point/inbound_detour_always.go

@@ -26,9 +26,10 @@ func NewInboundDetourHandlerAlways(space app.Space, config *InboundDetourConfig)
 	for i := ports.From; i <= ports.To; i++ {
 		ichConfig := config.Settings
 		ich, err := proxyrepo.CreateInboundHandler(config.Protocol, space, ichConfig, &proxy.InboundHandlerMeta{
-			Address: config.ListenOn,
-			Port:    i,
-			Tag:     config.Tag})
+			Address:        config.ListenOn,
+			Port:           i,
+			Tag:            config.Tag,
+			StreamSettings: config.StreamSettings})
 		if err != nil {
 			log.Error("Failed to create inbound connection handler: ", err)
 			return nil, err

+ 5 - 4
shell/point/inbound_detour_dynamic.go

@@ -32,9 +32,10 @@ func NewInboundDetourHandlerDynamic(space app.Space, config *InboundDetourConfig
 
 	// To test configuration
 	ich, err := proxyrepo.CreateInboundHandler(config.Protocol, space, config.Settings, &proxy.InboundHandlerMeta{
-		Address: config.ListenOn,
-		Port:    0,
-		Tag:     config.Tag})
+		Address:        config.ListenOn,
+		Port:           0,
+		Tag:            config.Tag,
+		StreamSettings: config.StreamSettings})
 	if err != nil {
 		log.Error("Point: Failed to create inbound connection handler: ", err)
 		return nil, err
@@ -99,7 +100,7 @@ func (this *InboundDetourHandlerDynamic) refresh() error {
 	for idx, _ := range newIchs {
 		port := this.pickUnusedPort()
 		ich, err := proxyrepo.CreateInboundHandler(config.Protocol, this.space, config.Settings, &proxy.InboundHandlerMeta{
-			Address: config.ListenOn, Port: port, Tag: config.Tag})
+			Address: config.ListenOn, Port: port, Tag: config.Tag, StreamSettings: config.StreamSettings})
 		if err != nil {
 			log.Error("Point: Failed to create inbound connection handler: ", err)
 			return err

+ 12 - 7
shell/point/point.go

@@ -93,9 +93,10 @@ func NewPoint(pConfig *Config) (*Point, error) {
 	ichConfig := pConfig.InboundConfig.Settings
 	ich, err := proxyrepo.CreateInboundHandler(
 		pConfig.InboundConfig.Protocol, vpoint.space, ichConfig, &proxy.InboundHandlerMeta{
-			Tag:     "system.inbound",
-			Address: pConfig.InboundConfig.ListenOn,
-			Port:    vpoint.port})
+			Tag:            "system.inbound",
+			Address:        pConfig.InboundConfig.ListenOn,
+			Port:           vpoint.port,
+			StreamSettings: pConfig.InboundConfig.StreamSettings})
 	if err != nil {
 		log.Error("Failed to create inbound connection handler: ", err)
 		return nil, err
@@ -105,8 +106,10 @@ func NewPoint(pConfig *Config) (*Point, error) {
 	ochConfig := pConfig.OutboundConfig.Settings
 	och, err := proxyrepo.CreateOutboundHandler(
 		pConfig.OutboundConfig.Protocol, vpoint.space, ochConfig, &proxy.OutboundHandlerMeta{
-			Tag:     "system.outbound",
-			Address: pConfig.OutboundConfig.SendThrough})
+			Tag:            "system.outbound",
+			Address:        pConfig.OutboundConfig.SendThrough,
+			StreamSettings: pConfig.OutboundConfig.StreamSettings,
+		})
 	if err != nil {
 		log.Error("Failed to create outbound connection handler: ", err)
 		return nil, err
@@ -153,8 +156,10 @@ func NewPoint(pConfig *Config) (*Point, error) {
 		for _, detourConfig := range outboundDetours {
 			detourHandler, err := proxyrepo.CreateOutboundHandler(
 				detourConfig.Protocol, vpoint.space, detourConfig.Settings, &proxy.OutboundHandlerMeta{
-					Tag:     detourConfig.Tag,
-					Address: detourConfig.SendThrough})
+					Tag:            detourConfig.Tag,
+					Address:        detourConfig.SendThrough,
+					StreamSettings: detourConfig.StreamSettings,
+				})
 			if err != nil {
 				log.Error("Point: Failed to create detour outbound connection handler: ", err)
 				return nil, err

+ 10 - 17
transport/config.go

@@ -1,30 +1,23 @@
 package transport
 
-import "github.com/v2ray/v2ray-core/transport/hub/kcpv"
+import (
+	"github.com/v2ray/v2ray-core/transport/internet/kcp"
+	"github.com/v2ray/v2ray-core/transport/internet/tcp"
+)
 
 // Config for V2Ray transport layer.
 type Config struct {
-	ConnectionReuse bool
-	enableKcp       bool
-	kcpConfig       *kcpv.Config
+	tcpConfig *tcp.Config
+	kcpConfig *kcp.Config
 }
 
 // Apply applies this Config.
 func (this *Config) Apply() error {
-	if this.ConnectionReuse {
-		connectionReuse = true
+	if this.tcpConfig != nil {
+		this.tcpConfig.Apply()
 	}
-	enableKcp = this.enableKcp
-	if enableKcp {
-		KcpConfig = this.kcpConfig
-		/*
-			KCP do not support connectionReuse,
-			it is mandatory to set connectionReuse to false
-			Since KCP have no handshake and
-			does not SlowStart, there isn't benefit to
-			use that anyway.
-		*/
-		connectionReuse = false
+	if this.kcpConfig != nil {
+		this.kcpConfig.Apply()
 	}
 	return nil
 }

+ 7 - 21
transport/config_json.go

@@ -5,35 +5,21 @@ package transport
 import (
 	"encoding/json"
 
-	"github.com/v2ray/v2ray-core/common/log"
-	"github.com/v2ray/v2ray-core/transport/hub/kcpv"
+	"github.com/v2ray/v2ray-core/transport/internet/kcp"
+	"github.com/v2ray/v2ray-core/transport/internet/tcp"
 )
 
 func (this *Config) UnmarshalJSON(data []byte) error {
 	type JsonConfig struct {
-		ConnectionReuse bool         `json:"connectionReuse"`
-		EnableKcp       bool         `json:"EnableKCP,omitempty"`
-		KcpConfig       *kcpv.Config `json:"KcpConfig,omitempty"`
-	}
-	jsonConfig := &JsonConfig{
-		ConnectionReuse: true,
-		EnableKcp:       false,
+		TCPConfig *tcp.Config `json:"tcpSettings"`
+		KCPCOnfig *kcp.Config `json:"kcpSettings"`
 	}
+	jsonConfig := new(JsonConfig)
 	if err := json.Unmarshal(data, jsonConfig); err != nil {
 		return err
 	}
-	this.ConnectionReuse = jsonConfig.ConnectionReuse
-	this.enableKcp = jsonConfig.EnableKcp
-	if jsonConfig.KcpConfig != nil {
-		this.kcpConfig = jsonConfig.KcpConfig
-		if jsonConfig.KcpConfig.AdvancedConfigs == nil {
-			jsonConfig.KcpConfig.AdvancedConfigs = kcpv.DefaultAdvancedConfigs
-		}
-	} else {
-		if jsonConfig.EnableKcp {
-			log.Error("transport: You have enabled KCP but no configure is given")
-		}
-	}
+	this.tcpConfig = jsonConfig.TCPConfig
+	this.kcpConfig = jsonConfig.KCPCOnfig
 
 	return nil
 }

+ 0 - 102
transport/hub/dialer.go

@@ -1,102 +0,0 @@
-package hub
-
-import (
-	"errors"
-	"net"
-	"time"
-
-	v2net "github.com/v2ray/v2ray-core/common/net"
-	"github.com/v2ray/v2ray-core/proxy"
-	"github.com/v2ray/v2ray-core/transport"
-)
-
-var (
-	ErrorInvalidHost = errors.New("Invalid Host.")
-
-	globalCache = NewConnectionCache()
-)
-
-func Dial(src v2net.Address, dest v2net.Destination) (*Connection, error) {
-	if src == nil {
-		src = v2net.AnyIP
-	}
-	id := src.String() + "-" + dest.NetAddr()
-	var conn net.Conn
-	if dest.IsTCP() && transport.IsConnectionReusable() {
-		conn = globalCache.Get(id)
-	}
-	if conn == nil {
-		var err error
-		conn, err = DialWithoutCache(src, dest)
-		if err != nil {
-			return nil, err
-		}
-	}
-	return &Connection{
-		dest:     id,
-		conn:     conn,
-		listener: globalCache,
-	}, nil
-}
-
-func DialWithoutCache(src v2net.Address, dest v2net.Destination) (net.Conn, error) {
-	dialer := &net.Dialer{
-		Timeout:   time.Second * 60,
-		DualStack: true,
-	}
-
-	if src != nil && src != v2net.AnyIP {
-		var addr net.Addr
-		if dest.IsTCP() {
-			addr = &net.TCPAddr{
-				IP:   src.IP(),
-				Port: 0,
-			}
-		} else {
-			addr = &net.UDPAddr{
-				IP:   src.IP(),
-				Port: 0,
-			}
-		}
-		dialer.LocalAddr = addr
-	}
-
-	return dialer.Dial(dest.Network().String(), dest.NetAddr())
-}
-
-func Dial3(src v2net.Address, dest v2net.Destination, proxyMeta *proxy.OutboundHandlerMeta) (*Connection, error) {
-	if proxyMeta.KcpSupported && transport.IsKcpEnabled() {
-		return DialKCP3(src, dest, proxyMeta)
-	}
-	return Dial(src, dest)
-}
-func DialWithoutCache3(src v2net.Address, dest v2net.Destination, proxyMeta *proxy.OutboundHandlerMeta) (net.Conn, error) {
-	if proxyMeta.KcpSupported && transport.IsKcpEnabled() {
-		return DialKCPWithoutCache(src, dest)
-	}
-	return DialWithoutCache(src, dest)
-}
-
-func DialKCP3(src v2net.Address, dest v2net.Destination, proxyMeta *proxy.OutboundHandlerMeta) (*Connection, error) {
-	if src == nil {
-		src = v2net.AnyIP
-	}
-	id := src.String() + "-" + dest.NetAddr()
-	conn, err := DialWithoutCache3(src, dest, proxyMeta)
-	if err != nil {
-		return nil, err
-	}
-	return &Connection{
-		dest:     id,
-		conn:     conn,
-		listener: globalCache,
-	}, nil
-}
-
-/*DialKCPWithoutCache Dial KCP connection
-This Dialer will ignore src this is a restriction
-due to github.com/xtaci/kcp-go.DialWithOptions
-*/
-func DialKCPWithoutCache(src v2net.Address, dest v2net.Destination) (net.Conn, error) {
-	return DialKCP(dest)
-}

+ 0 - 31
transport/hub/kcp_test.go

@@ -1,31 +0,0 @@
-package hub_test
-
-import "testing"
-
-import (
-	v2net "github.com/v2ray/v2ray-core/common/net"
-	"github.com/v2ray/v2ray-core/testing/assert"
-	"github.com/v2ray/v2ray-core/transport"
-	"github.com/v2ray/v2ray-core/transport/hub"
-	"github.com/v2ray/v2ray-core/transport/hub/kcpv"
-)
-
-func Test_Pair(t *testing.T) {
-	assert := assert.On(t)
-	transport.KcpConfig = &kcpv.Config{}
-	transport.KcpConfig.Mode = "fast2"
-	transport.KcpConfig.Key = "key"
-	transport.KcpConfig.AdvancedConfigs = kcpv.DefaultAdvancedConfigs
-	lst, _ := hub.ListenKCP(v2net.ParseAddress("127.0.0.1"), 17777)
-	go func() {
-		connx, err2 := lst.Accept()
-		assert.Error(err2).IsNil()
-		connx.Close()
-	}()
-	conn, _ := hub.DialKCP(v2net.TCPDestination(v2net.ParseAddress("127.0.0.1"), 17777))
-	conn.LocalAddr()
-	conn.RemoteAddr()
-	conn.ApplyConf()
-	conn.Write([]byte("x"))
-	conn.Close()
-}

+ 0 - 3
transport/hub/kcpv/config_json.go

@@ -1,3 +0,0 @@
-package kcpv
-
-//We can use the default version of json parser

+ 0 - 21
transport/hub/kcpv/crypto.go

@@ -1,21 +0,0 @@
-package kcpv
-
-import (
-	"crypto/aes"
-	"crypto/cipher"
-	"crypto/sha256"
-)
-
-func generateKeyFromConfigString(key string) []byte {
-	key += "consensus salt: Let's fight arcifical deceleration with our code. We shall prove our believes with action."
-	keyw := sha256.Sum256([]byte(key))
-	return keyw[:]
-}
-
-func generateBlockWithKey(key []byte) (cipher.Block, error) {
-	return aes.NewCipher(key)
-}
-
-func GetChipher(key string) (cipher.Block, error) {
-	return generateBlockWithKey(generateKeyFromConfigString(key))
-}

+ 0 - 116
transport/hub/tcp.go

@@ -1,116 +0,0 @@
-package hub
-
-import (
-	"crypto/tls"
-	"errors"
-	"net"
-	"sync"
-
-	"github.com/v2ray/v2ray-core/common/log"
-	v2net "github.com/v2ray/v2ray-core/common/net"
-	"github.com/v2ray/v2ray-core/proxy"
-	"github.com/v2ray/v2ray-core/transport"
-)
-
-var (
-	ErrorClosedConnection = errors.New("Connection already closed.")
-)
-
-type TCPHub struct {
-	sync.Mutex
-	listener     net.Listener
-	connCallback ConnectionHandler
-	accepting    bool
-}
-
-func ListenTCP(address v2net.Address, port v2net.Port, callback ConnectionHandler, tlsConfig *tls.Config) (*TCPHub, error) {
-	listener, err := net.ListenTCP("tcp", &net.TCPAddr{
-		IP:   address.IP(),
-		Port: int(port),
-		Zone: "",
-	})
-	if err != nil {
-		return nil, err
-	}
-	var hub *TCPHub
-	if tlsConfig != nil {
-		tlsListener := tls.NewListener(listener, tlsConfig)
-		hub = &TCPHub{
-			listener:     tlsListener,
-			connCallback: callback,
-		}
-	} else {
-		hub = &TCPHub{
-			listener:     listener,
-			connCallback: callback,
-		}
-	}
-
-	go hub.start()
-	return hub, nil
-}
-func ListenKCPhub(address v2net.Address, port v2net.Port, callback ConnectionHandler, tlsConfig *tls.Config) (*TCPHub, error) {
-	listener, err := ListenKCP(address, port)
-	if err != nil {
-		return nil, err
-	}
-	var hub *TCPHub
-	if tlsConfig != nil {
-		tlsListener := tls.NewListener(listener, tlsConfig)
-		hub = &TCPHub{
-			listener:     tlsListener,
-			connCallback: callback,
-		}
-	} else {
-		hub = &TCPHub{
-			listener:     listener,
-			connCallback: callback,
-		}
-	}
-
-	go hub.start()
-	return hub, nil
-}
-func ListenTCP6(address v2net.Address, port v2net.Port, callback ConnectionHandler, proxyMeta *proxy.InboundHandlerMeta, tlsConfig *tls.Config) (*TCPHub, error) {
-	if proxyMeta.KcpSupported && transport.IsKcpEnabled() {
-		return ListenKCPhub(address, port, callback, tlsConfig)
-	} else {
-		return ListenTCP(address, port, callback, tlsConfig)
-	}
-	return nil, errors.New("ListenTCP6: Not Implemented")
-}
-
-func (this *TCPHub) Close() {
-	this.accepting = false
-	this.listener.Close()
-}
-
-func (this *TCPHub) start() {
-	this.accepting = true
-	for this.accepting {
-		conn, err := this.listener.Accept()
-
-		if err != nil {
-			if this.accepting {
-				log.Warning("Listener: Failed to accept new TCP connection: ", err)
-			}
-			continue
-		}
-		go this.connCallback(&Connection{
-			dest:     conn.RemoteAddr().String(),
-			conn:     conn,
-			listener: this,
-		})
-	}
-}
-
-// @Private
-func (this *TCPHub) Recycle(dest string, conn net.Conn) {
-	if this.accepting {
-		go this.connCallback(&Connection{
-			dest:     dest,
-			conn:     conn,
-			listener: this,
-		})
-	}
-}

+ 33 - 0
transport/internet/connection.go

@@ -0,0 +1,33 @@
+package internet
+
+import (
+	"net"
+)
+
+type ConnectionHandler func(Connection)
+
+type Reusable interface {
+	Reusable() bool
+	SetReusable(reuse bool)
+}
+
+type StreamConnectionType int
+
+var (
+	StreamConnectionTypeRawTCP StreamConnectionType = 1
+	StreamConnectionTypeTCP    StreamConnectionType = 2
+	StreamConnectionTypeKCP    StreamConnectionType = 4
+)
+
+type StreamSettings struct {
+	Type StreamConnectionType
+}
+
+func (this *StreamSettings) IsCapableOf(streamType StreamConnectionType) bool {
+	return (this.Type & streamType) == streamType
+}
+
+type Connection interface {
+	net.Conn
+	Reusable
+}

+ 27 - 0
transport/internet/connection_json.go

@@ -0,0 +1,27 @@
+// +build json
+
+package internet
+
+import (
+	"encoding/json"
+
+	v2net "github.com/v2ray/v2ray-core/common/net"
+)
+
+func (this *StreamSettings) UnmarshalJSON(data []byte) error {
+	type JSONConfig struct {
+		Network v2net.NetworkList `json:"network"`
+	}
+	this.Type = StreamConnectionTypeRawTCP
+	jsonConfig := new(JSONConfig)
+	if err := json.Unmarshal(data, jsonConfig); err != nil {
+		return err
+	}
+	if jsonConfig.Network.HasNetwork(v2net.KCPNetwork) {
+		this.Type |= StreamConnectionTypeKCP
+	}
+	if jsonConfig.Network.HasNetwork(v2net.TCPNetwork) {
+		this.Type |= StreamConnectionTypeTCP
+	}
+	return nil
+}

+ 63 - 0
transport/internet/dialer.go

@@ -0,0 +1,63 @@
+package internet
+
+import (
+	"errors"
+	"net"
+	"time"
+
+	v2net "github.com/v2ray/v2ray-core/common/net"
+)
+
+var (
+	ErrUnsupportedStreamType = errors.New("Unsupported stream type.")
+)
+
+type Dialer func(src v2net.Address, dest v2net.Destination) (Connection, error)
+
+var (
+	TCPDialer    Dialer
+	KCPDialer    Dialer
+	RawTCPDialer Dialer
+	UDPDialer    Dialer
+)
+
+func Dial(src v2net.Address, dest v2net.Destination, settings *StreamSettings) (Connection, error) {
+	if dest.IsTCP() {
+		switch {
+		case settings.IsCapableOf(StreamConnectionTypeKCP):
+			return KCPDialer(src, dest)
+		case settings.IsCapableOf(StreamConnectionTypeTCP):
+			return TCPDialer(src, dest)
+		case settings.IsCapableOf(StreamConnectionTypeRawTCP):
+			return RawTCPDialer(src, dest)
+		}
+		return nil, ErrUnsupportedStreamType
+	}
+
+	return UDPDialer(src, dest)
+}
+
+func DialToDest(src v2net.Address, dest v2net.Destination) (net.Conn, error) {
+	dialer := &net.Dialer{
+		Timeout:   time.Second * 60,
+		DualStack: true,
+	}
+
+	if src != nil && src != v2net.AnyIP {
+		var addr net.Addr
+		if dest.IsTCP() {
+			addr = &net.TCPAddr{
+				IP:   src.IP(),
+				Port: 0,
+			}
+		} else {
+			addr = &net.UDPAddr{
+				IP:   src.IP(),
+				Port: 0,
+			}
+		}
+		dialer.LocalAddr = addr
+	}
+
+	return dialer.Dial(dest.Network().String(), dest.NetAddr())
+}

+ 4 - 17
transport/hub/dialer_test.go → transport/internet/dialer_test.go

@@ -1,14 +1,13 @@
-package hub_test
+package internet_test
 
 import (
-	"net"
 	"testing"
 
 	v2net "github.com/v2ray/v2ray-core/common/net"
 	v2nettesting "github.com/v2ray/v2ray-core/common/net/testing"
 	"github.com/v2ray/v2ray-core/testing/assert"
 	"github.com/v2ray/v2ray-core/testing/servers/tcp"
-	. "github.com/v2ray/v2ray-core/transport/hub"
+	. "github.com/v2ray/v2ray-core/transport/internet"
 )
 
 func TestDialDomain(t *testing.T) {
@@ -21,7 +20,7 @@ func TestDialDomain(t *testing.T) {
 	assert.Error(err).IsNil()
 	defer server.Close()
 
-	conn, err := Dial(nil, v2net.TCPDestination(v2net.DomainAddress("local.v2ray.com"), dest.Port()))
+	conn, err := DialToDest(nil, v2net.TCPDestination(v2net.DomainAddress("local.v2ray.com"), dest.Port()))
 	assert.Error(err).IsNil()
 	assert.String(conn.RemoteAddr().String()).Equals("127.0.0.1:" + dest.Port().String())
 	conn.Close()
@@ -37,19 +36,7 @@ func TestDialWithLocalAddr(t *testing.T) {
 	assert.Error(err).IsNil()
 	defer server.Close()
 
-	var localAddr net.IP
-	addrs, err := net.InterfaceAddrs()
-	assert.Error(err).IsNil()
-	for _, addr := range addrs {
-		str := addr.String()
-		ip := net.ParseIP(str)
-		if ip != nil && ip.To4() != nil {
-			localAddr = ip.To4()
-		}
-	}
-	assert.Pointer(localAddr).IsNotNil()
-
-	conn, err := Dial(v2net.IPAddress(localAddr), v2net.TCPDestination(v2net.LocalHostIP, dest.Port()))
+	conn, err := DialToDest(v2net.LocalHostIP, v2net.TCPDestination(v2net.LocalHostIP, dest.Port()))
 	assert.Error(err).IsNil()
 	assert.String(conn.RemoteAddr().String()).Equals("127.0.0.1:" + dest.Port().String())
 	conn.Close()

+ 26 - 23
transport/hub/kcpv/config.go → transport/internet/kcp/config.go

@@ -1,4 +1,4 @@
-package kcpv
+package kcp
 
 /*AdvancedConfig define behavior of KCP in detail
 
@@ -29,16 +29,6 @@ can cause v2ray to kill the proxy connection it is relaying,
 Higher value can prevent server from closing zombie socket and
 waste resources.
 */
-type AdvancedConfig struct {
-	Mtu          int  `json:"MaximumTransmissionUnit"`
-	Sndwnd       int  `json:"SendingWindowSize"`
-	Rcvwnd       int  `json:"ReceivingWindowSize"`
-	Fec          int  `json:"ForwardErrorCorrectionGroupSize"`
-	Acknodelay   bool `json:"AcknowledgeNoDelay"`
-	Dscp         int  `json:"Dscp"`
-	ReadTimeout  int  `json:"ReadTimeout"`
-	WriteTimeout int  `json:"WriteTimeout"`
-}
 
 /*Config define basic behavior of KCP
 Mode:
@@ -46,20 +36,33 @@ can be one of these values:
 fast3,fast2,fast,normal
 <<<<<<- less delay
 ->>>>>> less bandwich wasted
-
-EncryptionKey:
-a string that will be the EncryptionKey of
-All KCP connection we Listen-Accpet or
-Dial, We are not very sure about how this
-encryption hehave and DO use a unique randomly
-generated key.
 */
 type Config struct {
-	Mode            string          `json:"Mode"`
-	Key             string          `json:"EncryptionKey"`
-	AdvancedConfigs *AdvancedConfig `json:"AdvancedConfig,omitempty"`
+	Mode         string `json:"Mode"`
+	Mtu          int    `json:"MaximumTransmissionUnit"`
+	Sndwnd       int    `json:"SendingWindowSize"`
+	Rcvwnd       int    `json:"ReceivingWindowSize"`
+	Fec          int    `json:"ForwardErrorCorrectionGroupSize"`
+	Acknodelay   bool   `json:"AcknowledgeNoDelay"`
+	Dscp         int    `json:"Dscp"`
+	ReadTimeout  int    `json:"ReadTimeout"`
+	WriteTimeout int    `json:"WriteTimeout"`
 }
 
-var DefaultAdvancedConfigs = &AdvancedConfig{
-	Mtu: 1350, Sndwnd: 1024, Rcvwnd: 1024, Fec: 4, Dscp: 0, ReadTimeout: 600, WriteTimeout: 500, Acknodelay: false,
+func (this *Config) Apply() {
+	effectiveConfig = *this
 }
+
+var (
+	effectiveConfig = Config{
+		Mode:         "normal",
+		Mtu:          1350,
+		Sndwnd:       1024,
+		Rcvwnd:       1024,
+		Fec:          4,
+		Dscp:         0,
+		ReadTimeout:  600,
+		WriteTimeout: 500,
+		Acknodelay:   false,
+	}
+)

+ 27 - 0
transport/internet/kcp/config_json.go

@@ -0,0 +1,27 @@
+// +build json
+
+package kcp
+
+import (
+	"encoding/json"
+)
+
+func (this *Config) UnmarshalJSON(data []byte) error {
+	type JSONConfig struct {
+		Mode         string `json:"Mode"`
+		Mtu          int    `json:"MaximumTransmissionUnit"`
+		Sndwnd       int    `json:"SendingWindowSize"`
+		Rcvwnd       int    `json:"ReceivingWindowSize"`
+		Fec          int    `json:"ForwardErrorCorrectionGroupSize"`
+		Acknodelay   bool   `json:"AcknowledgeNoDelay"`
+		Dscp         int    `json:"Dscp"`
+		ReadTimeout  int    `json:"ReadTimeout"`
+		WriteTimeout int    `json:"WriteTimeout"`
+	}
+	jsonConfig := effectiveConfig
+	if err := json.Unmarshal(data, &jsonConfig); err != nil {
+		return err
+	}
+	*this = jsonConfig
+	return nil
+}

+ 26 - 0
transport/internet/kcp/dialer.go

@@ -0,0 +1,26 @@
+package kcp
+
+import (
+	v2net "github.com/v2ray/v2ray-core/common/net"
+	"github.com/v2ray/v2ray-core/transport/internet"
+
+	"github.com/xtaci/kcp-go"
+)
+
+func DialKCP(src v2net.Address, dest v2net.Destination) (internet.Connection, error) {
+	cpip, _ := kcp.NewNoneBlockCrypt(nil)
+	kcv, err := kcp.DialWithOptions(effectiveConfig.Fec, dest.NetAddr(), cpip)
+	if err != nil {
+		return nil, err
+	}
+	kcvn := &KCPVconn{hc: kcv}
+	err = kcvn.ApplyConf()
+	if err != nil {
+		return nil, err
+	}
+	return kcvn, nil
+}
+
+func init() {
+	internet.KCPDialer = DialKCP
+}

+ 33 - 47
transport/hub/kcp.go → transport/internet/kcp/session.go

@@ -1,20 +1,18 @@
-package hub
+package kcp
 
 import (
 	"errors"
 	"net"
 	"time"
 
-	"github.com/v2ray/v2ray-core/common/log"
 	v2net "github.com/v2ray/v2ray-core/common/net"
-	"github.com/v2ray/v2ray-core/transport"
-	"github.com/v2ray/v2ray-core/transport/hub/kcpv"
+	"github.com/v2ray/v2ray-core/transport/internet"
+
 	"github.com/xtaci/kcp-go"
 )
 
 type KCPVlistener struct {
 	lst                    *kcp.Listener
-	conf                   *kcpv.Config
 	previousSocketid       map[int]uint32
 	previousSocketid_mapid int
 }
@@ -25,7 +23,7 @@ It could be reconized as a new connection and call accept.
 If we can detect that the connection is of such a kind,
 we will discard that conn.
 */
-func (kvl *KCPVlistener) Accept() (net.Conn, error) {
+func (kvl *KCPVlistener) Accept() (internet.Connection, error) {
 	conn, err := kvl.lst.Accept()
 	if err != nil {
 		return nil, err
@@ -59,7 +57,6 @@ func (kvl *KCPVlistener) Accept() (net.Conn, error) {
 	}
 
 	kcv := &KCPVconn{hc: conn}
-	kcv.conf = kvl.conf
 	err = kcv.ApplyConf()
 	if err != nil {
 		return nil, err
@@ -77,14 +74,13 @@ func (kvl *KCPVlistener) Addr() net.Addr {
 
 type KCPVconn struct {
 	hc         *kcp.UDPSession
-	conf       *kcpv.Config
 	conntokeep time.Time
 }
 
 //var counter int
 
 func (kcpvc *KCPVconn) Read(b []byte) (int, error) {
-	ifb := time.Now().Add(time.Duration(kcpvc.conf.AdvancedConfigs.ReadTimeout) * time.Second)
+	ifb := time.Now().Add(time.Duration(effectiveConfig.ReadTimeout) * time.Second)
 	if ifb.After(kcpvc.conntokeep) {
 		kcpvc.conntokeep = ifb
 	}
@@ -93,7 +89,7 @@ func (kcpvc *KCPVconn) Read(b []byte) (int, error) {
 }
 
 func (kcpvc *KCPVconn) Write(b []byte) (int, error) {
-	ifb := time.Now().Add(time.Duration(kcpvc.conf.AdvancedConfigs.WriteTimeout) * time.Second)
+	ifb := time.Now().Add(time.Duration(effectiveConfig.WriteTimeout) * time.Second)
 	if ifb.After(kcpvc.conntokeep) {
 		kcpvc.conntokeep = ifb
 	}
@@ -107,27 +103,22 @@ It is recommmanded to call this func once and only once
 */
 func (kcpvc *KCPVconn) ApplyConf() error {
 	nodelay, interval, resend, nc := 0, 40, 0, 0
-	if kcpvc.conf.Mode != "manual" {
-		switch kcpvc.conf.Mode {
-		case "normal":
-			nodelay, interval, resend, nc = 0, 30, 2, 1
-		case "fast":
-			nodelay, interval, resend, nc = 0, 20, 2, 1
-		case "fast2":
-			nodelay, interval, resend, nc = 1, 20, 2, 1
-		case "fast3":
-			nodelay, interval, resend, nc = 1, 10, 2, 1
-		}
-	} else {
-		log.Error("kcp: Failed to Apply configure: Manual mode is not supported.(yet!)")
-		return errors.New("kcp: Manual Not Implemented")
+	switch effectiveConfig.Mode {
+	case "normal":
+		nodelay, interval, resend, nc = 0, 30, 2, 1
+	case "fast":
+		nodelay, interval, resend, nc = 0, 20, 2, 1
+	case "fast2":
+		nodelay, interval, resend, nc = 1, 20, 2, 1
+	case "fast3":
+		nodelay, interval, resend, nc = 1, 10, 2, 1
 	}
 
 	kcpvc.hc.SetNoDelay(nodelay, interval, resend, nc)
-	kcpvc.hc.SetWindowSize(kcpvc.conf.AdvancedConfigs.Sndwnd, kcpvc.conf.AdvancedConfigs.Rcvwnd)
-	kcpvc.hc.SetMtu(kcpvc.conf.AdvancedConfigs.Mtu)
-	kcpvc.hc.SetACKNoDelay(kcpvc.conf.AdvancedConfigs.Acknodelay)
-	kcpvc.hc.SetDSCP(kcpvc.conf.AdvancedConfigs.Dscp)
+	kcpvc.hc.SetWindowSize(effectiveConfig.Sndwnd, effectiveConfig.Rcvwnd)
+	kcpvc.hc.SetMtu(effectiveConfig.Mtu)
+	kcpvc.hc.SetACKNoDelay(effectiveConfig.Acknodelay)
+	kcpvc.hc.SetDSCP(effectiveConfig.Dscp)
 	//counter++
 	//log.Info(counter)
 	return nil
@@ -167,27 +158,22 @@ func (kcpvc *KCPVconn) SetWriteDeadline(t time.Time) error {
 	return kcpvc.hc.SetWriteDeadline(t)
 }
 
-func DialKCP(dest v2net.Destination) (*KCPVconn, error) {
-	kcpconf := transport.KcpConfig
-	cpip, _ := kcpv.GetChipher(kcpconf.Key)
-	kcv, err := kcp.DialWithOptions(kcpconf.AdvancedConfigs.Fec, dest.NetAddr(), cpip)
-	if err != nil {
-		return nil, err
-	}
-	kcvn := &KCPVconn{hc: kcv}
-	kcvn.conf = kcpconf
-	err = kcvn.ApplyConf()
-	if err != nil {
-		return nil, err
-	}
-	return kcvn, nil
+func (this *KCPVconn) Reusable() bool {
+	return false
 }
 
-func ListenKCP(address v2net.Address, port v2net.Port) (*KCPVlistener, error) {
-	kcpconf := transport.KcpConfig
-	cpip, _ := kcpv.GetChipher(kcpconf.Key)
+func (this *KCPVconn) SetReusable(b bool) {
+
+}
+
+func ListenKCP(address v2net.Address, port v2net.Port) (internet.Listener, error) {
 	laddr := address.String() + ":" + port.String()
-	kcl, err := kcp.ListenWithOptions(kcpconf.AdvancedConfigs.Fec, laddr, cpip)
-	kcvl := &KCPVlistener{lst: kcl, conf: kcpconf}
+	crypt, _ := kcp.NewNoneBlockCrypt(nil)
+	kcl, err := kcp.ListenWithOptions(effectiveConfig.Fec, laddr, crypt)
+	kcvl := &KCPVlistener{lst: kcl}
 	return kcvl, err
 }
+
+func init() {
+	internet.KCPListenFunc = ListenKCP
+}

+ 15 - 0
transport/internet/tcp/config.go

@@ -0,0 +1,15 @@
+package tcp
+
+type Config struct {
+	ConnectionReuse bool
+}
+
+func (this *Config) Apply() {
+	effectiveConfig = this
+}
+
+var (
+	effectiveConfig = &Config{
+		ConnectionReuse: true,
+	}
+)

+ 20 - 0
transport/internet/tcp/config_json.go

@@ -0,0 +1,20 @@
+package tcp
+
+import (
+	"encoding/json"
+)
+
+func (this *Config) UnmarshalJSON(data []byte) error {
+	type JsonConfig struct {
+		ConnectionReuse bool `json:"connectionReuse"`
+	}
+	jsonConfig := &JsonConfig{
+		ConnectionReuse: true,
+	}
+	if err := json.Unmarshal(data, jsonConfig); err != nil {
+		return err
+	}
+	this.ConnectionReuse = jsonConfig.ConnectionReuse
+
+	return nil
+}

+ 31 - 10
transport/hub/connection.go → transport/internet/tcp/connection.go

@@ -1,24 +1,31 @@
-package hub
+package tcp
 
 import (
 	"errors"
+	"io"
 	"net"
 	"reflect"
 	"time"
-
-	"github.com/v2ray/v2ray-core/transport"
 )
 
 var (
 	ErrInvalidConn = errors.New("Invalid Connection.")
 )
 
-type ConnectionHandler func(*Connection)
-
 type ConnectionManager interface {
 	Recycle(string, net.Conn)
 }
 
+type RawConnection struct {
+	net.TCPConn
+}
+
+func (this *RawConnection) Reusable() bool {
+	return false
+}
+
+func (this *RawConnection) SetReusable(b bool) {}
+
 type Connection struct {
 	dest     string
 	conn     net.Conn
@@ -26,9 +33,18 @@ type Connection struct {
 	reusable bool
 }
 
+func NewConnection(dest string, conn net.Conn, manager ConnectionManager) *Connection {
+	return &Connection{
+		dest:     dest,
+		conn:     conn,
+		listener: manager,
+		reusable: effectiveConfig.ConnectionReuse,
+	}
+}
+
 func (this *Connection) Read(b []byte) (int, error) {
 	if this == nil || this.conn == nil {
-		return 0, ErrorClosedConnection
+		return 0, io.EOF
 	}
 
 	return this.conn.Read(b)
@@ -36,20 +52,22 @@ func (this *Connection) Read(b []byte) (int, error) {
 
 func (this *Connection) Write(b []byte) (int, error) {
 	if this == nil || this.conn == nil {
-		return 0, ErrorClosedConnection
+		return 0, io.ErrClosedPipe
 	}
 	return this.conn.Write(b)
 }
 
 func (this *Connection) Close() error {
 	if this == nil || this.conn == nil {
-		return ErrorClosedConnection
+		return io.ErrClosedPipe
 	}
-	if transport.IsConnectionReusable() && this.Reusable() {
+	if this.Reusable() {
 		this.listener.Recycle(this.dest, this.conn)
 		return nil
 	}
-	return this.conn.Close()
+	err := this.conn.Close()
+	this.conn = nil
+	return err
 }
 
 func (this *Connection) LocalAddr() net.Addr {
@@ -73,6 +91,9 @@ func (this *Connection) SetWriteDeadline(t time.Time) error {
 }
 
 func (this *Connection) SetReusable(reusable bool) {
+	if !effectiveConfig.ConnectionReuse {
+		return
+	}
 	this.reusable = reusable
 }
 

+ 1 - 1
transport/hub/connection_cache.go → transport/internet/tcp/connection_cache.go

@@ -1,4 +1,4 @@
-package hub
+package tcp
 
 import (
 	"net"

+ 49 - 0
transport/internet/tcp/dialer.go

@@ -0,0 +1,49 @@
+package tcp
+
+import (
+	"net"
+
+	"github.com/v2ray/v2ray-core/common/log"
+	v2net "github.com/v2ray/v2ray-core/common/net"
+	"github.com/v2ray/v2ray-core/transport/internet"
+)
+
+var (
+	globalCache = NewConnectionCache()
+)
+
+func Dial(src v2net.Address, dest v2net.Destination) (internet.Connection, error) {
+	log.Info("Dailing TCP to ", dest)
+	if src == nil {
+		src = v2net.AnyIP
+	}
+	id := src.String() + "-" + dest.NetAddr()
+	var conn net.Conn
+	if dest.IsTCP() && effectiveConfig.ConnectionReuse {
+		conn = globalCache.Get(id)
+	}
+	if conn == nil {
+		var err error
+		conn, err = internet.DialToDest(src, dest)
+		if err != nil {
+			return nil, err
+		}
+	}
+	return NewConnection(id, conn, globalCache), nil
+}
+
+func DialRaw(src v2net.Address, dest v2net.Destination) (internet.Connection, error) {
+	log.Info("Dailing Raw TCP to ", dest)
+	conn, err := internet.DialToDest(src, dest)
+	if err != nil {
+		return nil, err
+	}
+	return &RawConnection{
+		TCPConn: *conn.(*net.TCPConn),
+	}, nil
+}
+
+func init() {
+	internet.TCPDialer = Dial
+	internet.RawTCPDialer = DialRaw
+}

+ 159 - 0
transport/internet/tcp/hub.go

@@ -0,0 +1,159 @@
+package tcp
+
+import (
+	"errors"
+	"net"
+	"sync"
+	"time"
+
+	v2net "github.com/v2ray/v2ray-core/common/net"
+	"github.com/v2ray/v2ray-core/transport/internet"
+)
+
+var (
+	ErrClosedListener = errors.New("Listener is closed.")
+)
+
+type ConnectionWithError struct {
+	conn net.Conn
+	err  error
+}
+
+type TCPListener struct {
+	sync.Mutex
+	acccepting    bool
+	listener      *net.TCPListener
+	awaitingConns chan *ConnectionWithError
+}
+
+func ListenTCP(address v2net.Address, port v2net.Port) (internet.Listener, error) {
+	listener, err := net.ListenTCP("tcp", &net.TCPAddr{
+		IP:   address.IP(),
+		Port: int(port),
+	})
+	if err != nil {
+		return nil, err
+	}
+	l := &TCPListener{
+		acccepting:    true,
+		listener:      listener,
+		awaitingConns: make(chan *ConnectionWithError, 32),
+	}
+	go l.KeepAccepting()
+	return l, nil
+}
+
+func (this *TCPListener) Accept() (internet.Connection, error) {
+	for this.acccepting {
+		select {
+		case connErr, open := <-this.awaitingConns:
+			if !open {
+				return nil, ErrClosedListener
+			}
+			if connErr.err != nil {
+				return nil, connErr.err
+			}
+			return NewConnection("", connErr.conn, this), nil
+		case <-time.After(time.Second * 2):
+		}
+	}
+	return nil, ErrClosedListener
+}
+
+func (this *TCPListener) KeepAccepting() {
+	for this.acccepting {
+		conn, err := this.listener.Accept()
+		this.Lock()
+		if !this.acccepting {
+			this.Unlock()
+			break
+		}
+		select {
+		case this.awaitingConns <- &ConnectionWithError{
+			conn: conn,
+			err:  err,
+		}:
+		default:
+			if conn != nil {
+				conn.Close()
+			}
+		}
+
+		this.Unlock()
+	}
+}
+
+func (this *TCPListener) Recycle(dest string, conn net.Conn) {
+	this.Lock()
+	defer this.Unlock()
+	if !this.acccepting {
+		return
+	}
+	select {
+	case this.awaitingConns <- &ConnectionWithError{conn: conn}:
+	default:
+		conn.Close()
+	}
+}
+
+func (this *TCPListener) Addr() net.Addr {
+	return this.listener.Addr()
+}
+
+func (this *TCPListener) Close() error {
+	this.Lock()
+	defer this.Unlock()
+	this.acccepting = false
+	this.listener.Close()
+	close(this.awaitingConns)
+	for connErr := range this.awaitingConns {
+		if connErr.conn != nil {
+			go connErr.conn.Close()
+		}
+	}
+	return nil
+}
+
+type RawTCPListener struct {
+	accepting bool
+	listener  *net.TCPListener
+}
+
+func (this *RawTCPListener) Accept() (internet.Connection, error) {
+	conn, err := this.listener.AcceptTCP()
+	if err != nil {
+		return nil, err
+	}
+	return &RawConnection{
+		TCPConn: *conn,
+	}, nil
+}
+
+func (this *RawTCPListener) Addr() net.Addr {
+	return this.listener.Addr()
+}
+
+func (this *RawTCPListener) Close() error {
+	this.accepting = false
+	this.listener.Close()
+	return nil
+}
+
+func ListenRawTCP(address v2net.Address, port v2net.Port) (internet.Listener, error) {
+	listener, err := net.ListenTCP("tcp", &net.TCPAddr{
+		IP:   address.IP(),
+		Port: int(port),
+	})
+	if err != nil {
+		return nil, err
+	}
+	return &RawTCPListener{
+		accepting: true,
+		listener:  listener,
+	}, nil
+}
+
+func init() {
+	internet.TCPListenFunc = ListenTCP
+	internet.RawTCPListenFunc = ListenRawTCP
+}

+ 77 - 0
transport/internet/tcp_hub.go

@@ -0,0 +1,77 @@
+package internet
+
+import (
+	"errors"
+	"net"
+	"sync"
+
+	"github.com/v2ray/v2ray-core/common/log"
+	v2net "github.com/v2ray/v2ray-core/common/net"
+)
+
+var (
+	ErrorClosedConnection = errors.New("Connection already closed.")
+
+	KCPListenFunc    ListenFunc
+	TCPListenFunc    ListenFunc
+	RawTCPListenFunc ListenFunc
+)
+
+type ListenFunc func(address v2net.Address, port v2net.Port) (Listener, error)
+type Listener interface {
+	Accept() (Connection, error)
+	Close() error
+	Addr() net.Addr
+}
+
+type TCPHub struct {
+	sync.Mutex
+	listener     Listener
+	connCallback ConnectionHandler
+	accepting    bool
+}
+
+func ListenTCP(address v2net.Address, port v2net.Port, callback ConnectionHandler, settings *StreamSettings) (*TCPHub, error) {
+	var listener Listener
+	var err error
+	if settings.IsCapableOf(StreamConnectionTypeKCP) {
+		listener, err = KCPListenFunc(address, port)
+	} else if settings.IsCapableOf(StreamConnectionTypeTCP) {
+		listener, err = TCPListenFunc(address, port)
+	} else {
+		listener, err = RawTCPListenFunc(address, port)
+	}
+
+	if err != nil {
+		return nil, err
+	}
+
+	hub := &TCPHub{
+		listener:     listener,
+		connCallback: callback,
+	}
+
+	go hub.start()
+	return hub, nil
+}
+
+func (this *TCPHub) Close() {
+	this.accepting = false
+	this.listener.Close()
+}
+
+func (this *TCPHub) start() {
+	this.accepting = true
+	for this.accepting {
+		conn, err := this.listener.Accept()
+
+		if err != nil {
+			if this.accepting {
+				log.Warning("Listener: Failed to accept new TCP connection: ", err)
+			}
+			continue
+		}
+		log.Info("Handling connection from ", conn.RemoteAddr())
+		go this.connCallback(conn)
+	}
+}

+ 30 - 0
transport/internet/udp/connection.go

@@ -0,0 +1,30 @@
+package udp
+
+import (
+	"net"
+
+	v2net "github.com/v2ray/v2ray-core/common/net"
+	"github.com/v2ray/v2ray-core/transport/internet"
+)
+
+type Connection struct {
+	net.UDPConn
+}
+
+func (this *Connection) Reusable() bool {
+	return false
+}
+
+func (this *Connection) SetReusable(b bool) {}
+
+func init() {
+	internet.UDPDialer = func(src v2net.Address, dest v2net.Destination) (internet.Connection, error) {
+		conn, err := internet.DialToDest(src, dest)
+		if err != nil {
+			return nil, err
+		}
+		return &Connection{
+			UDPConn: *(conn.(*net.UDPConn)),
+		}, nil
+	}
+}

+ 1 - 1
transport/hub/udp.go → transport/internet/udp/udp.go

@@ -1,4 +1,4 @@
-package hub
+package udp
 
 import (
 	"net"

+ 1 - 1
transport/hub/udp_server.go → transport/internet/udp/udp_server.go

@@ -1,4 +1,4 @@
-package hub
+package udp
 
 import (
 	"sync"

+ 0 - 18
transport/transport.go

@@ -1,18 +0,0 @@
-package transport
-
-import "github.com/v2ray/v2ray-core/transport/hub/kcpv"
-
-var (
-	connectionReuse = true
-	enableKcp       = false
-	KcpConfig       *kcpv.Config
-)
-
-// IsConnectionReusable returns true if V2Ray is trying to reuse TCP connections.
-func IsConnectionReusable() bool {
-	return connectionReuse
-}
-
-func IsKcpEnabled() bool {
-	return enableKcp
-}