Browse Source

API: Implements RoutingService

Vigilans 5 years ago
parent
commit
cd9ac1bac7

+ 90 - 0
app/router/command/command.go

@@ -0,0 +1,90 @@
+// +build !confonly
+
+package command
+
+//go:generate errorgen
+
+import (
+	"context"
+
+	"google.golang.org/grpc"
+
+	"v2ray.com/core"
+	"v2ray.com/core/common"
+	"v2ray.com/core/features/routing"
+	"v2ray.com/core/features/stats"
+)
+
+// routingServer is an implementation of RoutingService.
+type routingServer struct {
+	router       routing.Router
+	routingStats stats.Channel
+}
+
+// NewRoutingServer creates a statistics service with statistics manager.
+func NewRoutingServer(router routing.Router, routingStats stats.Channel) RoutingServiceServer {
+	return &routingServer{
+		router:       router,
+		routingStats: routingStats,
+	}
+}
+
+func (s *routingServer) TestRoute(ctx context.Context, request *TestRouteRequest) (*RoutingContext, error) {
+	if request.RoutingContext == nil {
+		return nil, newError("Invalid routing request.")
+	}
+	route, err := s.router.PickRoute(AsRoutingContext(request.RoutingContext))
+	if err != nil {
+		return nil, err
+	}
+	if request.PublishResult && s.routingStats != nil {
+		s.routingStats.Publish(route)
+	}
+	return AsProtobufMessage(request.FieldSelectors)(route), nil
+}
+
+func (s *routingServer) SubscribeRoutingStats(request *SubscribeRoutingStatsRequest, stream RoutingService_SubscribeRoutingStatsServer) error {
+	if s.routingStats == nil {
+		return newError("Routing statistics not enabled.")
+	}
+	genMessage := AsProtobufMessage(request.FieldSelectors)
+	subscriber, err := stats.SubscribeRunnableChannel(s.routingStats)
+	if err != nil {
+		return err
+	}
+	defer stats.UnsubscribeClosableChannel(s.routingStats, subscriber) // nolint: errcheck
+	for {
+		select {
+		case value, received := <-subscriber:
+			route, ok := value.(routing.Route)
+			if !(received && ok) {
+				return newError("Receiving upstream statistics failed.")
+			}
+			err := stream.Send(genMessage(route))
+			if err != nil {
+				return err
+			}
+		case <-stream.Context().Done():
+			return stream.Context().Err()
+		}
+	}
+}
+
+func (s *routingServer) mustEmbedUnimplementedRoutingServiceServer() {}
+
+type service struct {
+	v *core.Instance
+}
+
+func (s *service) Register(server *grpc.Server) {
+	common.Must(s.v.RequireFeatures(func(router routing.Router, stats stats.Manager) {
+		RegisterRoutingServiceServer(server, NewRoutingServer(router, nil))
+	}))
+}
+
+func init() {
+	common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, cfg interface{}) (interface{}, error) {
+		s := core.MustFromContext(ctx)
+		return &service{v: s}, nil
+	}))
+}

+ 525 - 0
app/router/command/command.pb.go

