pubsub.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package pubsub
  2. import (
  3. "sync"
  4. "time"
  5. "v2ray.com/core/common"
  6. "v2ray.com/core/common/signal/done"
  7. "v2ray.com/core/common/task"
  8. )
  9. type Subscriber struct {
  10. buffer chan interface{}
  11. done *done.Instance
  12. }
  13. func (s *Subscriber) push(msg interface{}) {
  14. select {
  15. case s.buffer <- msg:
  16. default:
  17. }
  18. }
  19. func (s *Subscriber) Wait() <-chan interface{} {
  20. return s.buffer
  21. }
  22. func (s *Subscriber) Close() error {
  23. return s.done.Close()
  24. }
  25. func (s *Subscriber) IsClosed() bool {
  26. return s.done.Done()
  27. }
  28. type Service struct {
  29. sync.RWMutex
  30. subs map[string][]*Subscriber
  31. ctask *task.Periodic
  32. }
  33. func NewService() *Service {
  34. s := &Service{
  35. subs: make(map[string][]*Subscriber),
  36. }
  37. s.ctask = &task.Periodic{
  38. Execute: s.Cleanup,
  39. Interval: time.Second * 30,
  40. }
  41. common.Must(s.ctask.Start())
  42. return s
  43. }
  44. // Cleanup cleans up internal caches of subscribers.
  45. // Visible for testing only.
  46. func (s *Service) Cleanup() error {
  47. s.Lock()
  48. defer s.Unlock()
  49. for name, subs := range s.subs {
  50. newSub := make([]*Subscriber, 0, len(s.subs))
  51. for _, sub := range subs {
  52. if !sub.IsClosed() {
  53. newSub = append(newSub, sub)
  54. }
  55. }
  56. if len(newSub) == 0 {
  57. delete(s.subs, name)
  58. } else {
  59. s.subs[name] = newSub
  60. }
  61. }
  62. if len(s.subs) == 0 {
  63. s.subs = make(map[string][]*Subscriber)
  64. }
  65. return nil
  66. }
  67. func (s *Service) Subscribe(name string) *Subscriber {
  68. sub := &Subscriber{
  69. buffer: make(chan interface{}, 16),
  70. done: done.New(),
  71. }
  72. s.Lock()
  73. subs := append(s.subs[name], sub)
  74. s.subs[name] = subs
  75. s.Unlock()
  76. return sub
  77. }
  78. func (s *Service) Publish(name string, message interface{}) {
  79. s.RLock()
  80. defer s.RUnlock()
  81. for _, sub := range s.subs[name] {
  82. if !sub.IsClosed() {
  83. sub.push(message)
  84. }
  85. }
  86. }