| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174 |
- // +build !confonly
- package stats
- import (
- "context"
- "sync"
- "v2ray.com/core/common"
- )
- // Channel is an implementation of stats.Channel.
- type Channel struct {
- channel chan channelMessage
- subscribers []chan interface{}
- // Synchronization components
- access sync.RWMutex
- closed chan struct{}
- // Channel options
- blocking bool // Set blocking state if channel buffer reaches limit
- bufferSize int // Set to 0 as no buffering
- subsLimit int // Set to 0 as no subscriber limit
- }
- // NewChannel creates an instance of Statistics Channel.
- func NewChannel(config *ChannelConfig) *Channel {
- return &Channel{
- channel: make(chan channelMessage, config.BufferSize),
- subsLimit: int(config.SubscriberLimit),
- bufferSize: int(config.BufferSize),
- blocking: config.Blocking,
- }
- }
- // Subscribers implements stats.Channel.
- func (c *Channel) Subscribers() []chan interface{} {
- c.access.RLock()
- defer c.access.RUnlock()
- return c.subscribers
- }
- // Subscribe implements stats.Channel.
- func (c *Channel) Subscribe() (chan interface{}, error) {
- c.access.Lock()
- defer c.access.Unlock()
- if c.subsLimit > 0 && len(c.subscribers) >= c.subsLimit {
- return nil, newError("Number of subscribers has reached limit")
- }
- subscriber := make(chan interface{}, c.bufferSize)
- c.subscribers = append(c.subscribers, subscriber)
- return subscriber, nil
- }
- // Unsubscribe implements stats.Channel.
- func (c *Channel) Unsubscribe(subscriber chan interface{}) error {
- c.access.Lock()
- defer c.access.Unlock()
- for i, s := range c.subscribers {
- if s == subscriber {
- // Copy to new memory block to prevent modifying original data
- subscribers := make([]chan interface{}, len(c.subscribers)-1)
- copy(subscribers[:i], c.subscribers[:i])
- copy(subscribers[i:], c.subscribers[i+1:])
- c.subscribers = subscribers
- }
- }
- return nil
- }
- // Publish implements stats.Channel.
- func (c *Channel) Publish(ctx context.Context, msg interface{}) {
- select { // Early exit if channel closed
- case <-c.closed:
- return
- default:
- pub := channelMessage{context: ctx, message: msg}
- if c.blocking {
- pub.publish(c.channel)
- } else {
- pub.publishNonBlocking(c.channel)
- }
- }
- }
- // Running returns whether the channel is running.
- func (c *Channel) Running() bool {
- select {
- case <-c.closed: // Channel closed
- default: // Channel running or not initialized
- if c.closed != nil { // Channel initialized
- return true
- }
- }
- return false
- }
- // Start implements common.Runnable.
- func (c *Channel) Start() error {
- c.access.Lock()
- defer c.access.Unlock()
- if !c.Running() {
- c.closed = make(chan struct{}) // Reset close signal
- go func() {
- for {
- select {
- case pub := <-c.channel: // Published message received
- for _, sub := range c.Subscribers() { // Concurrency-safe subscribers retrievement
- if c.blocking {
- pub.broadcast(sub)
- } else {
- pub.broadcastNonBlocking(sub)
- }
- }
- case <-c.closed: // Channel closed
- for _, sub := range c.Subscribers() { // Remove all subscribers
- common.Must(c.Unsubscribe(sub))
- close(sub)
- }
- return
- }
- }
- }()
- }
- return nil
- }
- // Close implements common.Closable.
- func (c *Channel) Close() error {
- c.access.Lock()
- defer c.access.Unlock()
- if c.Running() {
- close(c.closed) // Send closed signal
- }
- return nil
- }
- // channelMessage is the published message with guaranteed delivery.
- // message is discarded only when the context is early cancelled.
- type channelMessage struct {
- context context.Context
- message interface{}
- }
- func (c channelMessage) publish(publisher chan channelMessage) {
- select {
- case publisher <- c:
- case <-c.context.Done():
- }
- }
- func (c channelMessage) publishNonBlocking(publisher chan channelMessage) {
- select {
- case publisher <- c:
- default: // Create another goroutine to keep sending message
- go c.publish(publisher)
- }
- }
- func (c channelMessage) broadcast(subscriber chan interface{}) {
- select {
- case subscriber <- c.message:
- case <-c.context.Done():
- }
- }
- func (c channelMessage) broadcastNonBlocking(subscriber chan interface{}) {
- select {
- case subscriber <- c.message:
- default: // Create another goroutine to keep sending message
- go c.broadcast(subscriber)
- }
- }
|