@@ -0,0 +1,525 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+// 	protoc-gen-go v1.25.0
+// 	protoc        v3.13.0
+// source: app/router/command/command.proto
+
+package command
+
+import (
+	proto "github.com/golang/protobuf/proto"
+	protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+	protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+	reflect "reflect"
+	sync "sync"
+	net "v2ray.com/core/common/net"
+)
+
+const (
+	// Verify that this generated code is sufficiently up-to-date.
+	_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+	// Verify that runtime/protoimpl is sufficiently up-to-date.
+	_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+// This is a compile-time assertion that a sufficiently up-to-date version
+// of the legacy proto package is being used.
+const _ = proto.ProtoPackageIsVersion4
+
+// RoutingContext is the context with information relative to routing process.
+// It conforms to the structure of v2ray.core.features.routing.Context and v2ray.core.features.routing.Route.
+type RoutingContext struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	InboundTag        string            `protobuf:"bytes,1,opt,name=InboundTag,proto3" json:"InboundTag,omitempty"`
+	Network           net.Network       `protobuf:"varint,2,opt,name=Network,proto3,enum=v2ray.core.common.net.Network" json:"Network,omitempty"`
+	SourceIPs         [][]byte          `protobuf:"bytes,3,rep,name=SourceIPs,proto3" json:"SourceIPs,omitempty"`
+	TargetIPs         [][]byte          `protobuf:"bytes,4,rep,name=TargetIPs,proto3" json:"TargetIPs,omitempty"`
+	SourcePort        uint32            `protobuf:"varint,5,opt,name=SourcePort,proto3" json:"SourcePort,omitempty"`
+	TargetPort        uint32            `protobuf:"varint,6,opt,name=TargetPort,proto3" json:"TargetPort,omitempty"`
+	TargetDomain      string            `protobuf:"bytes,7,opt,name=TargetDomain,proto3" json:"TargetDomain,omitempty"`
+	Protocol          string            `protobuf:"bytes,8,opt,name=Protocol,proto3" json:"Protocol,omitempty"`
+	User              string            `protobuf:"bytes,9,opt,name=User,proto3" json:"User,omitempty"`
+	Attributes        map[string]string `protobuf:"bytes,10,rep,name=Attributes,proto3" json:"Attributes,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
+	OutboundGroupTags []string          `protobuf:"bytes,11,rep,name=OutboundGroupTags,proto3" json:"OutboundGroupTags,omitempty"`
+	OutboundTag       string            `protobuf:"bytes,12,opt,name=OutboundTag,proto3" json:"OutboundTag,omitempty"`
+}
+
+func (x *RoutingContext) Reset() {
+	*x = RoutingContext{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_app_router_command_command_proto_msgTypes[0]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *RoutingContext) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*RoutingContext) ProtoMessage() {}
+
+func (x *RoutingContext) ProtoReflect() protoreflect.Message {
+	mi := &file_app_router_command_command_proto_msgTypes[0]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use RoutingContext.ProtoReflect.Descriptor instead.
+func (*RoutingContext) Descriptor() ([]byte, []int) {
+	return file_app_router_command_command_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *RoutingContext) GetInboundTag() string {
+	if x != nil {
+		return x.InboundTag
+	}
+	return ""
+}
+
+func (x *RoutingContext) GetNetwork() net.Network {
+	if x != nil {
+		return x.Network
+	}
+	return net.Network_Unknown
+}
+
+func (x *RoutingContext) GetSourceIPs() [][]byte {
+	if x != nil {
+		return x.SourceIPs
+	}
+	return nil
+}
+
+func (x *RoutingContext) GetTargetIPs() [][]byte {
+	if x != nil {
+		return x.TargetIPs
+	}
+	return nil
+}
+
+func (x *RoutingContext) GetSourcePort() uint32 {
+	if x != nil {
+		return x.SourcePort
+	}
+	return 0
+}
+
+func (x *RoutingContext) GetTargetPort() uint32 {
+	if x != nil {
+		return x.TargetPort
+	}
+	return 0
+}
+
+func (x *RoutingContext) GetTargetDomain() string {
+	if x != nil {
+		return x.TargetDomain
+	}
+	return ""
+}
+
+func (x *RoutingContext) GetProtocol() string {
+	if x != nil {
+		return x.Protocol
+	}
+	return ""
+}
+
+func (x *RoutingContext) GetUser() string {
+	if x != nil {
+		return x.User
+	}
+	return ""
+}
+
+func (x *RoutingContext) GetAttributes() map[string]string {
+	if x != nil {
+		return x.Attributes
+	}
+	return nil
+}
+
+func (x *RoutingContext) GetOutboundGroupTags() []string {
+	if x != nil {
+		return x.OutboundGroupTags
+	}
+	return nil
+}
+
+func (x *RoutingContext) GetOutboundTag() string {
+	if x != nil {
+		return x.OutboundTag
+	}
+	return ""
+}
+
+// SubscribeRoutingStatsRequest subscribes to routing statistics channel if opened by v2ray-core.
+// * FieldSelectors selects a subset of fields in routing statistics to return. Valid selectors:
+//  - inbound: Selects connection's inbound tag.
+//  - network: Selects connection's network.
+//  - ip: Equivalent as "ip_source" and "ip_target", selects both source and target IP.
+//  - port: Equivalent as "port_source" and "port_target", selects both source and target port.
+//  - domain: Selects target domain.
+//  - protocol: Select connection's protocol.
+//  - user: Select connection's inbound user email.
+//  - attributes: Select connection's additional attributes.
+//  - outbound: Equivalent as "outbound" and "outbound_group", select both outbound tag and outbound group tags.
+// * If FieldSelectors is left empty, all fields will be returned.
+type SubscribeRoutingStatsRequest struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	FieldSelectors []string `protobuf:"bytes,1,rep,name=FieldSelectors,proto3" json:"FieldSelectors,omitempty"`
+}
+
+func (x *SubscribeRoutingStatsRequest) Reset() {
+	*x = SubscribeRoutingStatsRequest{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_app_router_command_command_proto_msgTypes[1]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *SubscribeRoutingStatsRequest) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*SubscribeRoutingStatsRequest) ProtoMessage() {}
+
+func (x *SubscribeRoutingStatsRequest) ProtoReflect() protoreflect.Message {
+	mi := &file_app_router_command_command_proto_msgTypes[1]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use SubscribeRoutingStatsRequest.ProtoReflect.Descriptor instead.
+func (*SubscribeRoutingStatsRequest) Descriptor() ([]byte, []int) {
+	return file_app_router_command_command_proto_rawDescGZIP(), []int{1}
+}
+
+func (x *SubscribeRoutingStatsRequest) GetFieldSelectors() []string {
+	if x != nil {
+		return x.FieldSelectors
+	}
+	return nil
+}
+
+// TestRouteRequest manually tests a routing result according to the routing context message.
+// * RoutingContext is the routing message without outbound information.
+// * FieldSelectors selects the fields to return in the routing result. All fields are returned if left empty.
+// * PublishResult broadcasts the routing result to routing statistics channel if set true.
+type TestRouteRequest struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	RoutingContext *RoutingContext `protobuf:"bytes,1,opt,name=RoutingContext,proto3" json:"RoutingContext,omitempty"`
+	FieldSelectors []string        `protobuf:"bytes,2,rep,name=FieldSelectors,proto3" json:"FieldSelectors,omitempty"`
+	PublishResult  bool            `protobuf:"varint,3,opt,name=PublishResult,proto3" json:"PublishResult,omitempty"`
+}
+
+func (x *TestRouteRequest) Reset() {
+	*x = TestRouteRequest{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_app_router_command_command_proto_msgTypes[2]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *TestRouteRequest) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*TestRouteRequest) ProtoMessage() {}
+
+func (x *TestRouteRequest) ProtoReflect() protoreflect.Message {
+	mi := &file_app_router_command_command_proto_msgTypes[2]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use TestRouteRequest.ProtoReflect.Descriptor instead.
+func (*TestRouteRequest) Descriptor() ([]byte, []int) {
+	return file_app_router_command_command_proto_rawDescGZIP(), []int{2}
+}
+
+func (x *TestRouteRequest) GetRoutingContext() *RoutingContext {
+	if x != nil {
+		return x.RoutingContext
+	}
+	return nil
+}
+
+func (x *TestRouteRequest) GetFieldSelectors() []string {
+	if x != nil {
+		return x.FieldSelectors
+	}
+	return nil
+}
+
+func (x *TestRouteRequest) GetPublishResult() bool {
+	if x != nil {
+		return x.PublishResult
+	}
+	return false
+}
+
+type Config struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+}
+
+func (x *Config) Reset() {
+	*x = Config{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_app_router_command_command_proto_msgTypes[3]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *Config) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Config) ProtoMessage() {}
+
+func (x *Config) ProtoReflect() protoreflect.Message {
+	mi := &file_app_router_command_command_proto_msgTypes[3]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use Config.ProtoReflect.Descriptor instead.
+func (*Config) Descriptor() ([]byte, []int) {
+	return file_app_router_command_command_proto_rawDescGZIP(), []int{3}
+}
+
+var File_app_router_command_command_proto protoreflect.FileDescriptor
+
+var file_app_router_command_command_proto_rawDesc = []byte{
+	0x0a, 0x20, 0x61, 0x70, 0x70, 0x2f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x72, 0x2f, 0x63, 0x6f, 0x6d,
+	0x6d, 0x61, 0x6e, 0x64, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x70, 0x72, 0x6f,
+	0x74, 0x6f, 0x12, 0x1d, 0x76, 0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x61,
+	0x70, 0x70, 0x2e, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e,
+	0x64, 0x1a, 0x18, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, 0x6e, 0x65, 0x74, 0x2f, 0x6e, 0x65,
+	0x74, 0x77, 0x6f, 0x72, 0x6b, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xa8, 0x04, 0x0a, 0x0e,
+	0x52, 0x6f, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x12, 0x1e,
+	0x0a, 0x0a, 0x49, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x54, 0x61, 0x67, 0x18, 0x01, 0x20, 0x01,
+	0x28, 0x09, 0x52, 0x0a, 0x49, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x54, 0x61, 0x67, 0x12, 0x38,
+	0x0a, 0x07, 0x4e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32,
+	0x1e, 0x2e, 0x76, 0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x63, 0x6f, 0x6d,
+	0x6d, 0x6f, 0x6e, 0x2e, 0x6e, 0x65, 0x74, 0x2e, 0x4e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x52,
+	0x07, 0x4e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x12, 0x1c, 0x0a, 0x09, 0x53, 0x6f, 0x75, 0x72,
+	0x63, 0x65, 0x49, 0x50, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0c, 0x52, 0x09, 0x53, 0x6f, 0x75,
+	0x72, 0x63, 0x65, 0x49, 0x50, 0x73, 0x12, 0x1c, 0x0a, 0x09, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74,
+	0x49, 0x50, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0c, 0x52, 0x09, 0x54, 0x61, 0x72, 0x67, 0x65,
+	0x74, 0x49, 0x50, 0x73, 0x12, 0x1e, 0x0a, 0x0a, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x50, 0x6f,
+	0x72, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65,
+	0x50, 0x6f, 0x72, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x50, 0x6f,
+	0x72, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74,
+	0x50, 0x6f, 0x72, 0x74, 0x12, 0x22, 0x0a, 0x0c, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x44, 0x6f,
+	0x6d, 0x61, 0x69, 0x6e, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x54, 0x61, 0x72, 0x67,
+	0x65, 0x74, 0x44, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x50, 0x72, 0x6f, 0x74,
+	0x6f, 0x63, 0x6f, 0x6c, 0x18, 0x08, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x50, 0x72, 0x6f, 0x74,
+	0x6f, 0x63, 0x6f, 0x6c, 0x12, 0x12, 0x0a, 0x04, 0x55, 0x73, 0x65, 0x72, 0x18, 0x09, 0x20, 0x01,
+	0x28, 0x09, 0x52, 0x04, 0x55, 0x73, 0x65, 0x72, 0x12, 0x5d, 0x0a, 0x0a, 0x41, 0x74, 0x74, 0x72,
+	0x69, 0x62, 0x75, 0x74, 0x65, 0x73, 0x18, 0x0a, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x3d, 0x2e, 0x76,
+	0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x72, 0x6f,
+	0x75, 0x74, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x52, 0x6f, 0x75,
+	0x74, 0x69, 0x6e, 0x67, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x2e, 0x41, 0x74, 0x74, 0x72,
+	0x69, 0x62, 0x75, 0x74, 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x0a, 0x41, 0x74, 0x74,
+	0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x73, 0x12, 0x2c, 0x0a, 0x11, 0x4f, 0x75, 0x74, 0x62, 0x6f,
+	0x75, 0x6e, 0x64, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x54, 0x61, 0x67, 0x73, 0x18, 0x0b, 0x20, 0x03,
+	0x28, 0x09, 0x52, 0x11, 0x4f, 0x75, 0x74, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x47, 0x72, 0x6f, 0x75,
+	0x70, 0x54, 0x61, 0x67, 0x73, 0x12, 0x20, 0x0a, 0x0b, 0x4f, 0x75, 0x74, 0x62, 0x6f, 0x75, 0x6e,
+	0x64, 0x54, 0x61, 0x67, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x4f, 0x75, 0x74, 0x62,
+	0x6f, 0x75, 0x6e, 0x64, 0x54, 0x61, 0x67, 0x1a, 0x3d, 0x0a, 0x0f, 0x41, 0x74, 0x74, 0x72, 0x69,
+	0x62, 0x75, 0x74, 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65,
+	0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05,
+	0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c,
+	0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x46, 0x0a, 0x1c, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72,
+	0x69, 0x62, 0x65, 0x52, 0x6f, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52,
+	0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x26, 0x0a, 0x0e, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x53,
+	0x65, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0e,
+	0x46, 0x69, 0x65, 0x6c, 0x64, 0x53, 0x65, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x73, 0x22, 0xb7,
+	0x01, 0x0a, 0x10, 0x54, 0x65, 0x73, 0x74, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75,
+	0x65, 0x73, 0x74, 0x12, 0x55, 0x0a, 0x0e, 0x52, 0x6f, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x43, 0x6f,
+	0x6e, 0x74, 0x65, 0x78, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2d, 0x2e, 0x76, 0x32,
+	0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x72, 0x6f, 0x75,
+	0x74, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x52, 0x6f, 0x75, 0x74,
+	0x69, 0x6e, 0x67, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x52, 0x0e, 0x52, 0x6f, 0x75, 0x74,
+	0x69, 0x6e, 0x67, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x12, 0x26, 0x0a, 0x0e, 0x46, 0x69,
+	0x65, 0x6c, 0x64, 0x53, 0x65, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x73, 0x18, 0x02, 0x20, 0x03,
+	0x28, 0x09, 0x52, 0x0e, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x53, 0x65, 0x6c, 0x65, 0x63, 0x74, 0x6f,
+	0x72, 0x73, 0x12, 0x24, 0x0a, 0x0d, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x73,
+	0x75, 0x6c, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0d, 0x50, 0x75, 0x62, 0x6c, 0x69,
+	0x73, 0x68, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x22, 0x08, 0x0a, 0x06, 0x43, 0x6f, 0x6e, 0x66,
+	0x69, 0x67, 0x32, 0x89, 0x02, 0x0a, 0x0e, 0x52, 0x6f, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x53, 0x65,
+	0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x87, 0x01, 0x0a, 0x15, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72,
+	0x69, 0x62, 0x65, 0x52, 0x6f, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x53, 0x74, 0x61, 0x74, 0x73, 0x12,
+	0x3b, 0x2e, 0x76, 0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x61, 0x70, 0x70,
+	0x2e, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e,
+	0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x6f, 0x75, 0x74, 0x69, 0x6e, 0x67,
+	0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2d, 0x2e, 0x76,
+	0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x72, 0x6f,
+	0x75, 0x74, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x52, 0x6f, 0x75,
+	0x74, 0x69, 0x6e, 0x67, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x22, 0x00, 0x30, 0x01, 0x12,
+	0x6d, 0x0a, 0x09, 0x54, 0x65, 0x73, 0x74, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x12, 0x2f, 0x2e, 0x76,
+	0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x72, 0x6f,
+	0x75, 0x74, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x54, 0x65, 0x73,
+	0x74, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2d, 0x2e,
+	0x76, 0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x72,
+	0x6f, 0x75, 0x74, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x52, 0x6f,
+	0x75, 0x74, 0x69, 0x6e, 0x67, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x22, 0x00, 0x42, 0x68,
+	0x0a, 0x21, 0x63, 0x6f, 0x6d, 0x2e, 0x76, 0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65,
+	0x2e, 0x61, 0x70, 0x70, 0x2e, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6d, 0x6d,
+	0x61, 0x6e, 0x64, 0x50, 0x01, 0x5a, 0x21, 0x76, 0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x6d,
+	0x2f, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x61, 0x70, 0x70, 0x2f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x72,
+	0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0xaa, 0x02, 0x1d, 0x56, 0x32, 0x52, 0x61, 0x79,
+	0x2e, 0x43, 0x6f, 0x72, 0x65, 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x72,
+	0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+}
+
+var (
+	file_app_router_command_command_proto_rawDescOnce sync.Once
+	file_app_router_command_command_proto_rawDescData = file_app_router_command_command_proto_rawDesc
+)
+
+func file_app_router_command_command_proto_rawDescGZIP() []byte {
+	file_app_router_command_command_proto_rawDescOnce.Do(func() {
+		file_app_router_command_command_proto_rawDescData = protoimpl.X.CompressGZIP(file_app_router_command_command_proto_rawDescData)
+	})
+	return file_app_router_command_command_proto_rawDescData
+}
+
+var file_app_router_command_command_proto_msgTypes = make([]protoimpl.MessageInfo, 5)
+var file_app_router_command_command_proto_goTypes = []interface{}{
+	(*RoutingContext)(nil),               // 0: v2ray.core.app.router.command.RoutingContext
+	(*SubscribeRoutingStatsRequest)(nil), // 1: v2ray.core.app.router.command.SubscribeRoutingStatsRequest
+	(*TestRouteRequest)(nil),             // 2: v2ray.core.app.router.command.TestRouteRequest
+	(*Config)(nil),                       // 3: v2ray.core.app.router.command.Config
+	nil,                                  // 4: v2ray.core.app.router.command.RoutingContext.AttributesEntry
+	(net.Network)(0),                     // 5: v2ray.core.common.net.Network
+}
+var file_app_router_command_command_proto_depIdxs = []int32{
+	5, // 0: v2ray.core.app.router.command.RoutingContext.Network:type_name -> v2ray.core.common.net.Network
+	4, // 1: v2ray.core.app.router.command.RoutingContext.Attributes:type_name -> v2ray.core.app.router.command.RoutingContext.AttributesEntry
+	0, // 2: v2ray.core.app.router.command.TestRouteRequest.RoutingContext:type_name -> v2ray.core.app.router.command.RoutingContext
+	1, // 3: v2ray.core.app.router.command.RoutingService.SubscribeRoutingStats:input_type -> v2ray.core.app.router.command.SubscribeRoutingStatsRequest
+	2, // 4: v2ray.core.app.router.command.RoutingService.TestRoute:input_type -> v2ray.core.app.router.command.TestRouteRequest
+	0, // 5: v2ray.core.app.router.command.RoutingService.SubscribeRoutingStats:output_type -> v2ray.core.app.router.command.RoutingContext
+	0, // 6: v2ray.core.app.router.command.RoutingService.TestRoute:output_type -> v2ray.core.app.router.command.RoutingContext
+	5, // [5:7] is the sub-list for method output_type
+	3, // [3:5] is the sub-list for method input_type
+	3, // [3:3] is the sub-list for extension type_name
+	3, // [3:3] is the sub-list for extension extendee
+	0, // [0:3] is the sub-list for field type_name
+}
+
+func init() { file_app_router_command_command_proto_init() }
+func file_app_router_command_command_proto_init() {
+	if File_app_router_command_command_proto != nil {
+		return
+	}
+	if !protoimpl.UnsafeEnabled {
+		file_app_router_command_command_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*RoutingContext); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+		file_app_router_command_command_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*SubscribeRoutingStatsRequest); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+		file_app_router_command_command_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*TestRouteRequest); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+		file_app_router_command_command_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*Config); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+	}
+	type x struct{}
+	out := protoimpl.TypeBuilder{
+		File: protoimpl.DescBuilder{
+			GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+			RawDescriptor: file_app_router_command_command_proto_rawDesc,
+			NumEnums:      0,
+			NumMessages:   5,
+			NumExtensions: 0,
+			NumServices:   1,
+		},
+		GoTypes:           file_app_router_command_command_proto_goTypes,
+		DependencyIndexes: file_app_router_command_command_proto_depIdxs,
+		MessageInfos:      file_app_router_command_command_proto_msgTypes,
+	}.Build()
+	File_app_router_command_command_proto = out.File
+	file_app_router_command_command_proto_rawDesc = nil
+	file_app_router_command_command_proto_goTypes = nil
+	file_app_router_command_command_proto_depIdxs = nil
+}

+ 59 - 0
app/router/command/command.proto

@@ -0,0 +1,59 @@
+syntax = "proto3";
+
+package v2ray.core.app.router.command;
+option csharp_namespace = "V2Ray.Core.App.Router.Command";
+option go_package = "v2ray.com/core/app/router/command";
+option java_package = "com.v2ray.core.app.router.command";
+option java_multiple_files = true;
+
+import "common/net/network.proto";
+
+// RoutingContext is the context with information relative to routing process.
+// It conforms to the structure of v2ray.core.features.routing.Context and v2ray.core.features.routing.Route.
+message RoutingContext {
+  string InboundTag = 1;
+  v2ray.core.common.net.Network Network = 2;
+  repeated bytes SourceIPs = 3;
+  repeated bytes TargetIPs = 4;
+  uint32 SourcePort = 5;
+  uint32 TargetPort = 6;
+  string TargetDomain = 7;
+  string Protocol = 8;
+  string User = 9;
+  map<string, string> Attributes = 10;
+  repeated string OutboundGroupTags = 11;
+  string OutboundTag = 12;
+}
+
+// SubscribeRoutingStatsRequest subscribes to routing statistics channel if opened by v2ray-core.
+// * FieldSelectors selects a subset of fields in routing statistics to return. Valid selectors:
+//  - inbound: Selects connection's inbound tag.
+//  - network: Selects connection's network.
+//  - ip: Equivalent as "ip_source" and "ip_target", selects both source and target IP.
+//  - port: Equivalent as "port_source" and "port_target", selects both source and target port.
+//  - domain: Selects target domain.
+//  - protocol: Select connection's protocol.
+//  - user: Select connection's inbound user email.
+//  - attributes: Select connection's additional attributes.
+//  - outbound: Equivalent as "outbound" and "outbound_group", select both outbound tag and outbound group tags.
+// * If FieldSelectors is left empty, all fields will be returned.
+message SubscribeRoutingStatsRequest {
+  repeated string FieldSelectors = 1;
+}
+
+// TestRouteRequest manually tests a routing result according to the routing context message.
+// * RoutingContext is the routing message without outbound information.
+// * FieldSelectors selects the fields to return in the routing result. All fields are returned if left empty.
+// * PublishResult broadcasts the routing result to routing statistics channel if set true.
+message TestRouteRequest {
+  RoutingContext RoutingContext = 1;
+  repeated string FieldSelectors = 2;
+  bool PublishResult = 3;
+}
+
+service RoutingService {
+  rpc SubscribeRoutingStats(SubscribeRoutingStatsRequest) returns (stream RoutingContext) {}
+  rpc TestRoute(TestRouteRequest) returns (RoutingContext) {}
+}
+
+message Config {}

+ 154 - 0
app/router/command/command_grpc.pb.go

@@ -0,0 +1,154 @@
+// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
+
+package command
+
+import (
+	context "context"
+	grpc "google.golang.org/grpc"
+	codes "google.golang.org/grpc/codes"
+	status "google.golang.org/grpc/status"
+)
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the grpc package it is being compiled against.
+const _ = grpc.SupportPackageIsVersion6
+
+// RoutingServiceClient is the client API for RoutingService service.
+//
+// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
+type RoutingServiceClient interface {
+	SubscribeRoutingStats(ctx context.Context, in *SubscribeRoutingStatsRequest, opts ...grpc.CallOption) (RoutingService_SubscribeRoutingStatsClient, error)
+	TestRoute(ctx context.Context, in *TestRouteRequest, opts ...grpc.CallOption) (*RoutingContext, error)
+}
+
+type routingServiceClient struct {
+	cc grpc.ClientConnInterface
+}
+
+func NewRoutingServiceClient(cc grpc.ClientConnInterface) RoutingServiceClient {
+	return &routingServiceClient{cc}
+}
+
+func (c *routingServiceClient) SubscribeRoutingStats(ctx context.Context, in *SubscribeRoutingStatsRequest, opts ...grpc.CallOption) (RoutingService_SubscribeRoutingStatsClient, error) {
+	stream, err := c.cc.NewStream(ctx, &_RoutingService_serviceDesc.Streams[0], "/v2ray.core.app.router.command.RoutingService/SubscribeRoutingStats", opts...)
+	if err != nil {
+		return nil, err
+	}
+	x := &routingServiceSubscribeRoutingStatsClient{stream}
+	if err := x.ClientStream.SendMsg(in); err != nil {
+		return nil, err
+	}
+	if err := x.ClientStream.CloseSend(); err != nil {
+		return nil, err
+	}
+	return x, nil
+}
+
+type RoutingService_SubscribeRoutingStatsClient interface {
+	Recv() (*RoutingContext, error)
+	grpc.ClientStream
+}
+
+type routingServiceSubscribeRoutingStatsClient struct {
+	grpc.ClientStream
+}
+
+func (x *routingServiceSubscribeRoutingStatsClient) Recv() (*RoutingContext, error) {
+	m := new(RoutingContext)
+	if err := x.ClientStream.RecvMsg(m); err != nil {
+		return nil, err
+	}
+	return m, nil
+}
+
+func (c *routingServiceClient) TestRoute(ctx context.Context, in *TestRouteRequest, opts ...grpc.CallOption) (*RoutingContext, error) {
+	out := new(RoutingContext)
+	err := c.cc.Invoke(ctx, "/v2ray.core.app.router.command.RoutingService/TestRoute", in, out, opts...)
+	if err != nil {
+		return nil, err
+	}
+	return out, nil
+}
+
+// RoutingServiceServer is the server API for RoutingService service.
+// All implementations must embed UnimplementedRoutingServiceServer
+// for forward compatibility
+type RoutingServiceServer interface {
+	SubscribeRoutingStats(*SubscribeRoutingStatsRequest, RoutingService_SubscribeRoutingStatsServer) error
+	TestRoute(context.Context, *TestRouteRequest) (*RoutingContext, error)
+	mustEmbedUnimplementedRoutingServiceServer()
+}
+
+// UnimplementedRoutingServiceServer must be embedded to have forward compatible implementations.
+type UnimplementedRoutingServiceServer struct {
+}
+
+func (*UnimplementedRoutingServiceServer) SubscribeRoutingStats(*SubscribeRoutingStatsRequest, RoutingService_SubscribeRoutingStatsServer) error {
+	return status.Errorf(codes.Unimplemented, "method SubscribeRoutingStats not implemented")
+}
+func (*UnimplementedRoutingServiceServer) TestRoute(context.Context, *TestRouteRequest) (*RoutingContext, error) {
+	return nil, status.Errorf(codes.Unimplemented, "method TestRoute not implemented")
+}
+func (*UnimplementedRoutingServiceServer) mustEmbedUnimplementedRoutingServiceServer() {}
+
+func RegisterRoutingServiceServer(s *grpc.Server, srv RoutingServiceServer) {
+	s.RegisterService(&_RoutingService_serviceDesc, srv)
+}
+
+func _RoutingService_SubscribeRoutingStats_Handler(srv interface{}, stream grpc.ServerStream) error {
+	m := new(SubscribeRoutingStatsRequest)
+	if err := stream.RecvMsg(m); err != nil {
+		return err
+	}
+	return srv.(RoutingServiceServer).SubscribeRoutingStats(m, &routingServiceSubscribeRoutingStatsServer{stream})
+}
+
+type RoutingService_SubscribeRoutingStatsServer interface {
+	Send(*RoutingContext) error
+	grpc.ServerStream
+}
+
+type routingServiceSubscribeRoutingStatsServer struct {
+	grpc.ServerStream
+}
+
+func (x *routingServiceSubscribeRoutingStatsServer) Send(m *RoutingContext) error {
+	return x.ServerStream.SendMsg(m)
+}
+
+func _RoutingService_TestRoute_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+	in := new(TestRouteRequest)
+	if err := dec(in); err != nil {
+		return nil, err
+	}
+	if interceptor == nil {
+		return srv.(RoutingServiceServer).TestRoute(ctx, in)
+	}
+	info := &grpc.UnaryServerInfo{
+		Server:     srv,
+		FullMethod: "/v2ray.core.app.router.command.RoutingService/TestRoute",
+	}
+	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+		return srv.(RoutingServiceServer).TestRoute(ctx, req.(*TestRouteRequest))
+	}
+	return interceptor(ctx, in, info, handler)
+}
+
+var _RoutingService_serviceDesc = grpc.ServiceDesc{
+	ServiceName: "v2ray.core.app.router.command.RoutingService",
+	HandlerType: (*RoutingServiceServer)(nil),
+	Methods: []grpc.MethodDesc{
+		{
+			MethodName: "TestRoute",
+			Handler:    _RoutingService_TestRoute_Handler,
+		},
+	},
+	Streams: []grpc.StreamDesc{
+		{
+			StreamName:    "SubscribeRoutingStats",
+			Handler:       _RoutingService_SubscribeRoutingStats_Handler,
+			ServerStreams: true,
+		},
+	},
+	Metadata: "app/router/command/command.proto",
+}

+ 334 - 0
app/router/command/command_test.go

@@ -0,0 +1,334 @@
+package command_test
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/golang/mock/gomock"
+	"github.com/google/go-cmp/cmp"
+	"github.com/google/go-cmp/cmp/cmpopts"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/test/bufconn"
+	"v2ray.com/core/app/router"
+	. "v2ray.com/core/app/router/command"
+	"v2ray.com/core/app/stats"
+	"v2ray.com/core/common"
+	"v2ray.com/core/common/net"
+	"v2ray.com/core/features/routing"
+	"v2ray.com/core/testing/mocks"
+)
+
+func TestServiceSubscribeRoutingStats(t *testing.T) {
+	c := stats.NewChannel(&stats.ChannelConfig{
+		SubscriberLimit:  1,
+		BufferSize:       16,
+		BroadcastTimeout: 100,
+	})
+	common.Must(c.Start())
+	defer c.Close()
+
+	lis := bufconn.Listen(1024 * 1024)
+	bufDialer := func(context.Context, string) (net.Conn, error) {
+		return lis.Dial()
+	}
+
+	testCases := []*RoutingContext{
+		{InboundTag: "in", OutboundTag: "out"},
+		{TargetIPs: [][]byte{{1, 2, 3, 4}}, TargetPort: 8080, OutboundTag: "out"},
+		{TargetDomain: "example.com", TargetPort: 443, OutboundTag: "out"},
+		{SourcePort: 9999, TargetPort: 9999, OutboundTag: "out"},
+		{Network: net.Network_UDP, OutboundGroupTags: []string{"outergroup", "innergroup"}, OutboundTag: "out"},
+		{Protocol: "bittorrent", OutboundTag: "blocked"},
+		{User: "example@v2fly.org", OutboundTag: "out"},
+		{SourceIPs: [][]byte{{127, 0, 0, 1}}, Attributes: map[string]string{"attr": "value"}, OutboundTag: "out"},
+	}
+	errCh := make(chan error)
+	nextPub := make(chan struct{})
+
+	// Server goroutine
+	go func() {
+		server := grpc.NewServer()
+		RegisterRoutingServiceServer(server, NewRoutingServer(nil, c))
+		errCh <- server.Serve(lis)
+	}()
+
+	// Publisher goroutine
+	go func() {
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+		defer cancel()
+		for { // Wait until there's one subscriber in routing stats channel
+			if len(c.Subscribers()) > 0 {
+				break
+			}
+			if ctx.Err() != nil {
+				errCh <- ctx.Err()
+			}
+		}
+		for _, tc := range testCases {
+			c.Publish(AsRoutingRoute(tc))
+		}
+
+		// Wait for next round of publishing
+		<-nextPub
+
+		ctx, cancel = context.WithTimeout(context.Background(), time.Second)
+		defer cancel()
+		for { // Wait until there's one subscriber in routing stats channel
+			if len(c.Subscribers()) > 0 {
+				break
+			}
+			if ctx.Err() != nil {
+				errCh <- ctx.Err()
+			}
+		}
+		for _, tc := range testCases {
+			c.Publish(AsRoutingRoute(tc))
+		}
+	}()
+
+	// Client goroutine
+	go func() {
+		conn, err := grpc.DialContext(context.Background(), "bufnet", grpc.WithContextDialer(bufDialer), grpc.WithInsecure())
+		if err != nil {
+			errCh <- err
+		}
+		defer lis.Close()
+		defer conn.Close()
+		client := NewRoutingServiceClient(conn)
+
+		// Test retrieving all fields
+		streamCtx, streamClose := context.WithCancel(context.Background())
+		stream, err := client.SubscribeRoutingStats(streamCtx, &SubscribeRoutingStatsRequest{})
+		if err != nil {
+			errCh <- err
+		}
+
+		for _, tc := range testCases {
+			msg, err := stream.Recv()
+			if err != nil {
+				errCh <- err
+			}
+			if r := cmp.Diff(msg, tc, cmpopts.IgnoreUnexported(RoutingContext{})); r != "" {
+				t.Error(r)
+			}
+		}
+
+		// Test that double subscription will fail
+		errStream, err := client.SubscribeRoutingStats(context.Background(), &SubscribeRoutingStatsRequest{
+			FieldSelectors: []string{"ip", "port", "domain", "outbound"},
+		})
+		if err != nil {
+			errCh <- err
+		}
+		if _, err := errStream.Recv(); err == nil {
+			t.Error("unexpected successful subscription")
+		}
+
+		// Test the unsubscription of stream works well
+		streamClose()
+		timeOutCtx, timeout := context.WithTimeout(context.Background(), time.Second)
+		defer timeout()
+		for { // Wait until there's no subscriber in routing stats channel
+			if len(c.Subscribers()) == 0 {
+				break
+			}
+			if timeOutCtx.Err() != nil {
+				t.Error("unexpected subscribers not decreased in channel")
+				errCh <- timeOutCtx.Err()
+			}
+		}
+
+		// Test retrieving only a subset of fields
+		streamCtx, streamClose = context.WithCancel(context.Background())
+		stream, err = client.SubscribeRoutingStats(streamCtx, &SubscribeRoutingStatsRequest{
+			FieldSelectors: []string{"ip", "port", "domain", "outbound"},
+		})
+		if err != nil {
+			errCh <- err
+		}
+
+		close(nextPub) // Send nextPub signal to start next round of publishing
+		for _, tc := range testCases {
+			msg, err := stream.Recv()
+			stat := &RoutingContext{ // Only a subset of stats is retrieved
+				SourceIPs:         tc.SourceIPs,
+				TargetIPs:         tc.TargetIPs,
+				SourcePort:        tc.SourcePort,
+				TargetPort:        tc.TargetPort,
+				TargetDomain:      tc.TargetDomain,
+				OutboundGroupTags: tc.OutboundGroupTags,
+				OutboundTag:       tc.OutboundTag,
+			}
+			if err != nil {
+				errCh <- err
+			}
+			if r := cmp.Diff(msg, stat, cmpopts.IgnoreUnexported(RoutingContext{})); r != "" {
+				t.Error(r)
+			}
+		}
+		streamClose()
+
+		// Client passed all tests successfully
+		errCh <- nil
+	}()
+
+	// Wait for goroutines to complete
+	select {
+	case <-time.After(2 * time.Second):
+		t.Fatal("Test timeout after 2s")
+	case err := <-errCh:
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+}
+
+func TestSerivceTestRoute(t *testing.T) {
+	c := stats.NewChannel(&stats.ChannelConfig{
+		SubscriberLimit:  1,
+		BufferSize:       16,
+		BroadcastTimeout: 100,
+	})
+	common.Must(c.Start())
+	defer c.Close()
+
+	r := new(router.Router)
+	mockCtl := gomock.NewController(t)
+	defer mockCtl.Finish()
+	common.Must(r.Init(&router.Config{
+		Rule: []*router.RoutingRule{
+			{
+				InboundTag: []string{"in"},
+				TargetTag:  &router.RoutingRule_Tag{Tag: "out"},
+			},
+			{
+				Protocol:  []string{"bittorrent"},
+				TargetTag: &router.RoutingRule_Tag{Tag: "blocked"},
+			},
+			{
+				PortList:  &net.PortList{Range: []*net.PortRange{{From: 8080, To: 8080}}},
+				TargetTag: &router.RoutingRule_Tag{Tag: "out"},
+			},
+			{
+				SourcePortList: &net.PortList{Range: []*net.PortRange{{From: 9999, To: 9999}}},
+				TargetTag:      &router.RoutingRule_Tag{Tag: "out"},
+			},
+			{
+				Domain:    []*router.Domain{{Type: router.Domain_Domain, Value: "com"}},
+				TargetTag: &router.RoutingRule_Tag{Tag: "out"},
+			},
+			{
+				SourceGeoip: []*router.GeoIP{{CountryCode: "private", Cidr: []*router.CIDR{{Ip: []byte{127, 0, 0, 0}, Prefix: 8}}}},
+				TargetTag:   &router.RoutingRule_Tag{Tag: "out"},
+			},
+			{
+				UserEmail: []string{"example@v2fly.org"},
+				TargetTag: &router.RoutingRule_Tag{Tag: "out"},
+			},
+			{
+				Networks:  []net.Network{net.Network_UDP, net.Network_TCP},
+				TargetTag: &router.RoutingRule_Tag{Tag: "out"},
+			},
+		},
+	}, mocks.NewDNSClient(mockCtl), mocks.NewOutboundManager(mockCtl)))
+
+	lis := bufconn.Listen(1024 * 1024)
+	bufDialer := func(context.Context, string) (net.Conn, error) {
+		return lis.Dial()
+	}
+
+	errCh := make(chan error)
+
+	// Server goroutine
+	go func() {
+		server := grpc.NewServer()
+		RegisterRoutingServiceServer(server, NewRoutingServer(r, c))
+		errCh <- server.Serve(lis)
+	}()
+
+	// Client goroutine
+	go func() {
+		conn, err := grpc.DialContext(context.Background(), "bufnet", grpc.WithContextDialer(bufDialer), grpc.WithInsecure())
+		if err != nil {
+			errCh <- err
+		}
+		defer lis.Close()
+		defer conn.Close()
+		client := NewRoutingServiceClient(conn)
+
+		testCases := []*RoutingContext{
+			{InboundTag: "in", OutboundTag: "out"},
+			{TargetIPs: [][]byte{{1, 2, 3, 4}}, TargetPort: 8080, OutboundTag: "out"},
+			{TargetDomain: "example.com", TargetPort: 443, OutboundTag: "out"},
+			{SourcePort: 9999, TargetPort: 9999, OutboundTag: "out"},
+			{Network: net.Network_UDP, Protocol: "bittorrent", OutboundTag: "blocked"},
+			{User: "example@v2fly.org", OutboundTag: "out"},
+			{SourceIPs: [][]byte{{127, 0, 0, 1}}, Attributes: map[string]string{"attr": "value"}, OutboundTag: "out"},
+		}
+
+		// Test simple TestRoute
+		for _, tc := range testCases {
+			route, err := client.TestRoute(context.Background(), &TestRouteRequest{RoutingContext: tc})
+			if err != nil {
+				errCh <- err
+			}
+			if r := cmp.Diff(route, tc, cmpopts.IgnoreUnexported(RoutingContext{})); r != "" {
+				t.Error(r)
+			}
+		}
+
+		// Test TestRoute with special options
+		sub, err := c.Subscribe()
+		if err != nil {
+			errCh <- err
+		}
+		for _, tc := range testCases {
+			route, err := client.TestRoute(context.Background(), &TestRouteRequest{
+				RoutingContext: tc,
+				FieldSelectors: []string{"ip", "port", "domain", "outbound"},
+				PublishResult:  true,
+			})
+			stat := &RoutingContext{ // Only a subset of stats is retrieved
+				SourceIPs:         tc.SourceIPs,
+				TargetIPs:         tc.TargetIPs,
+				SourcePort:        tc.SourcePort,
+				TargetPort:        tc.TargetPort,
+				TargetDomain:      tc.TargetDomain,
+				OutboundGroupTags: tc.OutboundGroupTags,
+				OutboundTag:       tc.OutboundTag,
+			}
+			if err != nil {
+				errCh <- err
+			}
+			if r := cmp.Diff(route, stat, cmpopts.IgnoreUnexported(RoutingContext{})); r != "" {
+				t.Error(r)
+			}
+			select { // Check that routing result has been published to statistics channel
+			case msg, received := <-sub:
+				if route, ok := msg.(routing.Route); received && ok {
+					if r := cmp.Diff(AsProtobufMessage(nil)(route), tc, cmpopts.IgnoreUnexported(RoutingContext{})); r != "" {
+						t.Error(r)
+					}
+				} else {
+					t.Error("unexpected failure in receiving published routing result")
+				}
+			case <-time.After(100 * time.Millisecond):
+				t.Error("unexpected failure in receiving published routing result")
+			}
+		}
+
+		// Client passed all tests successfully
+		errCh <- nil
+	}()
+
+	// Wait for goroutines to complete
+	select {
+	case <-time.After(2 * time.Second):
+		t.Fatal("Test timeout after 2s")
+	case err := <-errCh:
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+}

+ 94 - 0
app/router/command/config.go

@@ -0,0 +1,94 @@
+package command
+
+import (
+	"strings"
+
+	"v2ray.com/core/common/net"
+	"v2ray.com/core/features/routing"
+)
+
+// routingContext is an wrapper of protobuf RoutingContext as implementation of routing.Context and routing.Route.
+type routingContext struct {
+	*RoutingContext
+}
+
+func (c routingContext) GetSourceIPs() []net.IP {
+	return mapBytesToIPs(c.RoutingContext.GetSourceIPs())
+}
+
+func (c routingContext) GetSourcePort() net.Port {
+	return net.Port(c.RoutingContext.GetSourcePort())
+}
+
+func (c routingContext) GetTargetIPs() []net.IP {
+	return mapBytesToIPs(c.RoutingContext.GetTargetIPs())
+}
+
+func (c routingContext) GetTargetPort() net.Port {
+	return net.Port(c.RoutingContext.GetTargetPort())
+}
+
+// AsRoutingContext converts a protobuf RoutingContext into an implementation of routing.Context.
+func AsRoutingContext(r *RoutingContext) routing.Context {
+	return routingContext{r}
+}
+
+// AsRoutingRoute converts a protobuf RoutingContext into an implementation of routing.Route.
+func AsRoutingRoute(r *RoutingContext) routing.Route {
+	return routingContext{r}
+}
+
+var fieldMap = map[string]func(*RoutingContext, routing.Route){
+	"inbound":        func(s *RoutingContext, r routing.Route) { s.InboundTag = r.GetInboundTag() },
+	"network":        func(s *RoutingContext, r routing.Route) { s.Network = r.GetNetwork() },
+	"ip_source":      func(s *RoutingContext, r routing.Route) { s.SourceIPs = mapIPsToBytes(r.GetSourceIPs()) },
+	"ip_target":      func(s *RoutingContext, r routing.Route) { s.TargetIPs = mapIPsToBytes(r.GetTargetIPs()) },
+	"port_source":    func(s *RoutingContext, r routing.Route) { s.SourcePort = uint32(r.GetSourcePort()) },
+	"port_target":    func(s *RoutingContext, r routing.Route) { s.TargetPort = uint32(r.GetTargetPort()) },
+	"domain":         func(s *RoutingContext, r routing.Route) { s.TargetDomain = r.GetTargetDomain() },
+	"protocol":       func(s *RoutingContext, r routing.Route) { s.Protocol = r.GetProtocol() },
+	"user":           func(s *RoutingContext, r routing.Route) { s.User = r.GetUser() },
+	"attributes":     func(s *RoutingContext, r routing.Route) { s.Attributes = r.GetAttributes() },
+	"outbound_group": func(s *RoutingContext, r routing.Route) { s.OutboundGroupTags = r.GetOutboundGroupTags() },
+	"outbound":       func(s *RoutingContext, r routing.Route) { s.OutboundTag = r.GetOutboundTag() },
+}
+
+// AsProtobufMessage takes selectors of fields and returns a function to convert routing.Route to protobuf RoutingContext.
+func AsProtobufMessage(fieldSelectors []string) func(routing.Route) *RoutingContext {
+	initializers := []func(*RoutingContext, routing.Route){}
+	for field, init := range fieldMap {
+		if len(fieldSelectors) == 0 { // If selectors not set, retrieve all fields
+			initializers = append(initializers, init)
+			continue
+		}
+		for _, selector := range fieldSelectors {
+			if strings.HasPrefix(field, selector) {
+				initializers = append(initializers, init)
+				break
+			}
+		}
+	}
+	return func(ctx routing.Route) *RoutingContext {
+		message := new(RoutingContext)
+		for _, init := range initializers {
+			init(message, ctx)
+		}
+		return message
+	}
+}
+
+func mapBytesToIPs(bytes [][]byte) []net.IP {
+	var ips []net.IP
+	for _, rawIP := range bytes {
+		ips = append(ips, net.IP(rawIP))
+	}
+	return ips
+}
+
+func mapIPsToBytes(ips []net.IP) [][]byte {
+	var bytes [][]byte
+	for _, ip := range ips {
+		bytes = append(bytes, []byte(ip))
+	}
+	return bytes
+}

+ 9 - 0
app/router/command/errors.generated.go

@@ -0,0 +1,9 @@
+package command
+
+import "v2ray.com/core/common/errors"
+
+type errPathObjHolder struct{}
+
+func newError(values ...interface{}) *errors.Error {
+	return errors.New(values...).WithPathObj(errPathObjHolder{})
+}