timed_map.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package collect
  2. import (
  3. "container/heap"
  4. "sync"
  5. "time"
  6. )
  7. type timedQueueEntry struct {
  8. timeSec int64
  9. value interface{}
  10. }
  11. type timedQueue []*timedQueueEntry
  12. func (queue timedQueue) Len() int {
  13. return len(queue)
  14. }
  15. func (queue timedQueue) Less(i, j int) bool {
  16. return queue[i].timeSec < queue[j].timeSec
  17. }
  18. func (queue timedQueue) Swap(i, j int) {
  19. tmp := queue[i]
  20. queue[i] = queue[j]
  21. queue[j] = tmp
  22. }
  23. func (queue *timedQueue) Push(value interface{}) {
  24. entry := value.(*timedQueueEntry)
  25. *queue = append(*queue, entry)
  26. }
  27. func (queue *timedQueue) Pop() interface{} {
  28. old := *queue
  29. n := len(old)
  30. v := old[n-1]
  31. *queue = old[:n-1]
  32. return v
  33. }
  34. type TimedStringMap struct {
  35. timedQueue
  36. queueMutex sync.Mutex
  37. dataMutext sync.RWMutex
  38. data map[string]interface{}
  39. interval int
  40. }
  41. func NewTimedStringMap(updateInterval int) *TimedStringMap {
  42. m := &TimedStringMap{
  43. timedQueue: make([]*timedQueueEntry, 0, 1024),
  44. queueMutex: sync.Mutex{},
  45. dataMutext: sync.RWMutex{},
  46. data: make(map[string]interface{}, 1024),
  47. interval: updateInterval,
  48. }
  49. m.initialize()
  50. return m
  51. }
  52. func (m *TimedStringMap) initialize() {
  53. go m.cleanup(time.Tick(time.Duration(m.interval) * time.Second))
  54. }
  55. func (m *TimedStringMap) cleanup(tick <-chan time.Time) {
  56. for {
  57. now := <-tick
  58. nowSec := now.UTC().Unix()
  59. if m.timedQueue.Len() == 0 {
  60. continue
  61. }
  62. for m.timedQueue.Len() > 0 {
  63. entry := m.timedQueue[0]
  64. if entry.timeSec > nowSec {
  65. break
  66. }
  67. m.queueMutex.Lock()
  68. entry = heap.Pop(&m.timedQueue).(*timedQueueEntry)
  69. m.queueMutex.Unlock()
  70. m.Remove(entry.value.(string))
  71. }
  72. }
  73. }
  74. func (m *TimedStringMap) Get(key string) (interface{}, bool) {
  75. m.dataMutext.RLock()
  76. value, ok := m.data[key]
  77. m.dataMutext.RUnlock()
  78. return value, ok
  79. }
  80. func (m *TimedStringMap) Set(key string, value interface{}, time2Delete int64) {
  81. m.dataMutext.Lock()
  82. m.data[key] = value
  83. m.dataMutext.Unlock()
  84. m.queueMutex.Lock()
  85. heap.Push(&m.timedQueue, &timedQueueEntry{
  86. timeSec: time2Delete,
  87. value: key,
  88. })
  89. m.queueMutex.Unlock()
  90. }
  91. func (m *TimedStringMap) Remove(key string) {
  92. m.dataMutext.Lock()
  93. delete(m.data, key)
  94. m.dataMutext.Unlock()
  95. }