Browse Source

Implement common.Runnable for stats.Channel feature

Vigilans 5 years ago
parent
commit
4fca2fe940
7 changed files with 587 additions and 392 deletions
  1. 127 0
      app/stats/channel.go
  2. 334 0
      app/stats/channel_test.go
  3. 25 0
      app/stats/counter.go
  4. 31 0
      app/stats/counter_test.go
  5. 23 84
      app/stats/stats.go
  6. 38 304
      app/stats/stats_test.go
  7. 9 4
      features/stats/stats.go

+ 127 - 0
app/stats/channel.go

@@ -0,0 +1,127 @@
+// +build !confonly
+
+package stats
+
+import (
+	"sync"
+	"time"
+)
+
+// Channel is an implementation of stats.Channel.
+type Channel struct {
+	access sync.RWMutex
+	closed chan struct{}
+
+	channel     chan interface{}
+	subscribers []chan interface{}
+}
+
+// Channel returns the underlying go channel.
+func (c *Channel) Channel() chan interface{} {
+	c.access.RLock()
+	defer c.access.RUnlock()
+	return c.channel
+}
+
+// Subscribers implements stats.Channel.
+func (c *Channel) Subscribers() []chan interface{} {
+	c.access.RLock()
+	defer c.access.RUnlock()
+	return c.subscribers
+}
+
+// Subscribe implements stats.Channel.
+func (c *Channel) Subscribe() chan interface{} {
+	c.access.Lock()
+	defer c.access.Unlock()
+	subscriber := make(chan interface{})
+	c.subscribers = append(c.subscribers, subscriber)
+	return subscriber
+}
+
+// Unsubscribe implements stats.Channel.
+func (c *Channel) Unsubscribe(subscriber chan interface{}) {
+	c.access.Lock()
+	defer c.access.Unlock()
+	for i, s := range c.subscribers {
+		if s == subscriber {
+			// Copy to new memory block to prevent modifying original data
+			subscribers := make([]chan interface{}, len(c.subscribers)-1)
+			copy(subscribers[:i], c.subscribers[:i])
+			copy(subscribers[i:], c.subscribers[i+1:])
+			c.subscribers = subscribers
+			return
+		}
+	}
+}
+
+// Publish implements stats.Channel.
+func (c *Channel) Publish(message interface{}) {
+	select { // Early exit if channel closed
+	case <-c.closed:
+		return
+	default:
+	}
+	select { // Drop message if not successfully sent
+	case c.channel <- message:
+	default:
+		return
+	}
+}
+
+// Running returns whether the channel is running.
+func (c *Channel) Running() bool {
+	select {
+	case <-c.closed: // Channel closed
+	default: // Channel running or not initialized
+		if c.closed != nil { // Channel initialized
+			return true
+		}
+	}
+	return false
+}
+
+// Start implements common.Runnable.
+func (c *Channel) Start() error {
+	c.access.Lock()
+	defer c.access.Unlock()
+	if c.Running() {
+		return nil
+	}
+	if c.channel == nil { // Initialize publisher channel
+		c.channel = make(chan interface{}, 16)
+	}
+	c.closed = make(chan struct{}) // Reset close signal
+	go func() {
+		for {
+			select {
+			case message := <-c.channel: // Broadcast message
+				for _, sub := range c.Subscribers() { // Concurrency-safe subscribers retreivement
+					select {
+					case sub <- message: // Successfully sent message
+					case <-time.After(100 * time.Millisecond):
+						c.Unsubscribe(sub) // Remove timeout subscriber
+						close(sub)         // Actively close subscriber as notification
+					}
+				}
+			case <-c.closed: // Channel closed
+				for _, sub := range c.Subscribers() { // Remove all subscribers
+					c.Unsubscribe(sub)
+					close(sub)
+				}
+				return
+			}
+		}
+	}()
+	return nil
+}
+
+// Close implements common.Closable.
+func (c *Channel) Close() error {
+	c.access.Lock()
+	defer c.access.Unlock()
+	if c.Running() {
+		close(c.closed) // Send closed signal
+	}
+	return nil
+}

+ 334 - 0
app/stats/channel_test.go

