hub.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. // +build !confonly
  2. package grpc
  3. import (
  4. "context"
  5. "google.golang.org/grpc"
  6. "google.golang.org/grpc/credentials"
  7. "github.com/v2fly/v2ray-core/v4/common"
  8. "github.com/v2fly/v2ray-core/v4/common/net"
  9. "github.com/v2fly/v2ray-core/v4/common/session"
  10. "github.com/v2fly/v2ray-core/v4/transport/internet"
  11. "github.com/v2fly/v2ray-core/v4/transport/internet/grpc/encoding"
  12. "github.com/v2fly/v2ray-core/v4/transport/internet/tls"
  13. )
  14. type Listener struct {
  15. encoding.UnimplementedGunServiceServer
  16. ctx context.Context
  17. handler internet.ConnHandler
  18. local net.Addr
  19. config *Config
  20. locker *internet.FileLocker // for unix domain socket
  21. s *grpc.Server
  22. }
  23. func (l Listener) Tun(server encoding.GunService_TunServer) error {
  24. tunCtx, cancel := context.WithCancel(l.ctx)
  25. l.handler(encoding.NewServerConn(server, cancel))
  26. <-tunCtx.Done()
  27. return nil
  28. }
  29. func (l Listener) Close() error {
  30. l.s.Stop()
  31. return nil
  32. }
  33. func (l Listener) Addr() net.Addr {
  34. return l.local
  35. }
  36. func Listen(ctx context.Context, address net.Address, port net.Port, settings *internet.MemoryStreamConfig, handler internet.ConnHandler) (internet.Listener, error) {
  37. grpcSettings := settings.ProtocolSettings.(*Config)
  38. var listener *Listener
  39. if port == net.Port(0) { // unix
  40. listener = &Listener{
  41. handler: handler,
  42. local: &net.UnixAddr{
  43. Name: address.Domain(),
  44. Net: "unix",
  45. },
  46. config: grpcSettings,
  47. }
  48. } else { // tcp
  49. listener = &Listener{
  50. handler: handler,
  51. local: &net.TCPAddr{
  52. IP: address.IP(),
  53. Port: int(port),
  54. },
  55. config: grpcSettings,
  56. }
  57. }
  58. listener.ctx = ctx
  59. config := tls.ConfigFromStreamSettings(settings)
  60. var s *grpc.Server
  61. if config == nil {
  62. s = grpc.NewServer()
  63. } else {
  64. s = grpc.NewServer(grpc.Creds(credentials.NewTLS(config.GetTLSConfig(tls.WithNextProto("h2")))))
  65. }
  66. listener.s = s
  67. if settings.SocketSettings != nil && settings.SocketSettings.AcceptProxyProtocol {
  68. newError("accepting PROXY protocol").AtWarning().WriteToLog(session.ExportIDToError(ctx))
  69. }
  70. go func() {
  71. var streamListener net.Listener
  72. var err error
  73. if port == net.Port(0) { // unix
  74. streamListener, err = internet.ListenSystem(ctx, &net.UnixAddr{
  75. Name: address.Domain(),
  76. Net: "unix",
  77. }, settings.SocketSettings)
  78. if err != nil {
  79. newError("failed to listen on ", address).Base(err).AtError().WriteToLog(session.ExportIDToError(ctx))
  80. return
  81. }
  82. locker := ctx.Value(address.Domain())
  83. if locker != nil {
  84. listener.locker = locker.(*internet.FileLocker)
  85. }
  86. } else { // tcp
  87. streamListener, err = internet.ListenSystem(ctx, &net.TCPAddr{
  88. IP: address.IP(),
  89. Port: int(port),
  90. }, settings.SocketSettings)
  91. if err != nil {
  92. newError("failed to listen on ", address, ":", port).Base(err).AtError().WriteToLog(session.ExportIDToError(ctx))
  93. return
  94. }
  95. }
  96. encoding.RegisterGunServiceServerX(s, listener, grpcSettings.ServiceName)
  97. if err = s.Serve(streamListener); err != nil {
  98. newError("Listener for grpc ended").Base(err).WriteToLog()
  99. }
  100. }()
  101. return listener, nil
  102. }
  103. func init() {
  104. common.Must(internet.RegisterTransportListener(protocolName, Listen))
  105. }