stats_test.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351
  1. package stats_test
  2. import (
  3. "context"
  4. "fmt"
  5. "testing"
  6. "time"
  7. . "v2ray.com/core/app/stats"
  8. "v2ray.com/core/common"
  9. "v2ray.com/core/features/stats"
  10. )
  11. func TestInterface(t *testing.T) {
  12. _ = (stats.Manager)(new(Manager))
  13. }
  14. func TestStatsCounter(t *testing.T) {
  15. raw, err := common.CreateObject(context.Background(), &Config{})
  16. common.Must(err)
  17. m := raw.(stats.Manager)
  18. c, err := m.RegisterCounter("test.counter")
  19. common.Must(err)
  20. if v := c.Add(1); v != 1 {
  21. t.Fatal("unpexcted Add(1) return: ", v, ", wanted ", 1)
  22. }
  23. if v := c.Set(0); v != 1 {
  24. t.Fatal("unexpected Set(0) return: ", v, ", wanted ", 1)
  25. }
  26. if v := c.Value(); v != 0 {
  27. t.Fatal("unexpected Value() return: ", v, ", wanted ", 0)
  28. }
  29. }
  30. func TestStatsChannel(t *testing.T) {
  31. raw, err := common.CreateObject(context.Background(), &Config{})
  32. common.Must(err)
  33. m := raw.(stats.Manager)
  34. c, err := m.RegisterChannel("test.channel")
  35. common.Must(err)
  36. source := c.Channel()
  37. a := c.Subscribe()
  38. b := c.Subscribe()
  39. defer c.Unsubscribe(a)
  40. defer c.Unsubscribe(b)
  41. stopCh := make(chan struct{})
  42. errCh := make(chan string)
  43. go func() {
  44. source <- 1
  45. source <- 2
  46. source <- "3"
  47. source <- []int{4}
  48. source <- nil // Dummy messsage with no subscriber receiving
  49. select {
  50. case source <- nil: // Source should be blocked here, for last message was not cleared
  51. errCh <- fmt.Sprint("unexpected non-blocked source")
  52. default:
  53. close(stopCh)
  54. }
  55. }()
  56. go func() {
  57. if v, ok := (<-a).(int); !ok || v != 1 {
  58. errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
  59. }
  60. if v, ok := (<-a).(int); !ok || v != 2 {
  61. errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2)
  62. }
  63. if v, ok := (<-a).(string); !ok || v != "3" {
  64. errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", "3")
  65. }
  66. if v, ok := (<-a).([]int); !ok || v[0] != 4 {
  67. errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", []int{4})
  68. }
  69. }()
  70. go func() {
  71. if v, ok := (<-b).(int); !ok || v != 1 {
  72. errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
  73. }
  74. if v, ok := (<-b).(int); !ok || v != 2 {
  75. errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2)
  76. }
  77. if v, ok := (<-b).(string); !ok || v != "3" {
  78. errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", "3")
  79. }
  80. if v, ok := (<-b).([]int); !ok || v[0] != 4 {
  81. errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", []int{4})
  82. }
  83. }()
  84. select {
  85. case <-time.After(2 * time.Second):
  86. t.Fatal("Test timeout after 2s")
  87. case e := <-errCh:
  88. t.Fatal(e)
  89. case <-stopCh:
  90. }
  91. }
  92. func TestStatsChannelUnsubcribe(t *testing.T) {
  93. raw, err := common.CreateObject(context.Background(), &Config{})
  94. common.Must(err)
  95. m := raw.(stats.Manager)
  96. c, err := m.RegisterChannel("test.channel")
  97. common.Must(err)
  98. source := c.Channel()
  99. a := c.Subscribe()
  100. b := c.Subscribe()
  101. defer c.Unsubscribe(a)
  102. pauseCh := make(chan struct{})
  103. stopCh := make(chan struct{})
  104. errCh := make(chan string)
  105. {
  106. var aSet, bSet bool
  107. for _, s := range c.Subscribers() {
  108. if s == a {
  109. aSet = true
  110. }
  111. if s == b {
  112. bSet = true
  113. }
  114. }
  115. if !(aSet && bSet) {
  116. t.Fatal("unexpected subscribers: ", c.Subscribers())
  117. }
  118. }
  119. go func() {
  120. source <- 1
  121. <-pauseCh // Wait for `b` goroutine to resume sending message
  122. source <- 2
  123. }()
  124. go func() {
  125. if v, ok := (<-a).(int); !ok || v != 1 {
  126. errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
  127. }
  128. if v, ok := (<-a).(int); !ok || v != 2 {
  129. errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2)
  130. }
  131. }()
  132. go func() {
  133. if v, ok := (<-b).(int); !ok || v != 1 {
  134. errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
  135. }
  136. // Unsubscribe `b` while `source`'s messaging is paused
  137. c.Unsubscribe(b)
  138. { // Test `b` is not in subscribers
  139. var aSet, bSet bool
  140. for _, s := range c.Subscribers() {
  141. if s == a {
  142. aSet = true
  143. }
  144. if s == b {
  145. bSet = true
  146. }
  147. }
  148. if !(aSet && !bSet) {
  149. errCh <- fmt.Sprint("unexpected subscribers: ", c.Subscribers())
  150. }
  151. }
  152. // Resume `source`'s progress
  153. close(pauseCh)
  154. // Test `b` is neither closed nor able to receive any data
  155. select {
  156. case v, ok := <-b:
  157. if ok {
  158. errCh <- fmt.Sprint("unexpected data received: ", v)
  159. } else {
  160. errCh <- fmt.Sprint("unexpected closed channel: ", b)
  161. }
  162. default:
  163. }
  164. close(stopCh)
  165. }()
  166. select {
  167. case <-time.After(2 * time.Second):
  168. t.Fatal("Test timeout after 2s")
  169. case e := <-errCh:
  170. t.Fatal(e)
  171. case <-stopCh:
  172. }
  173. }
  174. func TestStatsChannelTimeout(t *testing.T) {
  175. raw, err := common.CreateObject(context.Background(), &Config{})
  176. common.Must(err)
  177. m := raw.(stats.Manager)
  178. c, err := m.RegisterChannel("test.channel")
  179. common.Must(err)
  180. source := c.Channel()
  181. a := c.Subscribe()
  182. b := c.Subscribe()
  183. defer c.Unsubscribe(a)
  184. defer c.Unsubscribe(b)
  185. stopCh := make(chan struct{})
  186. errCh := make(chan string)
  187. go func() {
  188. source <- 1
  189. source <- 2
  190. }()
  191. go func() {
  192. if v, ok := (<-a).(int); !ok || v != 1 {
  193. errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
  194. }
  195. if v, ok := (<-a).(int); !ok || v != 2 {
  196. errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2)
  197. }
  198. { // Test `b` is still in subscribers yet (because `a` receives 2 first)
  199. var aSet, bSet bool
  200. for _, s := range c.Subscribers() {
  201. if s == a {
  202. aSet = true
  203. }
  204. if s == b {
  205. bSet = true
  206. }
  207. }
  208. if !(aSet && bSet) {
  209. errCh <- fmt.Sprint("unexpected subscribers: ", c.Subscribers())
  210. }
  211. }
  212. }()
  213. go func() {
  214. if v, ok := (<-b).(int); !ok || v != 1 {
  215. errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
  216. }
  217. // Block `b` channel for a time longer than `source`'s timeout
  218. <-time.After(150 * time.Millisecond)
  219. { // Test `b` has been unsubscribed by source
  220. var aSet, bSet bool
  221. for _, s := range c.Subscribers() {
  222. if s == a {
  223. aSet = true
  224. }
  225. if s == b {
  226. bSet = true
  227. }
  228. }
  229. if !(aSet && !bSet) {
  230. errCh <- fmt.Sprint("unexpected subscribers: ", c.Subscribers())
  231. }
  232. }
  233. select { // Test `b` has been closed by source
  234. case v, ok := <-b:
  235. if ok {
  236. errCh <- fmt.Sprint("unexpected data received: ", v)
  237. }
  238. default:
  239. }
  240. close(stopCh)
  241. }()
  242. select {
  243. case <-time.After(2 * time.Second):
  244. t.Fatal("Test timeout after 2s")
  245. case e := <-errCh:
  246. t.Fatal(e)
  247. case <-stopCh:
  248. }
  249. }
  250. func TestStatsChannelConcurrency(t *testing.T) {
  251. raw, err := common.CreateObject(context.Background(), &Config{})
  252. common.Must(err)
  253. m := raw.(stats.Manager)
  254. c, err := m.RegisterChannel("test.channel")
  255. common.Must(err)
  256. source := c.Channel()
  257. a := c.Subscribe()
  258. b := c.Subscribe()
  259. defer c.Unsubscribe(a)
  260. stopCh := make(chan struct{})
  261. errCh := make(chan string)
  262. go func() {
  263. source <- 1
  264. source <- 2
  265. }()
  266. go func() {
  267. if v, ok := (<-a).(int); !ok || v != 1 {
  268. errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
  269. }
  270. if v, ok := (<-a).(int); !ok || v != 2 {
  271. errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2)
  272. }
  273. }()
  274. go func() {
  275. // Block `b` for a time shorter than `source`'s timeout
  276. // So as to ensure source channel is trying to send message to `b`.
  277. <-time.After(25 * time.Millisecond)
  278. // This causes concurrency scenario: unsubscribe `b` while trying to send message to it
  279. c.Unsubscribe(b)
  280. // Test `b` is not closed and can still receive data 1:
  281. // Because unsubscribe won't affect the ongoing process of sending message.
  282. select {
  283. case v, ok := <-b:
  284. if v1, ok1 := v.(int); !(ok && ok1 && v1 == 1) {
  285. errCh <- fmt.Sprint("unexpected failure in receiving data: ", 1)
  286. }
  287. default:
  288. errCh <- fmt.Sprint("unexpected block from receiving data: ", 1)
  289. }
  290. // Test `b` is not closed but cannot receive data 2:
  291. // Becuase in a new round of messaging, `b` has been unsubscribed.
  292. select {
  293. case v, ok := <-b:
  294. if ok {
  295. errCh <- fmt.Sprint("unexpected receving: ", v)
  296. } else {
  297. errCh <- fmt.Sprint("unexpected closing of channel")
  298. }
  299. default:
  300. }
  301. close(stopCh)
  302. }()
  303. select {
  304. case <-time.After(2 * time.Second):
  305. t.Fatal("Test timeout after 2s")
  306. case e := <-errCh:
  307. t.Fatal(e)
  308. case <-stopCh:
  309. }
  310. }