dial.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. // +build !confonly
  2. package grpc
  3. import (
  4. "context"
  5. gonet "net"
  6. "sync"
  7. "time"
  8. "google.golang.org/grpc"
  9. "google.golang.org/grpc/backoff"
  10. "google.golang.org/grpc/connectivity"
  11. "google.golang.org/grpc/credentials"
  12. core "github.com/v2fly/v2ray-core/v4"
  13. "github.com/v2fly/v2ray-core/v4/common"
  14. "github.com/v2fly/v2ray-core/v4/common/net"
  15. "github.com/v2fly/v2ray-core/v4/common/session"
  16. "github.com/v2fly/v2ray-core/v4/transport/internet"
  17. "github.com/v2fly/v2ray-core/v4/transport/internet/grpc/encoding"
  18. "github.com/v2fly/v2ray-core/v4/transport/internet/tls"
  19. )
  20. func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (internet.Connection, error) {
  21. newError("creating connection to ", dest).WriteToLog(session.ExportIDToError(ctx))
  22. conn, err := dialgRPC(ctx, dest, streamSettings)
  23. if err != nil {
  24. return nil, newError("failed to dial Grpc").Base(err)
  25. }
  26. return internet.Connection(conn), nil
  27. }
  28. func init() {
  29. common.Must(internet.RegisterTransportDialer(protocolName, Dial))
  30. }
  31. var (
  32. globalDialerMap map[net.Destination]*grpc.ClientConn
  33. globalDialerAccess sync.Mutex
  34. )
  35. func dialgRPC(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (net.Conn, error) {
  36. grpcSettings := streamSettings.ProtocolSettings.(*Config)
  37. config := tls.ConfigFromStreamSettings(streamSettings)
  38. dialOption := grpc.WithInsecure()
  39. if config != nil {
  40. dialOption = grpc.WithTransportCredentials(credentials.NewTLS(config.GetTLSConfig()))
  41. }
  42. conn, err := getGrpcClient(ctx, dest, dialOption)
  43. if err != nil {
  44. return nil, newError("Cannot dial grpc").Base(err)
  45. }
  46. client := encoding.NewGunServiceClient(conn)
  47. gunService, err := client.(encoding.GunServiceClientX).TunCustomName(ctx, grpcSettings.ServiceName)
  48. if err != nil {
  49. return nil, newError("Cannot dial grpc").Base(err)
  50. }
  51. return encoding.NewGunConn(gunService, nil), nil
  52. }
  53. func getGrpcClient(ctx context.Context, dest net.Destination, dialOption grpc.DialOption) (*grpc.ClientConn, error) {
  54. globalDialerAccess.Lock()
  55. defer globalDialerAccess.Unlock()
  56. if globalDialerMap == nil {
  57. globalDialerMap = make(map[net.Destination]*grpc.ClientConn)
  58. }
  59. // TODO Should support chain proxy to the same destination
  60. if client, found := globalDialerMap[dest]; found && client.GetState() != connectivity.Shutdown {
  61. return client, nil
  62. }
  63. conn, err := grpc.Dial(
  64. dest.Address.String()+":"+dest.Port.String(),
  65. dialOption,
  66. grpc.WithConnectParams(grpc.ConnectParams{
  67. Backoff: backoff.Config{
  68. BaseDelay: 500 * time.Millisecond,
  69. Multiplier: 1.5,
  70. Jitter: 0.2,
  71. MaxDelay: 19 * time.Second,
  72. },
  73. MinConnectTimeout: 5 * time.Second,
  74. }),
  75. grpc.WithContextDialer(func(ctxGrpc context.Context, s string) (gonet.Conn, error) {
  76. rawHost, rawPort, err := net.SplitHostPort(s)
  77. if err != nil {
  78. return nil, err
  79. }
  80. if len(rawPort) == 0 {
  81. rawPort = "443"
  82. }
  83. port, err := net.PortFromString(rawPort)
  84. if err != nil {
  85. return nil, err
  86. }
  87. address := net.ParseAddress(rawHost)
  88. detachedContext := core.ToBackgroundDetachedContext(ctx)
  89. return internet.DialSystem(detachedContext, net.TCPDestination(address, port), nil)
  90. }),
  91. )
  92. globalDialerMap[dest] = conn
  93. return conn, err
  94. }