channel.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. // +build !confonly
  2. package stats
  3. import (
  4. "sync"
  5. "time"
  6. "v2ray.com/core/common"
  7. )
  // Channel is an implementation of stats.Channel.
  //
  // Publish queues messages on an internal go channel. Once Start has been
  // called, a background goroutine forwards every queued message to each
  // subscriber channel.
  type Channel struct {
  	// channel queues published messages until they are broadcast.
  	channel chan interface{}
  	// subscribers lists the channels handed out by Subscribe. Unsubscribe
  	// swaps in a fresh slice instead of editing this one in place.
  	subscribers []chan interface{}

  	// Synchronization components.

  	// access is held by the subscriber accessors and by Start and Close.
  	access sync.RWMutex
  	// closed is the close signal. It stays nil until Start creates it, and
  	// Close closes it.
  	closed chan struct{}

  	// Channel options.

  	// subscriberLimit caps the number of subscribers; 0 means no limit.
  	subscriberLimit int
  	// channelBufferSize is the buffer capacity given to each subscriber
  	// channel; 0 means unbuffered.
  	channelBufferSize int
  	// broadcastTimeout is how long a broadcast waits for one subscriber to
  	// accept a message before that subscriber is removed. NewChannel stores
  	// the configured value plus one millisecond.
  	broadcastTimeout time.Duration
  }
  20. // NewChannel creates an instance of Statistics Channel.
  21. func NewChannel(config *ChannelConfig) *Channel {
  22. return &Channel{
  23. channel: make(chan interface{}, config.BufferSize),
  24. subscriberLimit: int(config.SubscriberLimit),
  25. channelBufferSize: int(config.BufferSize),
  26. broadcastTimeout: time.Duration(config.BroadcastTimeout+1) * time.Millisecond,
  27. }
  28. }
  29. // Channel returns the underlying go channel.
  30. func (c *Channel) Channel() chan interface{} {
  31. c.access.RLock()
  32. defer c.access.RUnlock()
  33. return c.channel
  34. }
  35. // Subscribers implements stats.Channel.
  36. func (c *Channel) Subscribers() []chan interface{} {
  37. c.access.RLock()
  38. defer c.access.RUnlock()
  39. return c.subscribers
  40. }
  41. // Subscribe implements stats.Channel.
  42. func (c *Channel) Subscribe() (chan interface{}, error) {
  43. c.access.Lock()
  44. defer c.access.Unlock()
  45. if c.subscriberLimit > 0 && len(c.subscribers) >= c.subscriberLimit {
  46. return nil, newError("Number of subscribers has reached limit")
  47. }
  48. subscriber := make(chan interface{}, c.channelBufferSize)
  49. c.subscribers = append(c.subscribers, subscriber)
  50. return subscriber, nil
  51. }
  52. // Unsubscribe implements stats.Channel.
  53. func (c *Channel) Unsubscribe(subscriber chan interface{}) error {
  54. c.access.Lock()
  55. defer c.access.Unlock()
  56. for i, s := range c.subscribers {
  57. if s == subscriber {
  58. // Copy to new memory block to prevent modifying original data
  59. subscribers := make([]chan interface{}, len(c.subscribers)-1)
  60. copy(subscribers[:i], c.subscribers[:i])
  61. copy(subscribers[i:], c.subscribers[i+1:])
  62. c.subscribers = subscribers
  63. }
  64. }
  65. return nil
  66. }
  67. // Publish implements stats.Channel.
  68. func (c *Channel) Publish(message interface{}) {
  69. select { // Early exit if channel closed
  70. case <-c.closed:
  71. return
  72. default:
  73. }
  74. select { // Drop message if not successfully sent
  75. case c.channel <- message:
  76. default:
  77. return
  78. }
  79. }
  80. // Running returns whether the channel is running.
  81. func (c *Channel) Running() bool {
  82. select {
  83. case <-c.closed: // Channel closed
  84. default: // Channel running or not initialized
  85. if c.closed != nil { // Channel initialized
  86. return true
  87. }
  88. }
  89. return false
  90. }
  91. // Start implements common.Runnable.
  92. func (c *Channel) Start() error {
  93. c.access.Lock()
  94. defer c.access.Unlock()
  95. if !c.Running() {
  96. c.closed = make(chan struct{}) // Reset close signal
  97. go func() {
  98. for {
  99. select {
  100. case message := <-c.channel: // Broadcast message
  101. for _, sub := range c.Subscribers() { // Concurrency-safe subscribers retreivement
  102. select {
  103. case sub <- message: // Successfully sent message
  104. case <-time.After(c.broadcastTimeout): // Remove timeout subscriber
  105. common.Must(c.Unsubscribe(sub))
  106. close(sub) // Actively close subscriber as notification
  107. }
  108. }
  109. case <-c.closed: // Channel closed
  110. for _, sub := range c.Subscribers() { // Remove all subscribers
  111. common.Must(c.Unsubscribe(sub))
  112. close(sub)
  113. }
  114. return
  115. }
  116. }
  117. }()
  118. }
  119. return nil
  120. }
  121. // Close implements common.Closable.
  122. func (c *Channel) Close() error {
  123. c.access.Lock()
  124. defer c.access.Unlock()
  125. if c.Running() {
  126. close(c.closed) // Send closed signal
  127. }
  128. return nil
  129. }