timed_map.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. package collect
  2. import (
  3. "container/heap"
  4. "sync"
  5. "time"
  6. )
  7. type timedQueueEntry struct {
  8. timeSec int64
  9. value interface{}
  10. }
  11. type timedQueue []*timedQueueEntry
  12. func (queue timedQueue) Len() int {
  13. return len(queue)
  14. }
  15. func (queue timedQueue) Less(i, j int) bool {
  16. return queue[i].timeSec < queue[j].timeSec
  17. }
  18. func (queue timedQueue) Swap(i, j int) {
  19. tmp := queue[i]
  20. queue[i] = queue[j]
  21. queue[j] = tmp
  22. }
  23. func (queue *timedQueue) Push(value interface{}) {
  24. entry := value.(*timedQueueEntry)
  25. *queue = append(*queue, entry)
  26. }
  27. func (queue *timedQueue) Pop() interface{} {
  28. old := *queue
  29. n := len(old)
  30. v := old[n-1]
  31. *queue = old[:n-1]
  32. return v
  33. }
  34. type TimedStringMap struct {
  35. timedQueue
  36. access sync.RWMutex
  37. data map[string]interface{}
  38. interval int
  39. }
  40. func NewTimedStringMap(updateInterval int) *TimedStringMap {
  41. m := &TimedStringMap{
  42. timedQueue: make([]*timedQueueEntry, 0, 1024),
  43. access: sync.RWMutex{},
  44. data: make(map[string]interface{}, 1024),
  45. interval: updateInterval,
  46. }
  47. m.initialize()
  48. return m
  49. }
  50. func (m *TimedStringMap) initialize() {
  51. go m.cleanup(time.Tick(time.Duration(m.interval) * time.Second))
  52. }
  53. func (m *TimedStringMap) cleanup(tick <-chan time.Time) {
  54. for {
  55. now := <-tick
  56. nowSec := now.UTC().Unix()
  57. if m.timedQueue.Len() == 0 {
  58. continue
  59. }
  60. for m.timedQueue.Len() > 0 {
  61. entry := m.timedQueue[0]
  62. if entry.timeSec > nowSec {
  63. break
  64. }
  65. m.access.Lock()
  66. entry = heap.Pop(&m.timedQueue).(*timedQueueEntry)
  67. m.access.Unlock()
  68. m.Remove(entry.value.(string))
  69. }
  70. }
  71. }
  72. func (m *TimedStringMap) Get(key string) (interface{}, bool) {
  73. m.access.RLock()
  74. value, ok := m.data[key]
  75. m.access.RUnlock()
  76. return value, ok
  77. }
  78. func (m *TimedStringMap) Set(key string, value interface{}, time2Delete int64) {
  79. m.access.Lock()
  80. m.data[key] = value
  81. heap.Push(&m.timedQueue, &timedQueueEntry{
  82. timeSec: time2Delete,
  83. value: key,
  84. })
  85. m.access.Unlock()
  86. }
  87. func (m *TimedStringMap) Remove(key string) {
  88. m.access.Lock()
  89. delete(m.data, key)
  90. m.access.Unlock()
  91. }