command.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. // +build !confonly
  2. package command
  3. //go:generate errorgen
  4. import (
  5. "context"
  6. "google.golang.org/grpc"
  7. "v2ray.com/core"
  8. "v2ray.com/core/common"
  9. "v2ray.com/core/features/routing"
  10. "v2ray.com/core/features/stats"
  11. )
  12. // routingServer is an implementation of RoutingService.
  13. type routingServer struct {
  14. router routing.Router
  15. routingStats stats.Channel
  16. }
  17. // NewRoutingServer creates a statistics service with statistics manager.
  18. func NewRoutingServer(router routing.Router, routingStats stats.Channel) RoutingServiceServer {
  19. return &routingServer{
  20. router: router,
  21. routingStats: routingStats,
  22. }
  23. }
  24. func (s *routingServer) TestRoute(ctx context.Context, request *TestRouteRequest) (*RoutingContext, error) {
  25. if request.RoutingContext == nil {
  26. return nil, newError("Invalid routing request.")
  27. }
  28. route, err := s.router.PickRoute(AsRoutingContext(request.RoutingContext))
  29. if err != nil {
  30. return nil, err
  31. }
  32. if request.PublishResult && s.routingStats != nil {
  33. s.routingStats.Publish(route)
  34. }
  35. return AsProtobufMessage(request.FieldSelectors)(route), nil
  36. }
  37. func (s *routingServer) SubscribeRoutingStats(request *SubscribeRoutingStatsRequest, stream RoutingService_SubscribeRoutingStatsServer) error {
  38. if s.routingStats == nil {
  39. return newError("Routing statistics not enabled.")
  40. }
  41. genMessage := AsProtobufMessage(request.FieldSelectors)
  42. subscriber, err := stats.SubscribeRunnableChannel(s.routingStats)
  43. if err != nil {
  44. return err
  45. }
  46. defer stats.UnsubscribeClosableChannel(s.routingStats, subscriber) // nolint: errcheck
  47. for {
  48. select {
  49. case value, received := <-subscriber:
  50. route, ok := value.(routing.Route)
  51. if !(received && ok) {
  52. return newError("Receiving upstream statistics failed.")
  53. }
  54. err := stream.Send(genMessage(route))
  55. if err != nil {
  56. return err
  57. }
  58. case <-stream.Context().Done():
  59. return stream.Context().Err()
  60. }
  61. }
  62. }
  63. func (s *routingServer) mustEmbedUnimplementedRoutingServiceServer() {}
  64. type service struct {
  65. v *core.Instance
  66. }
  67. func (s *service) Register(server *grpc.Server) {
  68. common.Must(s.v.RequireFeatures(func(router routing.Router, stats stats.Manager) {
  69. RegisterRoutingServiceServer(server, NewRoutingServer(router, nil))
  70. }))
  71. }
  72. func init() {
  73. common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, cfg interface{}) (interface{}, error) {
  74. s := core.MustFromContext(ctx)
  75. return &service{v: s}, nil
  76. }))
  77. }