channel.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. //go:build !confonly
  2. // +build !confonly
  3. package stats
  4. import (
  5. "context"
  6. "sync"
  7. "github.com/v2fly/v2ray-core/v4/common"
  8. )
  9. // Channel is an implementation of stats.Channel.
  10. type Channel struct {
  11. channel chan channelMessage
  12. subscribers []chan interface{}
  13. // Synchronization components
  14. access sync.RWMutex
  15. closed chan struct{}
  16. // Channel options
  17. blocking bool // Set blocking state if channel buffer reaches limit
  18. bufferSize int // Set to 0 as no buffering
  19. subsLimit int // Set to 0 as no subscriber limit
  20. }
  21. // NewChannel creates an instance of Statistics Channel.
  22. func NewChannel(config *ChannelConfig) *Channel {
  23. return &Channel{
  24. channel: make(chan channelMessage, config.BufferSize),
  25. subsLimit: int(config.SubscriberLimit),
  26. bufferSize: int(config.BufferSize),
  27. blocking: config.Blocking,
  28. }
  29. }
  30. // Subscribers implements stats.Channel.
  31. func (c *Channel) Subscribers() []chan interface{} {
  32. c.access.RLock()
  33. defer c.access.RUnlock()
  34. return c.subscribers
  35. }
  36. // Subscribe implements stats.Channel.
  37. func (c *Channel) Subscribe() (chan interface{}, error) {
  38. c.access.Lock()
  39. defer c.access.Unlock()
  40. if c.subsLimit > 0 && len(c.subscribers) >= c.subsLimit {
  41. return nil, newError("Number of subscribers has reached limit")
  42. }
  43. subscriber := make(chan interface{}, c.bufferSize)
  44. c.subscribers = append(c.subscribers, subscriber)
  45. return subscriber, nil
  46. }
  47. // Unsubscribe implements stats.Channel.
  48. func (c *Channel) Unsubscribe(subscriber chan interface{}) error {
  49. c.access.Lock()
  50. defer c.access.Unlock()
  51. for i, s := range c.subscribers {
  52. if s == subscriber {
  53. // Copy to new memory block to prevent modifying original data
  54. subscribers := make([]chan interface{}, len(c.subscribers)-1)
  55. copy(subscribers[:i], c.subscribers[:i])
  56. copy(subscribers[i:], c.subscribers[i+1:])
  57. c.subscribers = subscribers
  58. }
  59. }
  60. return nil
  61. }
  62. // Publish implements stats.Channel.
  63. func (c *Channel) Publish(ctx context.Context, msg interface{}) {
  64. select { // Early exit if channel closed
  65. case <-c.closed:
  66. return
  67. default:
  68. pub := channelMessage{context: ctx, message: msg}
  69. if c.blocking {
  70. pub.publish(c.channel)
  71. } else {
  72. pub.publishNonBlocking(c.channel)
  73. }
  74. }
  75. }
  76. // Running returns whether the channel is running.
  77. func (c *Channel) Running() bool {
  78. select {
  79. case <-c.closed: // Channel closed
  80. default: // Channel running or not initialized
  81. if c.closed != nil { // Channel initialized
  82. return true
  83. }
  84. }
  85. return false
  86. }
  87. // Start implements common.Runnable.
  88. func (c *Channel) Start() error {
  89. c.access.Lock()
  90. defer c.access.Unlock()
  91. if !c.Running() {
  92. c.closed = make(chan struct{}) // Reset close signal
  93. go func() {
  94. for {
  95. select {
  96. case pub := <-c.channel: // Published message received
  97. for _, sub := range c.Subscribers() { // Concurrency-safe subscribers retrievement
  98. if c.blocking {
  99. pub.broadcast(sub)
  100. } else {
  101. pub.broadcastNonBlocking(sub)
  102. }
  103. }
  104. case <-c.closed: // Channel closed
  105. for _, sub := range c.Subscribers() { // Remove all subscribers
  106. common.Must(c.Unsubscribe(sub))
  107. close(sub)
  108. }
  109. return
  110. }
  111. }
  112. }()
  113. }
  114. return nil
  115. }
  116. // Close implements common.Closable.
  117. func (c *Channel) Close() error {
  118. c.access.Lock()
  119. defer c.access.Unlock()
  120. if c.Running() {
  121. close(c.closed) // Send closed signal
  122. }
  123. return nil
  124. }
  125. // channelMessage is the published message with guaranteed delivery.
  126. // message is discarded only when the context is early cancelled.
  127. type channelMessage struct {
  128. context context.Context
  129. message interface{}
  130. }
  131. func (c channelMessage) publish(publisher chan channelMessage) {
  132. select {
  133. case publisher <- c:
  134. case <-c.context.Done():
  135. }
  136. }
  137. func (c channelMessage) publishNonBlocking(publisher chan channelMessage) {
  138. select {
  139. case publisher <- c:
  140. default: // Create another goroutine to keep sending message
  141. go c.publish(publisher)
  142. }
  143. }
  144. func (c channelMessage) broadcast(subscriber chan interface{}) {
  145. select {
  146. case subscriber <- c.message:
  147. case <-c.context.Done():
  148. }
  149. }
  150. func (c channelMessage) broadcastNonBlocking(subscriber chan interface{}) {
  151. select {
  152. case subscriber <- c.message:
  153. default: // Create another goroutine to keep sending message
  154. go c.broadcast(subscriber)
  155. }
  156. }