@@ -0,0 +1,334 @@
+package stats_test
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	. "v2ray.com/core/app/stats"
+	"v2ray.com/core/common"
+	"v2ray.com/core/features/stats"
+)
+
+func TestStatsChannel(t *testing.T) {
+	raw, err := common.CreateObject(context.Background(), &Config{})
+	common.Must(err)
+
+	m := raw.(stats.Manager)
+	c, err := m.RegisterChannel("test.channel")
+	common.Must(err)
+	common.Must(m.Start())
+	defer m.Close()
+
+	source := c.(*Channel).Channel()
+	a := c.Subscribe()
+	b := c.Subscribe()
+	defer c.Unsubscribe(a)
+	defer c.Unsubscribe(b)
+
+	stopCh := make(chan struct{})
+	errCh := make(chan string)
+
+	go func() {
+		source <- 1
+		source <- 2
+		source <- "3"
+		source <- []int{4}
+		source <- nil // Dummy messsage with no subscriber receiving, will block reading goroutine
+		for i := 0; i < cap(source); i++ {
+			source <- nil // Fill source channel's buffer
+		}
+		select {
+		case source <- nil: // Source writing should be blocked here, for last message was not cleared and buffer was full
+			errCh <- fmt.Sprint("unexpected non-blocked source channel")
+		default:
+			close(stopCh)
+		}
+	}()
+
+	go func() {
+		if v, ok := (<-a).(int); !ok || v != 1 {
+			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
+		}
+		if v, ok := (<-a).(int); !ok || v != 2 {
+			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2)
+		}
+		if v, ok := (<-a).(string); !ok || v != "3" {
+			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", "3")
+		}
+		if v, ok := (<-a).([]int); !ok || v[0] != 4 {
+			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", []int{4})
+		}
+	}()
+
+	go func() {
+		if v, ok := (<-b).(int); !ok || v != 1 {
+			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
+		}
+		if v, ok := (<-b).(int); !ok || v != 2 {
+			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2)
+		}
+		if v, ok := (<-b).(string); !ok || v != "3" {
+			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", "3")
+		}
+		if v, ok := (<-b).([]int); !ok || v[0] != 4 {
+			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", []int{4})
+		}
+	}()
+
+	select {
+	case <-time.After(2 * time.Second):
+		t.Fatal("Test timeout after 2s")
+	case e := <-errCh:
+		t.Fatal(e)
+	case <-stopCh:
+	}
+}
+
+func TestStatsChannelUnsubcribe(t *testing.T) {
+	raw, err := common.CreateObject(context.Background(), &Config{})
+	common.Must(err)
+
+	m := raw.(stats.Manager)
+	c, err := m.RegisterChannel("test.channel")
+	common.Must(err)
+	common.Must(m.Start())
+	defer m.Close()
+
+	a := c.Subscribe()
+	b := c.Subscribe()
+	defer c.Unsubscribe(a)
+
+	pauseCh := make(chan struct{})
+	stopCh := make(chan struct{})
+	errCh := make(chan string)
+
+	{
+		var aSet, bSet bool
+		for _, s := range c.Subscribers() {
+			if s == a {
+				aSet = true
+			}
+			if s == b {
+				bSet = true
+			}
+		}
+		if !(aSet && bSet) {
+			t.Fatal("unexpected subscribers: ", c.Subscribers())
+		}
+	}
+
+	go func() {
+		c.Publish(1)
+		<-pauseCh // Wait for `b` goroutine to resume sending message
+		c.Publish(2)
+	}()
+
+	go func() {
+		if v, ok := (<-a).(int); !ok || v != 1 {
+			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
+		}
+		if v, ok := (<-a).(int); !ok || v != 2 {
+			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2)
+		}
+	}()
+
+	go func() {
+		if v, ok := (<-b).(int); !ok || v != 1 {
+			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
+		}
+		// Unsubscribe `b` while `source`'s messaging is paused
+		c.Unsubscribe(b)
+		{ // Test `b` is not in subscribers
+			var aSet, bSet bool
+			for _, s := range c.Subscribers() {
+				if s == a {
+					aSet = true
+				}
+				if s == b {
+					bSet = true
+				}
+			}
+			if !(aSet && !bSet) {
+				errCh <- fmt.Sprint("unexpected subscribers: ", c.Subscribers())
+			}
+		}
+		// Resume `source`'s progress
+		close(pauseCh)
+		// Test `b` is neither closed nor able to receive any data
+		select {
+		case v, ok := <-b:
+			if ok {
+				errCh <- fmt.Sprint("unexpected data received: ", v)
+			} else {
+				errCh <- fmt.Sprint("unexpected closed channel: ", b)
+			}
+		default:
+		}
+		close(stopCh)
+	}()
+
+	select {
+	case <-time.After(2 * time.Second):
+		t.Fatal("Test timeout after 2s")
+	case e := <-errCh:
+		t.Fatal(e)
+	case <-stopCh:
+	}
+}
+
+func TestStatsChannelTimeout(t *testing.T) {
+	raw, err := common.CreateObject(context.Background(), &Config{})
+	common.Must(err)
+
+	m := raw.(stats.Manager)
+	c, err := m.RegisterChannel("test.channel")
+	common.Must(err)
+	common.Must(m.Start())
+	defer m.Close()
+
+	a := c.Subscribe()
+	b := c.Subscribe()
+	defer c.Unsubscribe(a)
+	defer c.Unsubscribe(b)
+
+	stopCh := make(chan struct{})
+	errCh := make(chan string)
+
+	go func() {
+		c.Publish(1)
+		c.Publish(2)
+	}()
+
+	go func() {
+		if v, ok := (<-a).(int); !ok || v != 1 {
+			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
+		}
+		if v, ok := (<-a).(int); !ok || v != 2 {
+			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2)
+		}
+		{ // Test `b` is still in subscribers yet (because `a` receives 2 first)
+			var aSet, bSet bool
+			for _, s := range c.Subscribers() {
+				if s == a {
+					aSet = true
+				}
+				if s == b {
+					bSet = true
+				}
+			}
+			if !(aSet && bSet) {
+				errCh <- fmt.Sprint("unexpected subscribers: ", c.Subscribers())
+			}
+		}
+	}()
+
+	go func() {
+		if v, ok := (<-b).(int); !ok || v != 1 {
+			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
+		}
+		// Block `b` channel for a time longer than `source`'s timeout
+		<-time.After(150 * time.Millisecond)
+		{ // Test `b` has been unsubscribed by source
+			var aSet, bSet bool
+			for _, s := range c.Subscribers() {
+				if s == a {
+					aSet = true
+				}
+				if s == b {
+					bSet = true
+				}
+			}
+			if !(aSet && !bSet) {
+				errCh <- fmt.Sprint("unexpected subscribers: ", c.Subscribers())
+			}
+		}
+		select { // Test `b` has been closed by source
+		case v, ok := <-b:
+			if ok {
+				errCh <- fmt.Sprint("unexpected data received: ", v)
+			}
+		default:
+		}
+		close(stopCh)
+	}()
+
+	select {
+	case <-time.After(2 * time.Second):
+		t.Fatal("Test timeout after 2s")
+	case e := <-errCh:
+		t.Fatal(e)
+	case <-stopCh:
+	}
+}
+
+func TestStatsChannelConcurrency(t *testing.T) {
+	raw, err := common.CreateObject(context.Background(), &Config{})
+	common.Must(err)
+
+	m := raw.(stats.Manager)
+	c, err := m.RegisterChannel("test.channel")
+	common.Must(err)
+	common.Must(m.Start())
+	defer m.Close()
+
+	a := c.Subscribe()
+	b := c.Subscribe()
+	defer c.Unsubscribe(a)
+
+	stopCh := make(chan struct{})
+	errCh := make(chan string)
+
+	go func() {
+		c.Publish(1)
+		c.Publish(2)
+	}()
+
+	go func() {
+		if v, ok := (<-a).(int); !ok || v != 1 {
+			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
+		}
+		if v, ok := (<-a).(int); !ok || v != 2 {
+			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2)
+		}
+	}()
+
+	go func() {
+		// Block `b` for a time shorter than `source`'s timeout
+		// So as to ensure source channel is trying to send message to `b`.
+		<-time.After(25 * time.Millisecond)
+		// This causes concurrency scenario: unsubscribe `b` while trying to send message to it
+		c.Unsubscribe(b)
+		// Test `b` is not closed and can still receive data 1:
+		// Because unsubscribe won't affect the ongoing process of sending message.
+		select {
+		case v, ok := <-b:
+			if v1, ok1 := v.(int); !(ok && ok1 && v1 == 1) {
+				errCh <- fmt.Sprint("unexpected failure in receiving data: ", 1)
+			}
+		default:
+			errCh <- fmt.Sprint("unexpected block from receiving data: ", 1)
+		}
+		// Test `b` is not closed but cannot receive data 2:
+		// Becuase in a new round of messaging, `b` has been unsubscribed.
+		select {
+		case v, ok := <-b:
+			if ok {
+				errCh <- fmt.Sprint("unexpected receving: ", v)
+			} else {
+				errCh <- fmt.Sprint("unexpected closing of channel")
+			}
+		default:
+		}
+		close(stopCh)
+	}()
+
+	select {
+	case <-time.After(2 * time.Second):
+		t.Fatal("Test timeout after 2s")
+	case e := <-errCh:
+		t.Fatal(e)
+	case <-stopCh:
+	}
+}

