stats.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. //go:build !confonly
  2. // +build !confonly
  3. package stats
  4. //go:generate go run github.com/v2fly/v2ray-core/v4/common/errors/errorgen
  5. import (
  6. "context"
  7. "sync"
  8. "github.com/v2fly/v2ray-core/v4/common"
  9. "github.com/v2fly/v2ray-core/v4/common/errors"
  10. "github.com/v2fly/v2ray-core/v4/features/stats"
  11. )
  12. // Manager is an implementation of stats.Manager.
  13. type Manager struct {
  14. access sync.RWMutex
  15. counters map[string]*Counter
  16. channels map[string]*Channel
  17. running bool
  18. }
  19. // NewManager creates an instance of Statistics Manager.
  20. func NewManager(ctx context.Context, config *Config) (*Manager, error) {
  21. m := &Manager{
  22. counters: make(map[string]*Counter),
  23. channels: make(map[string]*Channel),
  24. }
  25. return m, nil
  26. }
  27. // Type implements common.HasType.
  28. func (*Manager) Type() interface{} {
  29. return stats.ManagerType()
  30. }
  31. // RegisterCounter implements stats.Manager.
  32. func (m *Manager) RegisterCounter(name string) (stats.Counter, error) {
  33. m.access.Lock()
  34. defer m.access.Unlock()
  35. if _, found := m.counters[name]; found {
  36. return nil, newError("Counter ", name, " already registered.")
  37. }
  38. newError("create new counter ", name).AtDebug().WriteToLog()
  39. c := new(Counter)
  40. m.counters[name] = c
  41. return c, nil
  42. }
  43. // UnregisterCounter implements stats.Manager.
  44. func (m *Manager) UnregisterCounter(name string) error {
  45. m.access.Lock()
  46. defer m.access.Unlock()
  47. if _, found := m.counters[name]; found {
  48. newError("remove counter ", name).AtDebug().WriteToLog()
  49. delete(m.counters, name)
  50. }
  51. return nil
  52. }
  53. // GetCounter implements stats.Manager.
  54. func (m *Manager) GetCounter(name string) stats.Counter {
  55. m.access.RLock()
  56. defer m.access.RUnlock()
  57. if c, found := m.counters[name]; found {
  58. return c
  59. }
  60. return nil
  61. }
  62. // VisitCounters calls visitor function on all managed counters.
  63. func (m *Manager) VisitCounters(visitor func(string, stats.Counter) bool) {
  64. m.access.RLock()
  65. defer m.access.RUnlock()
  66. for name, c := range m.counters {
  67. if !visitor(name, c) {
  68. break
  69. }
  70. }
  71. }
  72. // RegisterChannel implements stats.Manager.
  73. func (m *Manager) RegisterChannel(name string) (stats.Channel, error) {
  74. m.access.Lock()
  75. defer m.access.Unlock()
  76. if _, found := m.channels[name]; found {
  77. return nil, newError("Channel ", name, " already registered.")
  78. }
  79. newError("create new channel ", name).AtDebug().WriteToLog()
  80. c := NewChannel(&ChannelConfig{BufferSize: 64, Blocking: false})
  81. m.channels[name] = c
  82. if m.running {
  83. return c, c.Start()
  84. }
  85. return c, nil
  86. }
  87. // UnregisterChannel implements stats.Manager.
  88. func (m *Manager) UnregisterChannel(name string) error {
  89. m.access.Lock()
  90. defer m.access.Unlock()
  91. if c, found := m.channels[name]; found {
  92. newError("remove channel ", name).AtDebug().WriteToLog()
  93. delete(m.channels, name)
  94. return c.Close()
  95. }
  96. return nil
  97. }
  98. // GetChannel implements stats.Manager.
  99. func (m *Manager) GetChannel(name string) stats.Channel {
  100. m.access.RLock()
  101. defer m.access.RUnlock()
  102. if c, found := m.channels[name]; found {
  103. return c
  104. }
  105. return nil
  106. }
  107. // Start implements common.Runnable.
  108. func (m *Manager) Start() error {
  109. m.access.Lock()
  110. defer m.access.Unlock()
  111. m.running = true
  112. errs := []error{}
  113. for _, channel := range m.channels {
  114. if err := channel.Start(); err != nil {
  115. errs = append(errs, err)
  116. }
  117. }
  118. if len(errs) != 0 {
  119. return errors.Combine(errs...)
  120. }
  121. return nil
  122. }
  123. // Close implement common.Closable.
  124. func (m *Manager) Close() error {
  125. m.access.Lock()
  126. defer m.access.Unlock()
  127. m.running = false
  128. errs := []error{}
  129. for name, channel := range m.channels {
  130. newError("remove channel ", name).AtDebug().WriteToLog()
  131. delete(m.channels, name)
  132. if err := channel.Close(); err != nil {
  133. errs = append(errs, err)
  134. }
  135. }
  136. if len(errs) != 0 {
  137. return errors.Combine(errs...)
  138. }
  139. return nil
  140. }
  141. func init() {
  142. common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
  143. return NewManager(ctx, config.(*Config))
  144. }))
  145. }