command_test.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
  1. package command_test
  2. import (
  3. "context"
  4. "testing"
  5. "time"
  6. "github.com/golang/mock/gomock"
  7. "github.com/google/go-cmp/cmp"
  8. "github.com/google/go-cmp/cmp/cmpopts"
  9. "google.golang.org/grpc"
  10. "google.golang.org/grpc/test/bufconn"
  11. "v2ray.com/core/app/router"
  12. . "v2ray.com/core/app/router/command"
  13. "v2ray.com/core/app/stats"
  14. "v2ray.com/core/common"
  15. "v2ray.com/core/common/net"
  16. "v2ray.com/core/features/routing"
  17. "v2ray.com/core/testing/mocks"
  18. )
  19. func TestServiceSubscribeRoutingStats(t *testing.T) {
  20. c := stats.NewChannel(&stats.ChannelConfig{
  21. SubscriberLimit: 1,
  22. BufferSize: 16,
  23. BroadcastTimeout: 100,
  24. })
  25. common.Must(c.Start())
  26. defer c.Close()
  27. lis := bufconn.Listen(1024 * 1024)
  28. bufDialer := func(context.Context, string) (net.Conn, error) {
  29. return lis.Dial()
  30. }
  31. testCases := []*RoutingContext{
  32. {InboundTag: "in", OutboundTag: "out"},
  33. {TargetIPs: [][]byte{{1, 2, 3, 4}}, TargetPort: 8080, OutboundTag: "out"},
  34. {TargetDomain: "example.com", TargetPort: 443, OutboundTag: "out"},
  35. {SourcePort: 9999, TargetPort: 9999, OutboundTag: "out"},
  36. {Network: net.Network_UDP, OutboundGroupTags: []string{"outergroup", "innergroup"}, OutboundTag: "out"},
  37. {Protocol: "bittorrent", OutboundTag: "blocked"},
  38. {User: "example@v2fly.org", OutboundTag: "out"},
  39. {SourceIPs: [][]byte{{127, 0, 0, 1}}, Attributes: map[string]string{"attr": "value"}, OutboundTag: "out"},
  40. }
  41. errCh := make(chan error)
  42. nextPub := make(chan struct{})
  43. // Server goroutine
  44. go func() {
  45. server := grpc.NewServer()
  46. RegisterRoutingServiceServer(server, NewRoutingServer(nil, c))
  47. errCh <- server.Serve(lis)
  48. }()
  49. // Publisher goroutine
  50. go func() {
  51. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  52. defer cancel()
  53. for { // Wait until there's one subscriber in routing stats channel
  54. if len(c.Subscribers()) > 0 {
  55. break
  56. }
  57. if ctx.Err() != nil {
  58. errCh <- ctx.Err()
  59. }
  60. }
  61. for _, tc := range testCases {
  62. c.Publish(AsRoutingRoute(tc))
  63. }
  64. // Wait for next round of publishing
  65. <-nextPub
  66. ctx, cancel = context.WithTimeout(context.Background(), time.Second)
  67. defer cancel()
  68. for { // Wait until there's one subscriber in routing stats channel
  69. if len(c.Subscribers()) > 0 {
  70. break
  71. }
  72. if ctx.Err() != nil {
  73. errCh <- ctx.Err()
  74. }
  75. }
  76. for _, tc := range testCases {
  77. c.Publish(AsRoutingRoute(tc))
  78. }
  79. }()
  80. // Client goroutine
  81. go func() {
  82. conn, err := grpc.DialContext(context.Background(), "bufnet", grpc.WithContextDialer(bufDialer), grpc.WithInsecure())
  83. if err != nil {
  84. errCh <- err
  85. }
  86. defer lis.Close()
  87. defer conn.Close()
  88. client := NewRoutingServiceClient(conn)
  89. // Test retrieving all fields
  90. streamCtx, streamClose := context.WithCancel(context.Background())
  91. stream, err := client.SubscribeRoutingStats(streamCtx, &SubscribeRoutingStatsRequest{})
  92. if err != nil {
  93. errCh <- err
  94. }
  95. for _, tc := range testCases {
  96. msg, err := stream.Recv()
  97. if err != nil {
  98. errCh <- err
  99. }
  100. if r := cmp.Diff(msg, tc, cmpopts.IgnoreUnexported(RoutingContext{})); r != "" {
  101. t.Error(r)
  102. }
  103. }
  104. // Test that double subscription will fail
  105. errStream, err := client.SubscribeRoutingStats(context.Background(), &SubscribeRoutingStatsRequest{
  106. FieldSelectors: []string{"ip", "port", "domain", "outbound"},
  107. })
  108. if err != nil {
  109. errCh <- err
  110. }
  111. if _, err := errStream.Recv(); err == nil {
  112. t.Error("unexpected successful subscription")
  113. }
  114. // Test the unsubscription of stream works well
  115. streamClose()
  116. timeOutCtx, timeout := context.WithTimeout(context.Background(), time.Second)
  117. defer timeout()
  118. for { // Wait until there's no subscriber in routing stats channel
  119. if len(c.Subscribers()) == 0 {
  120. break
  121. }
  122. if timeOutCtx.Err() != nil {
  123. t.Error("unexpected subscribers not decreased in channel")
  124. errCh <- timeOutCtx.Err()
  125. }
  126. }
  127. // Test retrieving only a subset of fields
  128. streamCtx, streamClose = context.WithCancel(context.Background())
  129. stream, err = client.SubscribeRoutingStats(streamCtx, &SubscribeRoutingStatsRequest{
  130. FieldSelectors: []string{"ip", "port", "domain", "outbound"},
  131. })
  132. if err != nil {
  133. errCh <- err
  134. }
  135. close(nextPub) // Send nextPub signal to start next round of publishing
  136. for _, tc := range testCases {
  137. msg, err := stream.Recv()
  138. stat := &RoutingContext{ // Only a subset of stats is retrieved
  139. SourceIPs: tc.SourceIPs,
  140. TargetIPs: tc.TargetIPs,
  141. SourcePort: tc.SourcePort,
  142. TargetPort: tc.TargetPort,
  143. TargetDomain: tc.TargetDomain,
  144. OutboundGroupTags: tc.OutboundGroupTags,
  145. OutboundTag: tc.OutboundTag,
  146. }
  147. if err != nil {
  148. errCh <- err
  149. }
  150. if r := cmp.Diff(msg, stat, cmpopts.IgnoreUnexported(RoutingContext{})); r != "" {
  151. t.Error(r)
  152. }
  153. }
  154. streamClose()
  155. // Client passed all tests successfully
  156. errCh <- nil
  157. }()
  158. // Wait for goroutines to complete
  159. select {
  160. case <-time.After(2 * time.Second):
  161. t.Fatal("Test timeout after 2s")
  162. case err := <-errCh:
  163. if err != nil {
  164. t.Fatal(err)
  165. }
  166. }
  167. }
  168. func TestSerivceTestRoute(t *testing.T) {
  169. c := stats.NewChannel(&stats.ChannelConfig{
  170. SubscriberLimit: 1,
  171. BufferSize: 16,
  172. BroadcastTimeout: 100,
  173. })
  174. common.Must(c.Start())
  175. defer c.Close()
  176. r := new(router.Router)
  177. mockCtl := gomock.NewController(t)
  178. defer mockCtl.Finish()
  179. common.Must(r.Init(&router.Config{
  180. Rule: []*router.RoutingRule{
  181. {
  182. InboundTag: []string{"in"},
  183. TargetTag: &router.RoutingRule_Tag{Tag: "out"},
  184. },
  185. {
  186. Protocol: []string{"bittorrent"},
  187. TargetTag: &router.RoutingRule_Tag{Tag: "blocked"},
  188. },
  189. {
  190. PortList: &net.PortList{Range: []*net.PortRange{{From: 8080, To: 8080}}},
  191. TargetTag: &router.RoutingRule_Tag{Tag: "out"},
  192. },
  193. {
  194. SourcePortList: &net.PortList{Range: []*net.PortRange{{From: 9999, To: 9999}}},
  195. TargetTag: &router.RoutingRule_Tag{Tag: "out"},
  196. },
  197. {
  198. Domain: []*router.Domain{{Type: router.Domain_Domain, Value: "com"}},
  199. TargetTag: &router.RoutingRule_Tag{Tag: "out"},
  200. },
  201. {
  202. SourceGeoip: []*router.GeoIP{{CountryCode: "private", Cidr: []*router.CIDR{{Ip: []byte{127, 0, 0, 0}, Prefix: 8}}}},
  203. TargetTag: &router.RoutingRule_Tag{Tag: "out"},
  204. },
  205. {
  206. UserEmail: []string{"example@v2fly.org"},
  207. TargetTag: &router.RoutingRule_Tag{Tag: "out"},
  208. },
  209. {
  210. Networks: []net.Network{net.Network_UDP, net.Network_TCP},
  211. TargetTag: &router.RoutingRule_Tag{Tag: "out"},
  212. },
  213. },
  214. }, mocks.NewDNSClient(mockCtl), mocks.NewOutboundManager(mockCtl)))
  215. lis := bufconn.Listen(1024 * 1024)
  216. bufDialer := func(context.Context, string) (net.Conn, error) {
  217. return lis.Dial()
  218. }
  219. errCh := make(chan error)
  220. // Server goroutine
  221. go func() {
  222. server := grpc.NewServer()
  223. RegisterRoutingServiceServer(server, NewRoutingServer(r, c))
  224. errCh <- server.Serve(lis)
  225. }()
  226. // Client goroutine
  227. go func() {
  228. conn, err := grpc.DialContext(context.Background(), "bufnet", grpc.WithContextDialer(bufDialer), grpc.WithInsecure())
  229. if err != nil {
  230. errCh <- err
  231. }
  232. defer lis.Close()
  233. defer conn.Close()
  234. client := NewRoutingServiceClient(conn)
  235. testCases := []*RoutingContext{
  236. {InboundTag: "in", OutboundTag: "out"},
  237. {TargetIPs: [][]byte{{1, 2, 3, 4}}, TargetPort: 8080, OutboundTag: "out"},
  238. {TargetDomain: "example.com", TargetPort: 443, OutboundTag: "out"},
  239. {SourcePort: 9999, TargetPort: 9999, OutboundTag: "out"},
  240. {Network: net.Network_UDP, Protocol: "bittorrent", OutboundTag: "blocked"},
  241. {User: "example@v2fly.org", OutboundTag: "out"},
  242. {SourceIPs: [][]byte{{127, 0, 0, 1}}, Attributes: map[string]string{"attr": "value"}, OutboundTag: "out"},
  243. }
  244. // Test simple TestRoute
  245. for _, tc := range testCases {
  246. route, err := client.TestRoute(context.Background(), &TestRouteRequest{RoutingContext: tc})
  247. if err != nil {
  248. errCh <- err
  249. }
  250. if r := cmp.Diff(route, tc, cmpopts.IgnoreUnexported(RoutingContext{})); r != "" {
  251. t.Error(r)
  252. }
  253. }
  254. // Test TestRoute with special options
  255. sub, err := c.Subscribe()
  256. if err != nil {
  257. errCh <- err
  258. }
  259. for _, tc := range testCases {
  260. route, err := client.TestRoute(context.Background(), &TestRouteRequest{
  261. RoutingContext: tc,
  262. FieldSelectors: []string{"ip", "port", "domain", "outbound"},
  263. PublishResult: true,
  264. })
  265. stat := &RoutingContext{ // Only a subset of stats is retrieved
  266. SourceIPs: tc.SourceIPs,
  267. TargetIPs: tc.TargetIPs,
  268. SourcePort: tc.SourcePort,
  269. TargetPort: tc.TargetPort,
  270. TargetDomain: tc.TargetDomain,
  271. OutboundGroupTags: tc.OutboundGroupTags,
  272. OutboundTag: tc.OutboundTag,
  273. }
  274. if err != nil {
  275. errCh <- err
  276. }
  277. if r := cmp.Diff(route, stat, cmpopts.IgnoreUnexported(RoutingContext{})); r != "" {
  278. t.Error(r)
  279. }
  280. select { // Check that routing result has been published to statistics channel
  281. case msg, received := <-sub:
  282. if route, ok := msg.(routing.Route); received && ok {
  283. if r := cmp.Diff(AsProtobufMessage(nil)(route), tc, cmpopts.IgnoreUnexported(RoutingContext{})); r != "" {
  284. t.Error(r)
  285. }
  286. } else {
  287. t.Error("unexpected failure in receiving published routing result")
  288. }
  289. case <-time.After(100 * time.Millisecond):
  290. t.Error("unexpected failure in receiving published routing result")
  291. }
  292. }
  293. // Client passed all tests successfully
  294. errCh <- nil
  295. }()
  296. // Wait for goroutines to complete
  297. select {
  298. case <-time.After(2 * time.Second):
  299. t.Fatal("Test timeout after 2s")
  300. case err := <-errCh:
  301. if err != nil {
  302. t.Fatal(err)
  303. }
  304. }
  305. }