hub.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. //go:build !confonly
  2. // +build !confonly
  3. package grpc
  4. import (
  5. "context"
  6. "google.golang.org/grpc"
  7. "google.golang.org/grpc/credentials"
  8. "github.com/v2fly/v2ray-core/v5/common"
  9. "github.com/v2fly/v2ray-core/v5/common/net"
  10. "github.com/v2fly/v2ray-core/v5/common/session"
  11. "github.com/v2fly/v2ray-core/v5/transport/internet"
  12. "github.com/v2fly/v2ray-core/v5/transport/internet/grpc/encoding"
  13. "github.com/v2fly/v2ray-core/v5/transport/internet/tls"
  14. )
  15. type Listener struct {
  16. encoding.UnimplementedGunServiceServer
  17. ctx context.Context
  18. handler internet.ConnHandler
  19. local net.Addr
  20. config *Config
  21. locker *internet.FileLocker // for unix domain socket
  22. s *grpc.Server
  23. }
  24. func (l Listener) Tun(server encoding.GunService_TunServer) error {
  25. tunCtx, cancel := context.WithCancel(l.ctx)
  26. l.handler(encoding.NewGunConn(server, cancel))
  27. <-tunCtx.Done()
  28. return nil
  29. }
  30. func (l Listener) Close() error {
  31. l.s.Stop()
  32. return nil
  33. }
  34. func (l Listener) Addr() net.Addr {
  35. return l.local
  36. }
  37. func Listen(ctx context.Context, address net.Address, port net.Port, settings *internet.MemoryStreamConfig, handler internet.ConnHandler) (internet.Listener, error) {
  38. grpcSettings := settings.ProtocolSettings.(*Config)
  39. var listener *Listener
  40. if port == net.Port(0) { // unix
  41. listener = &Listener{
  42. handler: handler,
  43. local: &net.UnixAddr{
  44. Name: address.Domain(),
  45. Net: "unix",
  46. },
  47. config: grpcSettings,
  48. }
  49. } else { // tcp
  50. listener = &Listener{
  51. handler: handler,
  52. local: &net.TCPAddr{
  53. IP: address.IP(),
  54. Port: int(port),
  55. },
  56. config: grpcSettings,
  57. }
  58. }
  59. listener.ctx = ctx
  60. config := tls.ConfigFromStreamSettings(settings)
  61. var s *grpc.Server
  62. if config == nil {
  63. s = grpc.NewServer()
  64. } else {
  65. // gRPC server may silently ignore TLS errors
  66. s = grpc.NewServer(grpc.Creds(credentials.NewTLS(config.GetTLSConfig(tls.WithNextProto("h2")))))
  67. }
  68. listener.s = s
  69. if settings.SocketSettings != nil && settings.SocketSettings.AcceptProxyProtocol {
  70. newError("accepting PROXY protocol").AtWarning().WriteToLog(session.ExportIDToError(ctx))
  71. }
  72. go func() {
  73. var streamListener net.Listener
  74. var err error
  75. if port == net.Port(0) { // unix
  76. streamListener, err = internet.ListenSystem(ctx, &net.UnixAddr{
  77. Name: address.Domain(),
  78. Net: "unix",
  79. }, settings.SocketSettings)
  80. if err != nil {
  81. newError("failed to listen on ", address).Base(err).AtError().WriteToLog(session.ExportIDToError(ctx))
  82. return
  83. }
  84. locker := ctx.Value(address.Domain())
  85. if locker != nil {
  86. listener.locker = locker.(*internet.FileLocker)
  87. }
  88. } else { // tcp
  89. streamListener, err = internet.ListenSystem(ctx, &net.TCPAddr{
  90. IP: address.IP(),
  91. Port: int(port),
  92. }, settings.SocketSettings)
  93. if err != nil {
  94. newError("failed to listen on ", address, ":", port).Base(err).AtError().WriteToLog(session.ExportIDToError(ctx))
  95. return
  96. }
  97. }
  98. encoding.RegisterGunServiceServerX(s, listener, grpcSettings.ServiceName)
  99. if err = s.Serve(streamListener); err != nil {
  100. newError("Listener for grpc ended").Base(err).WriteToLog()
  101. }
  102. }()
  103. return listener, nil
  104. }
  105. func init() {
  106. common.Must(internet.RegisterTransportListener(protocolName, Listen))
  107. }