+ 25 - 0
app/stats/counter.go

@@ -0,0 +1,25 @@
+// +build !confonly
+
+package stats
+
+import "sync/atomic"
+
+// Counter is an implementation of stats.Counter.
+type Counter struct {
+	value int64
+}
+
+// Value implements stats.Counter.
+func (c *Counter) Value() int64 {
+	return atomic.LoadInt64(&c.value)
+}
+
+// Set implements stats.Counter.
+func (c *Counter) Set(newValue int64) int64 {
+	return atomic.SwapInt64(&c.value, newValue)
+}
+
+// Add implements stats.Counter.
+func (c *Counter) Add(delta int64) int64 {
+	return atomic.AddInt64(&c.value, delta)
+}

+ 31 - 0
app/stats/counter_test.go

@@ -0,0 +1,31 @@
+package stats_test
+
+import (
+	"context"
+	"testing"
+
+	. "v2ray.com/core/app/stats"
+	"v2ray.com/core/common"
+	"v2ray.com/core/features/stats"
+)
+
+func TestStatsCounter(t *testing.T) {
+	raw, err := common.CreateObject(context.Background(), &Config{})
+	common.Must(err)
+
+	m := raw.(stats.Manager)
+	c, err := m.RegisterCounter("test.counter")
+	common.Must(err)
+
+	if v := c.Add(1); v != 1 {
+		t.Fatal("unpexcted Add(1) return: ", v, ", wanted ", 1)
+	}
+
+	if v := c.Set(0); v != 1 {
+		t.Fatal("unexpected Set(0) return: ", v, ", wanted ", 1)
+	}
+
+	if v := c.Value(); v != 0 {
+		t.Fatal("unexpected Value() return: ", v, ", wanted ", 0)
+	}
+}

