stats.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. // +build !confonly
  2. package stats
  3. //go:generate errorgen
  4. import (
  5. "context"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. "v2ray.com/core/features/stats"
  10. )
  11. // Counter is an implementation of stats.Counter.
  12. type Counter struct {
  13. value int64
  14. }
  15. // Value implements stats.Counter.
  16. func (c *Counter) Value() int64 {
  17. return atomic.LoadInt64(&c.value)
  18. }
  19. // Set implements stats.Counter.
  20. func (c *Counter) Set(newValue int64) int64 {
  21. return atomic.SwapInt64(&c.value, newValue)
  22. }
  23. // Add implements stats.Counter.
  24. func (c *Counter) Add(delta int64) int64 {
  25. return atomic.AddInt64(&c.value, delta)
  26. }
  27. // Channel is an implementation of stats.Channel
  28. type Channel struct {
  29. channel chan interface{}
  30. subscribers []chan interface{}
  31. access sync.RWMutex
  32. }
  33. // Channel implements stats.Channel
  34. func (c *Channel) Channel() chan interface{} {
  35. return c.channel
  36. }
  37. // Subscribers implements stats.Channel
  38. func (c *Channel) Subscribers() []chan interface{} {
  39. c.access.RLock()
  40. defer c.access.RUnlock()
  41. return c.subscribers
  42. }
  43. // Subscribe implements stats.Channel
  44. func (c *Channel) Subscribe() chan interface{} {
  45. c.access.Lock()
  46. defer c.access.Unlock()
  47. ch := make(chan interface{})
  48. c.subscribers = append(c.subscribers, ch)
  49. return ch
  50. }
  51. // Unsubscribe implements stats.Channel
  52. func (c *Channel) Unsubscribe(ch chan interface{}) {
  53. c.access.Lock()
  54. defer c.access.Unlock()
  55. for i, s := range c.subscribers {
  56. if s == ch {
  57. // Copy to new memory block to prevent modifying original data
  58. subscribers := make([]chan interface{}, len(c.subscribers)-1)
  59. copy(subscribers[:i], c.subscribers[:i])
  60. copy(subscribers[i:], c.subscribers[i+1:])
  61. c.subscribers = subscribers
  62. return
  63. }
  64. }
  65. }
  66. // Start starts the channel for listening to messsages
  67. func (c *Channel) Start() {
  68. for message := range c.Channel() {
  69. subscribers := c.Subscribers() // Store a copy of slice value for concurrency safety
  70. for _, sub := range subscribers {
  71. select {
  72. case sub <- message: // Successfully sent message
  73. case <-time.After(100 * time.Millisecond):
  74. c.Unsubscribe(sub) // Remove timeout subscriber
  75. close(sub) // Actively close subscriber as notification
  76. }
  77. }
  78. }
  79. }
  80. // Manager is an implementation of stats.Manager.
  81. type Manager struct {
  82. access sync.RWMutex
  83. counters map[string]*Counter
  84. channels map[string]*Channel
  85. }
  86. func NewManager(ctx context.Context, config *Config) (*Manager, error) {
  87. m := &Manager{
  88. counters: make(map[string]*Counter),
  89. channels: make(map[string]*Channel),
  90. }
  91. return m, nil
  92. }
  93. func (*Manager) Type() interface{} {
  94. return stats.ManagerType()
  95. }
  96. // RegisterCounter implements stats.Manager.
  97. func (m *Manager) RegisterCounter(name string) (stats.Counter, error) {
  98. m.access.Lock()
  99. defer m.access.Unlock()
  100. if _, found := m.counters[name]; found {
  101. return nil, newError("Counter ", name, " already registered.")
  102. }
  103. newError("create new counter ", name).AtDebug().WriteToLog()
  104. c := new(Counter)
  105. m.counters[name] = c
  106. return c, nil
  107. }
  108. // GetCounter implements stats.Manager.
  109. func (m *Manager) GetCounter(name string) stats.Counter {
  110. m.access.RLock()
  111. defer m.access.RUnlock()
  112. if c, found := m.counters[name]; found {
  113. return c
  114. }
  115. return nil
  116. }
  117. // VisitCounters calls visitor function on all managed counters.
  118. func (m *Manager) VisitCounters(visitor func(string, stats.Counter) bool) {
  119. m.access.RLock()
  120. defer m.access.RUnlock()
  121. for name, c := range m.counters {
  122. if !visitor(name, c) {
  123. break
  124. }
  125. }
  126. }
  127. // RegisterChannel implements stats.Manager.
  128. func (m *Manager) RegisterChannel(name string) (stats.Channel, error) {
  129. m.access.Lock()
  130. defer m.access.Unlock()
  131. if _, found := m.channels[name]; found {
  132. return nil, newError("Channel ", name, " already registered.")
  133. }
  134. newError("create new channel ", name).AtDebug().WriteToLog()
  135. c := &Channel{channel: make(chan interface{})}
  136. m.channels[name] = c
  137. go c.Start()
  138. return c, nil
  139. }
  140. // GetChannel implements stats.Manager.
  141. func (m *Manager) GetChannel(name string) stats.Channel {
  142. m.access.RLock()
  143. defer m.access.RUnlock()
  144. if c, found := m.channels[name]; found {
  145. return c
  146. }
  147. return nil
  148. }
  149. // Start implements common.Runnable.
  150. func (m *Manager) Start() error {
  151. return nil
  152. }
  153. // Close implement common.Closable.
  154. func (m *Manager) Close() error {
  155. return nil
  156. }