command_test.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373
  1. package command_test
  2. import (
  3. "context"
  4. "testing"
  5. "time"
  6. "github.com/golang/mock/gomock"
  7. "github.com/google/go-cmp/cmp"
  8. "github.com/google/go-cmp/cmp/cmpopts"
  9. "google.golang.org/grpc"
  10. "google.golang.org/grpc/credentials/insecure"
  11. "google.golang.org/grpc/test/bufconn"
  12. "github.com/v2fly/v2ray-core/v5/app/router"
  13. . "github.com/v2fly/v2ray-core/v5/app/router/command"
  14. "github.com/v2fly/v2ray-core/v5/app/router/routercommon"
  15. "github.com/v2fly/v2ray-core/v5/app/stats"
  16. "github.com/v2fly/v2ray-core/v5/common"
  17. "github.com/v2fly/v2ray-core/v5/common/net"
  18. "github.com/v2fly/v2ray-core/v5/features/routing"
  19. "github.com/v2fly/v2ray-core/v5/testing/mocks"
  20. )
  21. func TestServiceSubscribeRoutingStats(t *testing.T) {
  22. c := stats.NewChannel(&stats.ChannelConfig{
  23. SubscriberLimit: 1,
  24. BufferSize: 0,
  25. Blocking: true,
  26. })
  27. common.Must(c.Start())
  28. defer c.Close()
  29. lis := bufconn.Listen(1024 * 1024)
  30. bufDialer := func(context.Context, string) (net.Conn, error) {
  31. return lis.Dial()
  32. }
  33. testCases := []*RoutingContext{
  34. {InboundTag: "in", OutboundTag: "out"},
  35. {TargetIPs: [][]byte{{1, 2, 3, 4}}, TargetPort: 8080, OutboundTag: "out"},
  36. {TargetDomain: "example.com", TargetPort: 443, OutboundTag: "out"},
  37. {SourcePort: 9999, TargetPort: 9999, OutboundTag: "out"},
  38. {Network: net.Network_UDP, OutboundGroupTags: []string{"outergroup", "innergroup"}, OutboundTag: "out"},
  39. {Protocol: "bittorrent", OutboundTag: "blocked"},
  40. {User: "example@v2fly.org", OutboundTag: "out"},
  41. {SourceIPs: [][]byte{{127, 0, 0, 1}}, Attributes: map[string]string{"attr": "value"}, OutboundTag: "out"},
  42. }
  43. errCh := make(chan error)
  44. nextPub := make(chan struct{})
  45. // Server goroutine
  46. go func() {
  47. server := grpc.NewServer()
  48. RegisterRoutingServiceServer(server, NewRoutingServer(nil, c))
  49. errCh <- server.Serve(lis)
  50. }()
  51. // Publisher goroutine
  52. go func() {
  53. publishTestCases := func() error {
  54. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  55. defer cancel()
  56. for { // Wait until there's one subscriber in routing stats channel
  57. if len(c.Subscribers()) > 0 {
  58. break
  59. }
  60. if ctx.Err() != nil {
  61. return ctx.Err()
  62. }
  63. }
  64. for _, tc := range testCases {
  65. c.Publish(context.Background(), AsRoutingRoute(tc))
  66. time.Sleep(time.Millisecond)
  67. }
  68. return nil
  69. }
  70. if err := publishTestCases(); err != nil {
  71. errCh <- err
  72. }
  73. // Wait for next round of publishing
  74. <-nextPub
  75. if err := publishTestCases(); err != nil {
  76. errCh <- err
  77. }
  78. }()
  79. // Client goroutine
  80. go func() {
  81. defer lis.Close()
  82. conn, err := grpc.DialContext(
  83. context.Background(),
  84. "bufnet",
  85. grpc.WithContextDialer(bufDialer),
  86. grpc.WithTransportCredentials(insecure.NewCredentials()),
  87. )
  88. if err != nil {
  89. errCh <- err
  90. return
  91. }
  92. defer conn.Close()
  93. client := NewRoutingServiceClient(conn)
  94. // Test retrieving all fields
  95. testRetrievingAllFields := func() error {
  96. streamCtx, streamClose := context.WithCancel(context.Background())
  97. // Test the unsubscription of stream works well
  98. defer func() {
  99. streamClose()
  100. timeOutCtx, timeout := context.WithTimeout(context.Background(), time.Second)
  101. defer timeout()
  102. for { // Wait until there's no subscriber in routing stats channel
  103. if len(c.Subscribers()) == 0 {
  104. break
  105. }
  106. if timeOutCtx.Err() != nil {
  107. t.Error("unexpected subscribers not decreased in channel", timeOutCtx.Err())
  108. }
  109. }
  110. }()
  111. stream, err := client.SubscribeRoutingStats(streamCtx, &SubscribeRoutingStatsRequest{})
  112. if err != nil {
  113. return err
  114. }
  115. for _, tc := range testCases {
  116. msg, err := stream.Recv()
  117. if err != nil {
  118. return err
  119. }
  120. if r := cmp.Diff(msg, tc, cmpopts.IgnoreUnexported(RoutingContext{})); r != "" {
  121. t.Error(r)
  122. }
  123. }
  124. // Test that double subscription will fail
  125. errStream, err := client.SubscribeRoutingStats(context.Background(), &SubscribeRoutingStatsRequest{
  126. FieldSelectors: []string{"ip", "port", "domain", "outbound"},
  127. })
  128. if err != nil {
  129. return err
  130. }
  131. if _, err := errStream.Recv(); err == nil {
  132. t.Error("unexpected successful subscription")
  133. }
  134. return nil
  135. }
  136. // Test retrieving only a subset of fields
  137. testRetrievingSubsetOfFields := func() error {
  138. streamCtx, streamClose := context.WithCancel(context.Background())
  139. defer streamClose()
  140. stream, err := client.SubscribeRoutingStats(streamCtx, &SubscribeRoutingStatsRequest{
  141. FieldSelectors: []string{"ip", "port", "domain", "outbound"},
  142. })
  143. if err != nil {
  144. return err
  145. }
  146. // Send nextPub signal to start next round of publishing
  147. close(nextPub)
  148. for _, tc := range testCases {
  149. msg, err := stream.Recv()
  150. if err != nil {
  151. return err
  152. }
  153. stat := &RoutingContext{ // Only a subset of stats is retrieved
  154. SourceIPs: tc.SourceIPs,
  155. TargetIPs: tc.TargetIPs,
  156. SourcePort: tc.SourcePort,
  157. TargetPort: tc.TargetPort,
  158. TargetDomain: tc.TargetDomain,
  159. OutboundGroupTags: tc.OutboundGroupTags,
  160. OutboundTag: tc.OutboundTag,
  161. }
  162. if r := cmp.Diff(msg, stat, cmpopts.IgnoreUnexported(RoutingContext{})); r != "" {
  163. t.Error(r)
  164. }
  165. }
  166. return nil
  167. }
  168. if err := testRetrievingAllFields(); err != nil {
  169. errCh <- err
  170. }
  171. if err := testRetrievingSubsetOfFields(); err != nil {
  172. errCh <- err
  173. }
  174. errCh <- nil // Client passed all tests successfully
  175. }()
  176. // Wait for goroutines to complete
  177. select {
  178. case <-time.After(2 * time.Second):
  179. t.Fatal("Test timeout after 2s")
  180. case err := <-errCh:
  181. if err != nil {
  182. t.Fatal(err)
  183. }
  184. }
  185. }
  186. func TestSerivceTestRoute(t *testing.T) {
  187. c := stats.NewChannel(&stats.ChannelConfig{
  188. SubscriberLimit: 1,
  189. BufferSize: 16,
  190. Blocking: true,
  191. })
  192. common.Must(c.Start())
  193. defer c.Close()
  194. r := new(router.Router)
  195. mockCtl := gomock.NewController(t)
  196. defer mockCtl.Finish()
  197. common.Must(r.Init(context.TODO(), &router.Config{
  198. Rule: []*router.RoutingRule{
  199. {
  200. InboundTag: []string{"in"},
  201. TargetTag: &router.RoutingRule_Tag{Tag: "out"},
  202. },
  203. {
  204. Protocol: []string{"bittorrent"},
  205. TargetTag: &router.RoutingRule_Tag{Tag: "blocked"},
  206. },
  207. {
  208. PortList: &net.PortList{Range: []*net.PortRange{{From: 8080, To: 8080}}},
  209. TargetTag: &router.RoutingRule_Tag{Tag: "out"},
  210. },
  211. {
  212. SourcePortList: &net.PortList{Range: []*net.PortRange{{From: 9999, To: 9999}}},
  213. TargetTag: &router.RoutingRule_Tag{Tag: "out"},
  214. },
  215. {
  216. Domain: []*routercommon.Domain{{Type: routercommon.Domain_RootDomain, Value: "com"}},
  217. TargetTag: &router.RoutingRule_Tag{Tag: "out"},
  218. },
  219. {
  220. SourceGeoip: []*routercommon.GeoIP{{CountryCode: "private", Cidr: []*routercommon.CIDR{{Ip: []byte{127, 0, 0, 0}, Prefix: 8}}}},
  221. TargetTag: &router.RoutingRule_Tag{Tag: "out"},
  222. },
  223. {
  224. UserEmail: []string{"example@v2fly.org"},
  225. TargetTag: &router.RoutingRule_Tag{Tag: "out"},
  226. },
  227. {
  228. Networks: []net.Network{net.Network_UDP, net.Network_TCP},
  229. TargetTag: &router.RoutingRule_Tag{Tag: "out"},
  230. },
  231. },
  232. }, mocks.NewDNSClient(mockCtl), mocks.NewOutboundManager(mockCtl), nil))
  233. lis := bufconn.Listen(1024 * 1024)
  234. bufDialer := func(context.Context, string) (net.Conn, error) {
  235. return lis.Dial()
  236. }
  237. errCh := make(chan error)
  238. // Server goroutine
  239. go func() {
  240. server := grpc.NewServer()
  241. RegisterRoutingServiceServer(server, NewRoutingServer(r, c))
  242. errCh <- server.Serve(lis)
  243. }()
  244. // Client goroutine
  245. go func() {
  246. defer lis.Close()
  247. conn, err := grpc.DialContext(
  248. context.Background(),
  249. "bufnet", grpc.WithContextDialer(bufDialer),
  250. grpc.WithTransportCredentials(insecure.NewCredentials()),
  251. )
  252. if err != nil {
  253. errCh <- err
  254. }
  255. defer conn.Close()
  256. client := NewRoutingServiceClient(conn)
  257. testCases := []*RoutingContext{
  258. {InboundTag: "in", OutboundTag: "out"},
  259. {TargetIPs: [][]byte{{1, 2, 3, 4}}, TargetPort: 8080, OutboundTag: "out"},
  260. {TargetDomain: "example.com", TargetPort: 443, OutboundTag: "out"},
  261. {SourcePort: 9999, TargetPort: 9999, OutboundTag: "out"},
  262. {Network: net.Network_UDP, Protocol: "bittorrent", OutboundTag: "blocked"},
  263. {User: "example@v2fly.org", OutboundTag: "out"},
  264. {SourceIPs: [][]byte{{127, 0, 0, 1}}, Attributes: map[string]string{"attr": "value"}, OutboundTag: "out"},
  265. }
  266. // Test simple TestRoute
  267. testSimple := func() error {
  268. for _, tc := range testCases {
  269. route, err := client.TestRoute(context.Background(), &TestRouteRequest{RoutingContext: tc})
  270. if err != nil {
  271. return err
  272. }
  273. if r := cmp.Diff(route, tc, cmpopts.IgnoreUnexported(RoutingContext{})); r != "" {
  274. t.Error(r)
  275. }
  276. }
  277. return nil
  278. }
  279. // Test TestRoute with special options
  280. testOptions := func() error {
  281. sub, err := c.Subscribe()
  282. if err != nil {
  283. return err
  284. }
  285. for _, tc := range testCases {
  286. route, err := client.TestRoute(context.Background(), &TestRouteRequest{
  287. RoutingContext: tc,
  288. FieldSelectors: []string{"ip", "port", "domain", "outbound"},
  289. PublishResult: true,
  290. })
  291. if err != nil {
  292. return err
  293. }
  294. stat := &RoutingContext{ // Only a subset of stats is retrieved
  295. SourceIPs: tc.SourceIPs,
  296. TargetIPs: tc.TargetIPs,
  297. SourcePort: tc.SourcePort,
  298. TargetPort: tc.TargetPort,
  299. TargetDomain: tc.TargetDomain,
  300. OutboundGroupTags: tc.OutboundGroupTags,
  301. OutboundTag: tc.OutboundTag,
  302. }
  303. if r := cmp.Diff(route, stat, cmpopts.IgnoreUnexported(RoutingContext{})); r != "" {
  304. t.Error(r)
  305. }
  306. select { // Check that routing result has been published to statistics channel
  307. case msg, received := <-sub:
  308. if route, ok := msg.(routing.Route); received && ok {
  309. if r := cmp.Diff(AsProtobufMessage(nil)(route), tc, cmpopts.IgnoreUnexported(RoutingContext{})); r != "" {
  310. t.Error(r)
  311. }
  312. } else {
  313. t.Error("unexpected failure in receiving published routing result for testcase", tc)
  314. }
  315. case <-time.After(100 * time.Millisecond):
  316. t.Error("unexpected failure in receiving published routing result", tc)
  317. }
  318. }
  319. return nil
  320. }
  321. if err := testSimple(); err != nil {
  322. errCh <- err
  323. }
  324. if err := testOptions(); err != nil {
  325. errCh <- err
  326. }
  327. errCh <- nil // Client passed all tests successfully
  328. }()
  329. // Wait for goroutines to complete
  330. select {
  331. case <-time.After(2 * time.Second):
  332. t.Fatal("Test timeout after 2s")
  333. case err := <-errCh:
  334. if err != nil {
  335. t.Fatal(err)
  336. }
  337. }
  338. }