pubsub.go 1.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. package internal
  2. import (
  3. "sync"
  4. "github.com/v2ray/v2ray-core/app"
  5. "github.com/v2ray/v2ray-core/app/pubsub"
  6. )
  7. type TopicHandlerList struct {
  8. sync.RWMutex
  9. handlers []pubsub.TopicHandler
  10. }
  11. func NewTopicHandlerList(handlers ...pubsub.TopicHandler) *TopicHandlerList {
  12. return &TopicHandlerList{
  13. handlers: handlers,
  14. }
  15. }
  16. func (this *TopicHandlerList) Add(handler pubsub.TopicHandler) {
  17. this.Lock()
  18. this.handlers = append(this.handlers, handler)
  19. this.Unlock()
  20. }
  21. func (this *TopicHandlerList) Dispatch(message pubsub.PubsubMessage) {
  22. this.RLock()
  23. for _, handler := range this.handlers {
  24. go handler(message)
  25. }
  26. this.RUnlock()
  27. }
  28. type Pubsub struct {
  29. topics map[string]*TopicHandlerList
  30. sync.RWMutex
  31. }
  32. func New() *Pubsub {
  33. return &Pubsub{
  34. topics: make(map[string]*TopicHandlerList),
  35. }
  36. }
  37. func (this *Pubsub) Publish(context app.Context, topic string, message pubsub.PubsubMessage) {
  38. this.RLock()
  39. list, found := this.topics[topic]
  40. this.RUnlock()
  41. if found {
  42. list.Dispatch(message)
  43. }
  44. }
  45. func (this *Pubsub) Subscribe(context app.Context, topic string, handler pubsub.TopicHandler) {
  46. this.Lock()
  47. defer this.Unlock()
  48. if list, found := this.topics[topic]; found {
  49. list.Add(handler)
  50. } else {
  51. this.topics[topic] = NewTopicHandlerList(handler)
  52. }
  53. }