dial.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. //go:build !confonly
  2. // +build !confonly
  3. package grpc
  4. import (
  5. "context"
  6. gonet "net"
  7. "sync"
  8. "time"
  9. "google.golang.org/grpc"
  10. "google.golang.org/grpc/backoff"
  11. "google.golang.org/grpc/connectivity"
  12. "google.golang.org/grpc/credentials"
  13. core "github.com/v2fly/v2ray-core/v5"
  14. "github.com/v2fly/v2ray-core/v5/common"
  15. "github.com/v2fly/v2ray-core/v5/common/net"
  16. "github.com/v2fly/v2ray-core/v5/common/session"
  17. "github.com/v2fly/v2ray-core/v5/transport/internet"
  18. "github.com/v2fly/v2ray-core/v5/transport/internet/grpc/encoding"
  19. "github.com/v2fly/v2ray-core/v5/transport/internet/tls"
  20. )
  21. func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (internet.Connection, error) {
  22. newError("creating connection to ", dest).WriteToLog(session.ExportIDToError(ctx))
  23. conn, err := dialgRPC(ctx, dest, streamSettings)
  24. if err != nil {
  25. return nil, newError("failed to dial Grpc").Base(err)
  26. }
  27. return internet.Connection(conn), nil
  28. }
  29. func init() {
  30. common.Must(internet.RegisterTransportDialer(protocolName, Dial))
  31. }
  32. type dialerCanceller func()
  33. var (
  34. globalDialerMap map[net.Destination]*grpc.ClientConn
  35. globalDialerAccess sync.Mutex
  36. )
  37. func dialgRPC(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (net.Conn, error) {
  38. grpcSettings := streamSettings.ProtocolSettings.(*Config)
  39. config := tls.ConfigFromStreamSettings(streamSettings)
  40. dialOption := grpc.WithInsecure()
  41. if config != nil {
  42. dialOption = grpc.WithTransportCredentials(credentials.NewTLS(config.GetTLSConfig()))
  43. }
  44. conn, canceller, err := getGrpcClient(ctx, dest, dialOption, streamSettings)
  45. if err != nil {
  46. return nil, newError("Cannot dial grpc").Base(err)
  47. }
  48. client := encoding.NewGunServiceClient(conn)
  49. gunService, err := client.(encoding.GunServiceClientX).TunCustomName(ctx, grpcSettings.ServiceName)
  50. if err != nil {
  51. canceller()
  52. return nil, newError("Cannot dial grpc").Base(err)
  53. }
  54. return encoding.NewGunConn(gunService, nil), nil
  55. }
  56. func getGrpcClient(ctx context.Context, dest net.Destination, dialOption grpc.DialOption, streamSettings *internet.MemoryStreamConfig) (*grpc.ClientConn, dialerCanceller, error) {
  57. globalDialerAccess.Lock()
  58. defer globalDialerAccess.Unlock()
  59. if globalDialerMap == nil {
  60. globalDialerMap = make(map[net.Destination]*grpc.ClientConn)
  61. }
  62. canceller := func() {
  63. globalDialerAccess.Lock()
  64. defer globalDialerAccess.Unlock()
  65. delete(globalDialerMap, dest)
  66. }
  67. // TODO Should support chain proxy to the same destination
  68. if client, found := globalDialerMap[dest]; found && client.GetState() != connectivity.Shutdown {
  69. return client, canceller, nil
  70. }
  71. conn, err := grpc.Dial(
  72. dest.Address.String()+":"+dest.Port.String(),
  73. dialOption,
  74. grpc.WithConnectParams(grpc.ConnectParams{
  75. Backoff: backoff.Config{
  76. BaseDelay: 500 * time.Millisecond,
  77. Multiplier: 1.5,
  78. Jitter: 0.2,
  79. MaxDelay: 19 * time.Second,
  80. },
  81. MinConnectTimeout: 5 * time.Second,
  82. }),
  83. grpc.WithContextDialer(func(ctxGrpc context.Context, s string) (gonet.Conn, error) {
  84. rawHost, rawPort, err := net.SplitHostPort(s)
  85. if err != nil {
  86. return nil, err
  87. }
  88. if len(rawPort) == 0 {
  89. rawPort = "443"
  90. }
  91. port, err := net.PortFromString(rawPort)
  92. if err != nil {
  93. return nil, err
  94. }
  95. address := net.ParseAddress(rawHost)
  96. detachedContext := core.ToBackgroundDetachedContext(ctx)
  97. return internet.DialSystem(detachedContext, net.TCPDestination(address, port), streamSettings.SocketSettings)
  98. }),
  99. )
  100. globalDialerMap[dest] = conn
  101. return conn, canceller, err
  102. }