+ 23 - 84
app/stats/stats.go

@@ -7,98 +7,19 @@ package stats
 import (
 	"context"
 	"sync"
-	"sync/atomic"
-	"time"
 
 	"v2ray.com/core/features/stats"
 )
 
-// Counter is an implementation of stats.Counter.
-type Counter struct {
-	value int64
-}
-
-// Value implements stats.Counter.
-func (c *Counter) Value() int64 {
-	return atomic.LoadInt64(&c.value)
-}
-
-// Set implements stats.Counter.
-func (c *Counter) Set(newValue int64) int64 {
-	return atomic.SwapInt64(&c.value, newValue)
-}
-
-// Add implements stats.Counter.
-func (c *Counter) Add(delta int64) int64 {
-	return atomic.AddInt64(&c.value, delta)
-}
-
-// Channel is an implementation of stats.Channel
-type Channel struct {
-	channel     chan interface{}
-	subscribers []chan interface{}
-	access      sync.RWMutex
-}
-
-// Channel implements stats.Channel
-func (c *Channel) Channel() chan interface{} {
-	return c.channel
-}
-
-// Subscribers implements stats.Channel
-func (c *Channel) Subscribers() []chan interface{} {
-	c.access.RLock()
-	defer c.access.RUnlock()
-	return c.subscribers
-}
-
-// Subscribe implements stats.Channel
-func (c *Channel) Subscribe() chan interface{} {
-	c.access.Lock()
-	defer c.access.Unlock()
-	ch := make(chan interface{})
-	c.subscribers = append(c.subscribers, ch)
-	return ch
-}
-
-// Unsubscribe implements stats.Channel
-func (c *Channel) Unsubscribe(ch chan interface{}) {
-	c.access.Lock()
-	defer c.access.Unlock()
-	for i, s := range c.subscribers {
-		if s == ch {
-			// Copy to new memory block to prevent modifying original data
-			subscribers := make([]chan interface{}, len(c.subscribers)-1)
-			copy(subscribers[:i], c.subscribers[:i])
-			copy(subscribers[i:], c.subscribers[i+1:])
-			c.subscribers = subscribers
-			return
-		}
-	}
-}
-
-// Start starts the channel for listening to messsages
-func (c *Channel) Start() {
-	for message := range c.Channel() {
-		subscribers := c.Subscribers() // Store a copy of slice value for concurrency safety
-		for _, sub := range subscribers {
-			select {
-			case sub <- message: // Successfully sent message
-			case <-time.After(100 * time.Millisecond):
-				c.Unsubscribe(sub) // Remove timeout subscriber
-				close(sub)         // Actively close subscriber as notification
-			}
-		}
-	}
-}
-
 // Manager is an implementation of stats.Manager.
 type Manager struct {
 	access   sync.RWMutex
 	counters map[string]*Counter
 	channels map[string]*Channel
+	running  bool
 }
 
