channel.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. // +build !confonly
  2. package stats
  3. import (
  4. "context"
  5. "sync"
  6. "v2ray.com/core/common"
  7. )
  8. // Channel is an implementation of stats.Channel.
  9. type Channel struct {
  10. channel chan channelMessage
  11. subscribers []chan interface{}
  12. // Synchronization components
  13. access sync.RWMutex
  14. closed chan struct{}
  15. // Channel options
  16. blocking bool // Set blocking state if channel buffer reaches limit
  17. bufferSize int // Set to 0 as no buffering
  18. subsLimit int // Set to 0 as no subscriber limit
  19. }
  20. // NewChannel creates an instance of Statistics Channel.
  21. func NewChannel(config *ChannelConfig) *Channel {
  22. return &Channel{
  23. channel: make(chan channelMessage, config.BufferSize),
  24. subsLimit: int(config.SubscriberLimit),
  25. bufferSize: int(config.BufferSize),
  26. blocking: config.Blocking,
  27. }
  28. }
  29. // Subscribers implements stats.Channel.
  30. func (c *Channel) Subscribers() []chan interface{} {
  31. c.access.RLock()
  32. defer c.access.RUnlock()
  33. return c.subscribers
  34. }
  35. // Subscribe implements stats.Channel.
  36. func (c *Channel) Subscribe() (chan interface{}, error) {
  37. c.access.Lock()
  38. defer c.access.Unlock()
  39. if c.subsLimit > 0 && len(c.subscribers) >= c.subsLimit {
  40. return nil, newError("Number of subscribers has reached limit")
  41. }
  42. subscriber := make(chan interface{}, c.bufferSize)
  43. c.subscribers = append(c.subscribers, subscriber)
  44. return subscriber, nil
  45. }
  46. // Unsubscribe implements stats.Channel.
  47. func (c *Channel) Unsubscribe(subscriber chan interface{}) error {
  48. c.access.Lock()
  49. defer c.access.Unlock()
  50. for i, s := range c.subscribers {
  51. if s == subscriber {
  52. // Copy to new memory block to prevent modifying original data
  53. subscribers := make([]chan interface{}, len(c.subscribers)-1)
  54. copy(subscribers[:i], c.subscribers[:i])
  55. copy(subscribers[i:], c.subscribers[i+1:])
  56. c.subscribers = subscribers
  57. }
  58. }
  59. return nil
  60. }
  61. // Publish implements stats.Channel.
  62. func (c *Channel) Publish(ctx context.Context, msg interface{}) {
  63. select { // Early exit if channel closed
  64. case <-c.closed:
  65. return
  66. default:
  67. pub := channelMessage{context: ctx, message: msg}
  68. if c.blocking {
  69. pub.publish(c.channel)
  70. } else {
  71. pub.publishNonBlocking(c.channel)
  72. }
  73. }
  74. }
  75. // Running returns whether the channel is running.
  76. func (c *Channel) Running() bool {
  77. select {
  78. case <-c.closed: // Channel closed
  79. default: // Channel running or not initialized
  80. if c.closed != nil { // Channel initialized
  81. return true
  82. }
  83. }
  84. return false
  85. }
  86. // Start implements common.Runnable.
  87. func (c *Channel) Start() error {
  88. c.access.Lock()
  89. defer c.access.Unlock()
  90. if !c.Running() {
  91. c.closed = make(chan struct{}) // Reset close signal
  92. go func() {
  93. for {
  94. select {
  95. case pub := <-c.channel: // Published message received
  96. for _, sub := range c.Subscribers() { // Concurrency-safe subscribers retrievement
  97. if c.blocking {
  98. pub.broadcast(sub)
  99. } else {
  100. pub.broadcastNonBlocking(sub)
  101. }
  102. }
  103. case <-c.closed: // Channel closed
  104. for _, sub := range c.Subscribers() { // Remove all subscribers
  105. common.Must(c.Unsubscribe(sub))
  106. close(sub)
  107. }
  108. return
  109. }
  110. }
  111. }()
  112. }
  113. return nil
  114. }
  115. // Close implements common.Closable.
  116. func (c *Channel) Close() error {
  117. c.access.Lock()
  118. defer c.access.Unlock()
  119. if c.Running() {
  120. close(c.closed) // Send closed signal
  121. }
  122. return nil
  123. }
  124. // channelMessage is the published message with guaranteed delivery.
  125. // message is discarded only when the context is early cancelled.
  126. type channelMessage struct {
  127. context context.Context
  128. message interface{}
  129. }
  130. func (c channelMessage) publish(publisher chan channelMessage) {
  131. select {
  132. case publisher <- c:
  133. case <-c.context.Done():
  134. }
  135. }
  136. func (c channelMessage) publishNonBlocking(publisher chan channelMessage) {
  137. select {
  138. case publisher <- c:
  139. default: // Create another goroutine to keep sending message
  140. go c.publish(publisher)
  141. }
  142. }
  143. func (c channelMessage) broadcast(subscriber chan interface{}) {
  144. select {
  145. case subscriber <- c.message:
  146. case <-c.context.Done():
  147. }
  148. }
  149. func (c channelMessage) broadcastNonBlocking(subscriber chan interface{}) {
  150. select {
  151. case subscriber <- c.message:
  152. default: // Create another goroutine to keep sending message
  153. go c.broadcast(subscriber)
  154. }
  155. }