stats.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. // +build !confonly
  2. package stats
  3. //go:generate errorgen
  4. import (
  5. "context"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. "v2ray.com/core/features/stats"
  10. )
  11. // Counter is an implementation of stats.Counter.
  12. type Counter struct {
  13. value int64
  14. }
  15. // Value implements stats.Counter.
  16. func (c *Counter) Value() int64 {
  17. return atomic.LoadInt64(&c.value)
  18. }
  19. // Set implements stats.Counter.
  20. func (c *Counter) Set(newValue int64) int64 {
  21. return atomic.SwapInt64(&c.value, newValue)
  22. }
  23. // Add implements stats.Counter.
  24. func (c *Counter) Add(delta int64) int64 {
  25. return atomic.AddInt64(&c.value, delta)
  26. }
  27. // Channel is an implementation of stats.Channel
  28. type Channel struct {
  29. channel chan interface{}
  30. subscribers []chan interface{}
  31. access sync.RWMutex
  32. }
  33. // Channel implements stats.Channel
  34. func (c *Channel) Channel() chan interface{} {
  35. return c.channel
  36. }
  37. // Subscribers implements stats.Channel
  38. func (c *Channel) Subscribers() []chan interface{} {
  39. c.access.RLock()
  40. defer c.access.RUnlock()
  41. return c.subscribers
  42. }
  43. // Subscribe implements stats.Channel
  44. func (c *Channel) Subscribe() chan interface{} {
  45. c.access.Lock()
  46. defer c.access.Unlock()
  47. ch := make(chan interface{})
  48. c.subscribers = append(c.subscribers, ch)
  49. return ch
  50. }
  51. // Unsubscribe implements stats.Channel
  52. func (c *Channel) Unsubscribe(ch chan interface{}) {
  53. c.access.Lock()
  54. defer c.access.Unlock()
  55. for i, s := range c.subscribers {
  56. if s == ch {
  57. // Copy to new memory block to prevent modifying original data
  58. subscribers := make([]chan interface{}, len(c.subscribers)-1)
  59. copy(subscribers[:i], c.subscribers[:i])
  60. copy(subscribers[i:], c.subscribers[i+1:])
  61. c.subscribers = subscribers
  62. return
  63. }
  64. }
  65. }
  66. // Start starts the channel for listening to messsages
  67. func (c *Channel) Start() {
  68. for message := range c.Channel() {
  69. subscribers := c.Subscribers() // Store a copy of slice value for concurrency safety
  70. for _, sub := range subscribers {
  71. select {
  72. case sub <- message: // Successfully sent message
  73. case <-time.After(100 * time.Millisecond):
  74. c.Unsubscribe(sub) // Remove timeout subscriber
  75. close(sub) // Actively close subscriber as notification
  76. }
  77. }
  78. }
  79. }
  80. // Manager is an implementation of stats.Manager.
  81. type Manager struct {
  82. access sync.RWMutex
  83. counters map[string]*Counter
  84. channels map[string]*Channel
  85. }
  86. func NewManager(ctx context.Context, config *Config) (*Manager, error) {
  87. m := &Manager{
  88. counters: make(map[string]*Counter),
  89. channels: make(map[string]*Channel),
  90. }
  91. return m, nil
  92. }
  93. func (*Manager) Type() interface{} {
  94. return stats.ManagerType()
  95. }
  96. // RegisterCounter implements stats.Manager.
  97. func (m *Manager) RegisterCounter(name string) (stats.Counter, error) {
  98. m.access.Lock()
  99. defer m.access.Unlock()
  100. if _, found := m.counters[name]; found {
  101. return nil, newError("Counter ", name, " already registered.")
  102. }
  103. newError("create new counter ", name).AtDebug().WriteToLog()
  104. c := new(Counter)
  105. m.counters[name] = c
  106. return c, nil
  107. }
  108. // UnregisterCounter implements stats.Manager.
  109. func (m *Manager) UnregisterCounter(name string) error {
  110. m.access.Lock()
  111. defer m.access.Unlock()
  112. if _, found := m.counters[name]; !found {
  113. return newError("Counter ", name, " was not found.")
  114. }
  115. newError("remove counter ", name).AtDebug().WriteToLog()
  116. delete(m.counters, name)
  117. return nil
  118. }
  119. // GetCounter implements stats.Manager.
  120. func (m *Manager) GetCounter(name string) stats.Counter {
  121. m.access.RLock()
  122. defer m.access.RUnlock()
  123. if c, found := m.counters[name]; found {
  124. return c
  125. }
  126. return nil
  127. }
  128. // VisitCounters calls visitor function on all managed counters.
  129. func (m *Manager) VisitCounters(visitor func(string, stats.Counter) bool) {
  130. m.access.RLock()
  131. defer m.access.RUnlock()
  132. for name, c := range m.counters {
  133. if !visitor(name, c) {
  134. break
  135. }
  136. }
  137. }
  138. // RegisterChannel implements stats.Manager.
  139. func (m *Manager) RegisterChannel(name string) (stats.Channel, error) {
  140. m.access.Lock()
  141. defer m.access.Unlock()
  142. if _, found := m.channels[name]; found {
  143. return nil, newError("Channel ", name, " already registered.")
  144. }
  145. newError("create new channel ", name).AtDebug().WriteToLog()
  146. c := &Channel{channel: make(chan interface{})}
  147. m.channels[name] = c
  148. go c.Start()
  149. return c, nil
  150. }
  151. // GetChannel implements stats.Manager.
  152. func (m *Manager) GetChannel(name string) stats.Channel {
  153. m.access.RLock()
  154. defer m.access.RUnlock()
  155. if c, found := m.channels[name]; found {
  156. return c
  157. }
  158. return nil
  159. }
  160. // Start implements common.Runnable.
  161. func (m *Manager) Start() error {
  162. return nil
  163. }
  164. // Close implement common.Closable.
  165. func (m *Manager) Close() error {
  166. return nil
  167. }