+// NewManager creates an instance of Statistics Manager.
 func NewManager(ctx context.Context, config *Config) (*Manager, error) {
 	m := &Manager{
 		counters: make(map[string]*Counter),
@@ -108,6 +29,7 @@ func NewManager(ctx context.Context, config *Config) (*Manager, error) {
 	return m, nil
 }
 
+// Type implements common.HasType.
 func (*Manager) Type() interface{} {
 	return stats.ManagerType()
 }
@@ -170,9 +92,11 @@ func (m *Manager) RegisterChannel(name string) (stats.Channel, error) {
 		return nil, newError("Channel ", name, " already registered.")
 	}
 	newError("create new channel ", name).AtDebug().WriteToLog()
-	c := &Channel{channel: make(chan interface{})}
+	c := new(Channel)
 	m.channels[name] = c
-	go c.Start()
+	if m.running {
+		c.Start()
+	}
 	return c, nil
 }
 
@@ -181,9 +105,10 @@ func (m *Manager) UnregisterChannel(name string) error {
 	m.access.Lock()
 	defer m.access.Unlock()
 
-	if _, found := m.channels[name]; found {
+	if c, found := m.channels[name]; found {
 		newError("remove channel ", name).AtDebug().WriteToLog()
 		delete(m.channels, name)
+		c.Close()
 	}
 	return nil
 }
@@ -201,10 +126,24 @@ func (m *Manager) GetChannel(name string) stats.Channel {
 
 // Start implements common.Runnable.
 func (m *Manager) Start() error {
+	m.access.Lock()
+	defer m.access.Unlock()
+	m.running = true
+	for _, channel := range m.channels {
+		channel.Start()
+	}
 	return nil
 }
 
 // Close implement common.Closable.
 func (m *Manager) Close() error {
+	m.access.Lock()
+	defer m.access.Unlock()
+	m.running = false
+	for name, channel := range m.channels {
+		newError("remove channel ", name).AtDebug().WriteToLog()
+		delete(m.channels, name)
+		channel.Close()
+	}
 	return nil
 }

+ 38 - 304
app/stats/stats_test.go

@@ -2,7 +2,6 @@ package stats_test
 
 import (
 	"context"
-	"fmt"
 	"testing"
 	"time"
 
@@ -15,337 +14,72 @@ func TestInterface(t *testing.T) {
 	_ = (stats.Manager)(new(Manager))
 }
 
-func TestStatsCounter(t *testing.T) {
+func TestStatsChannelRunnable(t *testing.T) {
 	raw, err := common.CreateObject(context.Background(), &Config{})
 	common.Must(err)
 
 	m := raw.(stats.Manager)
-	c, err := m.RegisterCounter("test.counter")
+
+	ch1, err := m.RegisterChannel("test.channel.1")
+	c1 := ch1.(*Channel)
 	common.Must(err)
 
-	if v := c.Add(1); v != 1 {
-		t.Fatal("unpexcted Add(1) return: ", v, ", wanted ", 1)
+	if c1.Running() {
+		t.Fatalf("unexpected running channel: test.channel.%d", 1)
 	}
 
-	if v := c.Set(0); v != 1 {
-		t.Fatal("unexpected Set(0) return: ", v, ", wanted ", 1)
-	}
+	common.Must(m.Start())
 
-	if v := c.Value(); v != 0 {
-		t.Fatal("unexpected Value() return: ", v, ", wanted ", 0)
+	if !c1.Running() {
+		t.Fatalf("unexpected non-running channel: test.channel.%d", 1)
 	}
-}
 
-func TestStatsChannel(t *testing.T) {
-	raw, err := common.CreateObject(context.Background(), &Config{})
+	ch2, err := m.RegisterChannel("test.channel.2")
+	c2 := ch2.(*Channel)
 	common.Must(err)
 
-	m := raw.(stats.Manager)
-	c, err := m.RegisterChannel("test.channel")
-	common.Must(err)
-
-	source := c.Channel()
-	a := c.Subscribe()
-	b := c.Subscribe()
-	defer c.Unsubscribe(a)
-	defer c.Unsubscribe(b)
-
-	stopCh := make(chan struct{})
-	errCh := make(chan string)
-
-	go func() {
-		source <- 1
-		source <- 2
-		source <- "3"
-		source <- []int{4}
-		source <- nil // Dummy messsage with no subscriber receiving
-		select {
-		case source <- nil: // Source should be blocked here, for last message was not cleared
-			errCh <- fmt.Sprint("unexpected non-blocked source")
-		default:
-			close(stopCh)
-		}
-	}()
-
-	go func() {
-		if v, ok := (<-a).(int); !ok || v != 1 {
-			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
-		}
-		if v, ok := (<-a).(int); !ok || v != 2 {
-			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2)
-		}
-		if v, ok := (<-a).(string); !ok || v != "3" {
-			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", "3")
-		}
-		if v, ok := (<-a).([]int); !ok || v[0] != 4 {
-			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", []int{4})
-		}
-	}()
-
-	go func() {
-		if v, ok := (<-b).(int); !ok || v != 1 {
-			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
-		}
-		if v, ok := (<-b).(int); !ok || v != 2 {
-			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2)
-		}
-		if v, ok := (<-b).(string); !ok || v != "3" {
-			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", "3")
-		}
-		if v, ok := (<-b).([]int); !ok || v[0] != 4 {
-			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", []int{4})
-		}
-	}()
-
-	select {
-	case <-time.After(2 * time.Second):
-		t.Fatal("Test timeout after 2s")
-	case e := <-errCh:
-		t.Fatal(e)
-	case <-stopCh:
+	if !c2.Running() {
+		t.Fatalf("unexpected non-running channel: test.channel.%d", 2)
 	}
-}
 
-func TestStatsChannelUnsubcribe(t *testing.T) {
-	raw, err := common.CreateObject(context.Background(), &Config{})
-	common.Must(err)
+	s1 := c1.Subscribe()
+	common.Must(c1.Close())
 
-	m := raw.(stats.Manager)
-	c, err := m.RegisterChannel("test.channel")
-	common.Must(err)
-
-	source := c.Channel()
-	a := c.Subscribe()
-	b := c.Subscribe()
-	defer c.Unsubscribe(a)
-
-	pauseCh := make(chan struct{})
-	stopCh := make(chan struct{})
-	errCh := make(chan string)
-
-	{
-		var aSet, bSet bool
-		for _, s := range c.Subscribers() {
-			if s == a {
-				aSet = true
-			}
-			if s == b {
-				bSet = true
-			}
-		}
-		if !(aSet && bSet) {
-			t.Fatal("unexpected subscribers: ", c.Subscribers())
-		}
+	if c1.Running() {
+		t.Fatalf("unexpected running channel: test.channel.%d", 1)
 	}
 
-	go func() {
-		source <- 1
-		<-pauseCh // Wait for `b` goroutine to resume sending message
-		source <- 2
-	}()
-
-	go func() {
-		if v, ok := (<-a).(int); !ok || v != 1 {
-			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
-		}
-		if v, ok := (<-a).(int); !ok || v != 2 {
-			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2)
-		}
-	}()
-
-	go func() {
-		if v, ok := (<-b).(int); !ok || v != 1 {
-			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
-		}
-		// Unsubscribe `b` while `source`'s messaging is paused
-		c.Unsubscribe(b)
-		{ // Test `b` is not in subscribers
-			var aSet, bSet bool
-			for _, s := range c.Subscribers() {
-				if s == a {
-					aSet = true
-				}
-				if s == b {
-					bSet = true
-				}
-			}
-			if !(aSet && !bSet) {
-				errCh <- fmt.Sprint("unexpected subscribers: ", c.Subscribers())
-			}
-		}
-		// Resume `source`'s progress
-		close(pauseCh)
-		// Test `b` is neither closed nor able to receive any data
-		select {
-		case v, ok := <-b:
-			if ok {
-				errCh <- fmt.Sprint("unexpected data received: ", v)
-			} else {
-				errCh <- fmt.Sprint("unexpected closed channel: ", b)
-			}
-		default:
+	select { // Check all subscribers in closed channel are closed
+	case _, ok := <-s1:
+		if ok {
+			t.Fatalf("unexpected non-closed subscriber in channel: test.channel.%d", 1)
 		}
-		close(stopCh)
-	}()
-
-	select {
-	case <-time.After(2 * time.Second):
-		t.Fatal("Test timeout after 2s")
-	case e := <-errCh:
-		t.Fatal(e)
-	case <-stopCh:
+	case <-time.After(500 * time.Millisecond):
+		t.Fatalf("unexpected non-closed subscriber in channel: test.channel.%d", 1)
 	}
-}
 
-func TestStatsChannelTimeout(t *testing.T) {
-	raw, err := common.CreateObject(context.Background(), &Config{})
-	common.Must(err)
-
-	m := raw.(stats.Manager)
-	c, err := m.RegisterChannel("test.channel")
-	common.Must(err)
-
-	source := c.Channel()
-	a := c.Subscribe()
-	b := c.Subscribe()
-	defer c.Unsubscribe(a)
-	defer c.Unsubscribe(b)
-
-	stopCh := make(chan struct{})
-	errCh := make(chan string)
-
-	go func() {
-		source <- 1
-		source <- 2
-	}()
-
-	go func() {
-		if v, ok := (<-a).(int); !ok || v != 1 {
-			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
-		}
-		if v, ok := (<-a).(int); !ok || v != 2 {
-			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2)
-		}
-		{ // Test `b` is still in subscribers yet (because `a` receives 2 first)
-			var aSet, bSet bool
-			for _, s := range c.Subscribers() {
-				if s == a {
-					aSet = true
-				}
-				if s == b {
-					bSet = true
-				}
-			}
-			if !(aSet && bSet) {
-				errCh <- fmt.Sprint("unexpected subscribers: ", c.Subscribers())
-			}
-		}
-	}()
+	if len(c1.Subscribers()) != 0 { // Check subscribers in closed channel are emptied
+		t.Fatalf("unexpected non-empty subscribers in channel: test.channel.%d", 1)
+	}
 
-	go func() {
-		if v, ok := (<-b).(int); !ok || v != 1 {
-			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
-		}
-		// Block `b` channel for a time longer than `source`'s timeout
-		<-time.After(150 * time.Millisecond)
-		{ // Test `b` has been unsubscribed by source
-			var aSet, bSet bool
-			for _, s := range c.Subscribers() {
-				if s == a {
-					aSet = true
-				}
-				if s == b {
-					bSet = true
-				}
-			}
-			if !(aSet && !bSet) {
-				errCh <- fmt.Sprint("unexpected subscribers: ", c.Subscribers())
-			}
-		}
-		select { // Test `b` has been closed by source
-		case v, ok := <-b:
-			if ok {
-				errCh <- fmt.Sprint("unexpected data received: ", v)
-			}
-		default:
-		}
-		close(stopCh)
-	}()
+	common.Must(m.Close())
 
-	select {
-	case <-time.After(2 * time.Second):
-		t.Fatal("Test timeout after 2s")
-	case e := <-errCh:
-		t.Fatal(e)
-	case <-stopCh:
+	if c2.Running() {
+		t.Fatalf("unexpected running channel: test.channel.%d", 2)
 	}
-}
 
-func TestStatsChannelConcurrency(t *testing.T) {
-	raw, err := common.CreateObject(context.Background(), &Config{})
-	common.Must(err)
-
-	m := raw.(stats.Manager)
-	c, err := m.RegisterChannel("test.channel")
+	ch3, err := m.RegisterChannel("test.channel.3")
+	c3 := ch3.(*Channel)
 	common.Must(err)
 
-	source := c.Channel()
-	a := c.Subscribe()
-	b := c.Subscribe()
-	defer c.Unsubscribe(a)
-
-	stopCh := make(chan struct{})
-	errCh := make(chan string)
-
-	go func() {
-		source <- 1
-		source <- 2
-	}()
-
-	go func() {
-		if v, ok := (<-a).(int); !ok || v != 1 {
-			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
-		}
-		if v, ok := (<-a).(int); !ok || v != 2 {
-			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2)
-		}
-	}()
+	if c3.Running() {
+		t.Fatalf("unexpected running channel: test.channel.%d", 3)
+	}
 
-	go func() {
-		// Block `b` for a time shorter than `source`'s timeout
-		// So as to ensure source channel is trying to send message to `b`.
-		<-time.After(25 * time.Millisecond)
-		// This causes concurrency scenario: unsubscribe `b` while trying to send message to it
-		c.Unsubscribe(b)
-		// Test `b` is not closed and can still receive data 1:
-		// Because unsubscribe won't affect the ongoing process of sending message.
-		select {
-		case v, ok := <-b:
-			if v1, ok1 := v.(int); !(ok && ok1 && v1 == 1) {
-				errCh <- fmt.Sprint("unexpected failure in receiving data: ", 1)
-			}
-		default:
-			errCh <- fmt.Sprint("unexpected block from receiving data: ", 1)
-		}
-		// Test `b` is not closed but cannot receive data 2:
-		// Becuase in a new round of messaging, `b` has been unsubscribed.
-		select {
-		case v, ok := <-b:
-			if ok {
-				errCh <- fmt.Sprint("unexpected receving: ", v)
-			} else {
-				errCh <- fmt.Sprint("unexpected closing of channel")
-			}
-		default:
-		}
-		close(stopCh)
-	}()
+	common.Must(c3.Start())
+	common.Must(m.UnregisterChannel("test.channel.3"))
 
-	select {
-	case <-time.After(2 * time.Second):
-		t.Fatal("Test timeout after 2s")
-	case e := <-errCh:
-		t.Fatal(e)
-	case <-stopCh:
+	if c3.Running() { // Test that unregistering will close the channel.
+		t.Fatalf("unexpected running channel: test.channel.%d", 3)
 	}
 }

+ 9 - 4
features/stats/stats.go

@@ -2,7 +2,10 @@ package stats
 
 //go:generate errorgen
 
-import "v2ray.com/core/features"
+import (
+	"v2ray.com/core/common"
+	"v2ray.com/core/features"
+)
 
 // Counter is the interface for stats counters.
 //
@@ -16,12 +19,14 @@ type Counter interface {
 	Add(int64) int64
 }
 
-// Channel is the interface for stats channel
+// Channel is the interface for stats channel.
 //
 // v2ray:api:stable
 type Channel interface {
-	// Channel returns the underlying go channel.
-	Channel() chan interface{}
+	// Channel is a runnable unit.
+	common.Runnable
+	// Publish broadcasts a message through the channel.
+	Publish(interface{})
 	// SubscriberCount returns the number of the subscribers.
 	Subscribers() []chan interface{}
 	// Subscribe registers for listening to channel stream and returns a new listener channel.