pubsub.go 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. package pubsub
  2. import (
  3. "sync"
  4. "time"
  5. "v2ray.com/core/common"
  6. "v2ray.com/core/common/task"
  7. )
  8. type Subscriber struct {
  9. name string
  10. buffer chan interface{}
  11. removed chan struct{}
  12. }
  13. func (s *Subscriber) push(msg interface{}) {
  14. select {
  15. case s.buffer <- msg:
  16. default:
  17. }
  18. }
  19. func (s *Subscriber) Wait() <-chan interface{} {
  20. return s.buffer
  21. }
  22. func (s *Subscriber) Close() {
  23. close(s.removed)
  24. }
  25. func (s *Subscriber) IsClosed() bool {
  26. select {
  27. case <-s.removed:
  28. return true
  29. default:
  30. return false
  31. }
  32. }
  33. type Service struct {
  34. sync.RWMutex
  35. subs []*Subscriber
  36. ctask *task.Periodic
  37. }
  38. func NewService() *Service {
  39. s := &Service{}
  40. s.ctask = &task.Periodic{
  41. Execute: s.cleanup,
  42. Interval: time.Second * 30,
  43. }
  44. common.Must(s.ctask.Start())
  45. return s
  46. }
  47. func (s *Service) cleanup() error {
  48. s.Lock()
  49. defer s.Unlock()
  50. if len(s.subs) < 16 {
  51. return nil
  52. }
  53. newSub := make([]*Subscriber, 0, len(s.subs))
  54. for _, sub := range s.subs {
  55. if !sub.IsClosed() {
  56. newSub = append(newSub, sub)
  57. }
  58. }
  59. s.subs = newSub
  60. return nil
  61. }
  62. func (s *Service) Subscribe(name string) *Subscriber {
  63. sub := &Subscriber{
  64. name: name,
  65. buffer: make(chan interface{}, 16),
  66. removed: make(chan struct{}),
  67. }
  68. s.Lock()
  69. s.subs = append(s.subs, sub)
  70. s.Unlock()
  71. return sub
  72. }
  73. func (s *Service) Publish(name string, message interface{}) {
  74. s.RLock()
  75. defer s.RUnlock()
  76. for _, sub := range s.subs {
  77. if sub.name == name && !sub.IsClosed() {
  78. sub.push(message)
  79. }
  80. }
  81. }