channel.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. package stats
  2. import (
  3. "context"
  4. "sync"
  5. "github.com/v2fly/v2ray-core/v4/common"
  6. )
  7. // Channel is an implementation of stats.Channel.
  8. type Channel struct {
  9. channel chan channelMessage
  10. subscribers []chan interface{}
  11. // Synchronization components
  12. access sync.RWMutex
  13. closed chan struct{}
  14. // Channel options
  15. blocking bool // Set blocking state if channel buffer reaches limit
  16. bufferSize int // Set to 0 as no buffering
  17. subsLimit int // Set to 0 as no subscriber limit
  18. }
  19. // NewChannel creates an instance of Statistics Channel.
  20. func NewChannel(config *ChannelConfig) *Channel {
  21. return &Channel{
  22. channel: make(chan channelMessage, config.BufferSize),
  23. subsLimit: int(config.SubscriberLimit),
  24. bufferSize: int(config.BufferSize),
  25. blocking: config.Blocking,
  26. }
  27. }
  28. // Subscribers implements stats.Channel.
  29. func (c *Channel) Subscribers() []chan interface{} {
  30. c.access.RLock()
  31. defer c.access.RUnlock()
  32. return c.subscribers
  33. }
  34. // Subscribe implements stats.Channel.
  35. func (c *Channel) Subscribe() (chan interface{}, error) {
  36. c.access.Lock()
  37. defer c.access.Unlock()
  38. if c.subsLimit > 0 && len(c.subscribers) >= c.subsLimit {
  39. return nil, newError("Number of subscribers has reached limit")
  40. }
  41. subscriber := make(chan interface{}, c.bufferSize)
  42. c.subscribers = append(c.subscribers, subscriber)
  43. return subscriber, nil
  44. }
  45. // Unsubscribe implements stats.Channel.
  46. func (c *Channel) Unsubscribe(subscriber chan interface{}) error {
  47. c.access.Lock()
  48. defer c.access.Unlock()
  49. for i, s := range c.subscribers {
  50. if s == subscriber {
  51. // Copy to new memory block to prevent modifying original data
  52. subscribers := make([]chan interface{}, len(c.subscribers)-1)
  53. copy(subscribers[:i], c.subscribers[:i])
  54. copy(subscribers[i:], c.subscribers[i+1:])
  55. c.subscribers = subscribers
  56. }
  57. }
  58. return nil
  59. }
  60. // Publish implements stats.Channel.
  61. func (c *Channel) Publish(ctx context.Context, msg interface{}) {
  62. select { // Early exit if channel closed
  63. case <-c.closed:
  64. return
  65. default:
  66. pub := channelMessage{context: ctx, message: msg}
  67. if c.blocking {
  68. pub.publish(c.channel)
  69. } else {
  70. pub.publishNonBlocking(c.channel)
  71. }
  72. }
  73. }
  74. // Running returns whether the channel is running.
  75. func (c *Channel) Running() bool {
  76. select {
  77. case <-c.closed: // Channel closed
  78. default: // Channel running or not initialized
  79. if c.closed != nil { // Channel initialized
  80. return true
  81. }
  82. }
  83. return false
  84. }
  85. // Start implements common.Runnable.
  86. func (c *Channel) Start() error {
  87. c.access.Lock()
  88. defer c.access.Unlock()
  89. if !c.Running() {
  90. c.closed = make(chan struct{}) // Reset close signal
  91. go func() {
  92. for {
  93. select {
  94. case pub := <-c.channel: // Published message received
  95. for _, sub := range c.Subscribers() { // Concurrency-safe subscribers retrievement
  96. if c.blocking {
  97. pub.broadcast(sub)
  98. } else {
  99. pub.broadcastNonBlocking(sub)
  100. }
  101. }
  102. case <-c.closed: // Channel closed
  103. for _, sub := range c.Subscribers() { // Remove all subscribers
  104. common.Must(c.Unsubscribe(sub))
  105. close(sub)
  106. }
  107. return
  108. }
  109. }
  110. }()
  111. }
  112. return nil
  113. }
  114. // Close implements common.Closable.
  115. func (c *Channel) Close() error {
  116. c.access.Lock()
  117. defer c.access.Unlock()
  118. if c.Running() {
  119. close(c.closed) // Send closed signal
  120. }
  121. return nil
  122. }
  123. // channelMessage is the published message with guaranteed delivery.
  124. // message is discarded only when the context is early cancelled.
  125. type channelMessage struct {
  126. context context.Context
  127. message interface{}
  128. }
  129. func (c channelMessage) publish(publisher chan channelMessage) {
  130. select {
  131. case publisher <- c:
  132. case <-c.context.Done():
  133. }
  134. }
  135. func (c channelMessage) publishNonBlocking(publisher chan channelMessage) {
  136. select {
  137. case publisher <- c:
  138. default: // Create another goroutine to keep sending message
  139. go c.publish(publisher)
  140. }
  141. }
  142. func (c channelMessage) broadcast(subscriber chan interface{}) {
  143. select {
  144. case subscriber <- c.message:
  145. case <-c.context.Done():
  146. }
  147. }
  148. func (c channelMessage) broadcastNonBlocking(subscriber chan interface{}) {
  149. select {
  150. case subscriber <- c.message:
  151. default: // Create another goroutine to keep sending message
  152. go c.broadcast(subscriber)
  153. }
  154. }