commander.go 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. package commander
  2. //go:generate go run $GOPATH/src/v2ray.com/core/common/errors/errorgen/main.go -pkg commander -path App,Commander
  3. import (
  4. "context"
  5. "net"
  6. "sync"
  7. "google.golang.org/grpc"
  8. "v2ray.com/core"
  9. "v2ray.com/core/common"
  10. )
  11. type Commander struct {
  12. sync.Mutex
  13. server *grpc.Server
  14. config Config
  15. ohm core.OutboundHandlerManager
  16. callbacks []core.ServiceRegistryCallback
  17. }
  18. func NewCommander(ctx context.Context, config *Config) (*Commander, error) {
  19. v := core.FromContext(ctx)
  20. if v == nil {
  21. return nil, newError("V is not in context.")
  22. }
  23. c := &Commander{
  24. config: *config,
  25. ohm: v.OutboundHandlerManager(),
  26. }
  27. if err := v.RegisterFeature((*core.Commander)(nil), c); err != nil {
  28. return nil, err
  29. }
  30. return c, nil
  31. }
  32. func (c *Commander) RegisterService(callback core.ServiceRegistryCallback) {
  33. c.Lock()
  34. defer c.Unlock()
  35. if callback == nil {
  36. return
  37. }
  38. c.callbacks = append(c.callbacks, callback)
  39. }
  40. func (c *Commander) Start() error {
  41. c.Lock()
  42. c.server = grpc.NewServer()
  43. for _, callback := range c.callbacks {
  44. callback(c.server)
  45. }
  46. c.Unlock()
  47. listener := &OutboundListener{
  48. buffer: make(chan net.Conn, 4),
  49. }
  50. go func() {
  51. if err := c.server.Serve(listener); err != nil {
  52. newError("failed to start grpc server").Base(err).AtError().WriteToLog()
  53. }
  54. }()
  55. c.ohm.RemoveHandler(context.Background(), c.config.Tag)
  56. c.ohm.AddHandler(context.Background(), &CommanderOutbound{
  57. tag: c.config.Tag,
  58. listener: listener,
  59. })
  60. return nil
  61. }
  62. func (c *Commander) Close() error {
  63. c.Lock()
  64. defer c.Unlock()
  65. if c.server != nil {
  66. c.server.Stop()
  67. c.server = nil
  68. }
  69. return nil
  70. }
  71. func init() {
  72. common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, cfg interface{}) (interface{}, error) {
  73. return NewCommander(ctx, cfg.(*Config))
  74. }))
  75. }