channel.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. // +build !confonly
  2. package stats
  3. import (
  4. "sync"
  5. "time"
  6. )
  7. // Channel is an implementation of stats.Channel.
  8. type Channel struct {
  9. access sync.RWMutex
  10. closed chan struct{}
  11. channel chan interface{}
  12. subscribers []chan interface{}
  13. }
  14. // Channel returns the underlying go channel.
  15. func (c *Channel) Channel() chan interface{} {
  16. c.access.RLock()
  17. defer c.access.RUnlock()
  18. return c.channel
  19. }
  20. // Subscribers implements stats.Channel.
  21. func (c *Channel) Subscribers() []chan interface{} {
  22. c.access.RLock()
  23. defer c.access.RUnlock()
  24. return c.subscribers
  25. }
  26. // Subscribe implements stats.Channel.
  27. func (c *Channel) Subscribe() chan interface{} {
  28. c.access.Lock()
  29. defer c.access.Unlock()
  30. subscriber := make(chan interface{})
  31. c.subscribers = append(c.subscribers, subscriber)
  32. return subscriber
  33. }
  34. // Unsubscribe implements stats.Channel.
  35. func (c *Channel) Unsubscribe(subscriber chan interface{}) {
  36. c.access.Lock()
  37. defer c.access.Unlock()
  38. for i, s := range c.subscribers {
  39. if s == subscriber {
  40. // Copy to new memory block to prevent modifying original data
  41. subscribers := make([]chan interface{}, len(c.subscribers)-1)
  42. copy(subscribers[:i], c.subscribers[:i])
  43. copy(subscribers[i:], c.subscribers[i+1:])
  44. c.subscribers = subscribers
  45. return
  46. }
  47. }
  48. }
  49. // Publish implements stats.Channel.
  50. func (c *Channel) Publish(message interface{}) {
  51. select { // Early exit if channel closed
  52. case <-c.closed:
  53. return
  54. default:
  55. }
  56. select { // Drop message if not successfully sent
  57. case c.channel <- message:
  58. default:
  59. return
  60. }
  61. }
  62. // Running returns whether the channel is running.
  63. func (c *Channel) Running() bool {
  64. select {
  65. case <-c.closed: // Channel closed
  66. default: // Channel running or not initialized
  67. if c.closed != nil { // Channel initialized
  68. return true
  69. }
  70. }
  71. return false
  72. }
  73. // Start implements common.Runnable.
  74. func (c *Channel) Start() error {
  75. c.access.Lock()
  76. defer c.access.Unlock()
  77. if c.Running() {
  78. return nil
  79. }
  80. if c.channel == nil { // Initialize publisher channel
  81. c.channel = make(chan interface{}, 16)
  82. }
  83. c.closed = make(chan struct{}) // Reset close signal
  84. go func() {
  85. for {
  86. select {
  87. case message := <-c.channel: // Broadcast message
  88. for _, sub := range c.Subscribers() { // Concurrency-safe subscribers retreivement
  89. select {
  90. case sub <- message: // Successfully sent message
  91. case <-time.After(100 * time.Millisecond):
  92. c.Unsubscribe(sub) // Remove timeout subscriber
  93. close(sub) // Actively close subscriber as notification
  94. }
  95. }
  96. case <-c.closed: // Channel closed
  97. for _, sub := range c.Subscribers() { // Remove all subscribers
  98. c.Unsubscribe(sub)
  99. close(sub)
  100. }
  101. return
  102. }
  103. }
  104. }()
  105. return nil
  106. }
  107. // Close implements common.Closable.
  108. func (c *Channel) Close() error {
  109. c.access.Lock()
  110. defer c.access.Unlock()
  111. if c.Running() {
  112. close(c.closed) // Send closed signal
  113. }
  114. return nil
  115. }