stats.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. // +build !confonly
  2. package stats
  3. //go:generate errorgen
  4. import (
  5. "context"
  6. "sync"
  7. "v2ray.com/core/common"
  8. "v2ray.com/core/common/errors"
  9. "v2ray.com/core/features/stats"
  10. )
  11. // Manager is an implementation of stats.Manager.
  12. type Manager struct {
  13. access sync.RWMutex
  14. counters map[string]*Counter
  15. channels map[string]*Channel
  16. running bool
  17. }
  18. // NewManager creates an instance of Statistics Manager.
  19. func NewManager(ctx context.Context, config *Config) (*Manager, error) {
  20. m := &Manager{
  21. counters: make(map[string]*Counter),
  22. channels: make(map[string]*Channel),
  23. }
  24. return m, nil
  25. }
  26. // Type implements common.HasType.
  27. func (*Manager) Type() interface{} {
  28. return stats.ManagerType()
  29. }
  30. // RegisterCounter implements stats.Manager.
  31. func (m *Manager) RegisterCounter(name string) (stats.Counter, error) {
  32. m.access.Lock()
  33. defer m.access.Unlock()
  34. if _, found := m.counters[name]; found {
  35. return nil, newError("Counter ", name, " already registered.")
  36. }
  37. newError("create new counter ", name).AtDebug().WriteToLog()
  38. c := new(Counter)
  39. m.counters[name] = c
  40. return c, nil
  41. }
  42. // UnregisterCounter implements stats.Manager.
  43. func (m *Manager) UnregisterCounter(name string) error {
  44. m.access.Lock()
  45. defer m.access.Unlock()
  46. if _, found := m.counters[name]; found {
  47. newError("remove counter ", name).AtDebug().WriteToLog()
  48. delete(m.counters, name)
  49. }
  50. return nil
  51. }
  52. // GetCounter implements stats.Manager.
  53. func (m *Manager) GetCounter(name string) stats.Counter {
  54. m.access.RLock()
  55. defer m.access.RUnlock()
  56. if c, found := m.counters[name]; found {
  57. return c
  58. }
  59. return nil
  60. }
  61. // VisitCounters calls visitor function on all managed counters.
  62. func (m *Manager) VisitCounters(visitor func(string, stats.Counter) bool) {
  63. m.access.RLock()
  64. defer m.access.RUnlock()
  65. for name, c := range m.counters {
  66. if !visitor(name, c) {
  67. break
  68. }
  69. }
  70. }
  71. // RegisterChannel implements stats.Manager.
  72. func (m *Manager) RegisterChannel(name string) (stats.Channel, error) {
  73. m.access.Lock()
  74. defer m.access.Unlock()
  75. if _, found := m.channels[name]; found {
  76. return nil, newError("Channel ", name, " already registered.")
  77. }
  78. newError("create new channel ", name).AtDebug().WriteToLog()
  79. c := NewChannel(&ChannelConfig{BufferSize: 16, BroadcastTimeout: 100})
  80. m.channels[name] = c
  81. if m.running {
  82. return c, c.Start()
  83. }
  84. return c, nil
  85. }
  86. // UnregisterChannel implements stats.Manager.
  87. func (m *Manager) UnregisterChannel(name string) error {
  88. m.access.Lock()
  89. defer m.access.Unlock()
  90. if c, found := m.channels[name]; found {
  91. newError("remove channel ", name).AtDebug().WriteToLog()
  92. delete(m.channels, name)
  93. return c.Close()
  94. }
  95. return nil
  96. }
  97. // GetChannel implements stats.Manager.
  98. func (m *Manager) GetChannel(name string) stats.Channel {
  99. m.access.RLock()
  100. defer m.access.RUnlock()
  101. if c, found := m.channels[name]; found {
  102. return c
  103. }
  104. return nil
  105. }
  106. // Start implements common.Runnable.
  107. func (m *Manager) Start() error {
  108. m.access.Lock()
  109. defer m.access.Unlock()
  110. m.running = true
  111. errs := []error{}
  112. for _, channel := range m.channels {
  113. if err := channel.Start(); err != nil {
  114. errs = append(errs, err)
  115. }
  116. }
  117. if len(errs) != 0 {
  118. return errors.Combine(errs...)
  119. }
  120. return nil
  121. }
  122. // Close implement common.Closable.
  123. func (m *Manager) Close() error {
  124. m.access.Lock()
  125. defer m.access.Unlock()
  126. m.running = false
  127. errs := []error{}
  128. for name, channel := range m.channels {
  129. newError("remove channel ", name).AtDebug().WriteToLog()
  130. delete(m.channels, name)
  131. if err := channel.Close(); err != nil {
  132. errs = append(errs, err)
  133. }
  134. }
  135. if len(errs) != 0 {
  136. return errors.Combine(errs...)
  137. }
  138. return nil
  139. }
  140. func init() {
  141. common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
  142. return NewManager(ctx, config.(*Config))
  143. }))
  144. }