timed_queue.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. package collect
  2. import (
  3. "container/heap"
  4. "sync"
  5. "time"
  6. )
  7. type timedQueueEntry struct {
  8. timeSec int64
  9. value interface{}
  10. }
  11. type timedQueueImpl []*timedQueueEntry
  12. func (queue timedQueueImpl) Len() int {
  13. return len(queue)
  14. }
  15. func (queue timedQueueImpl) Less(i, j int) bool {
  16. return queue[i].timeSec < queue[j].timeSec
  17. }
  18. func (queue timedQueueImpl) Swap(i, j int) {
  19. queue[i], queue[j] = queue[j], queue[i]
  20. }
  21. func (queue *timedQueueImpl) Push(value interface{}) {
  22. entry := value.(*timedQueueEntry)
  23. *queue = append(*queue, entry)
  24. }
  25. func (queue *timedQueueImpl) Pop() interface{} {
  26. old := *queue
  27. n := len(old)
  28. v := old[n-1]
  29. old[n-1] = nil
  30. *queue = old[:n-1]
  31. return v
  32. }
  33. // TimedQueue is a priority queue that entries with oldest timestamp get removed first.
  34. type TimedQueue struct {
  35. queue timedQueueImpl
  36. access sync.Mutex
  37. removedCallback func(interface{})
  38. }
  39. func NewTimedQueue(updateInterval int, removedCallback func(interface{})) *TimedQueue {
  40. queue := &TimedQueue{
  41. queue: make([]*timedQueueEntry, 0, 256),
  42. removedCallback: removedCallback,
  43. access: sync.Mutex{},
  44. }
  45. go queue.cleanup(time.Tick(time.Duration(updateInterval) * time.Second))
  46. return queue
  47. }
  48. func (queue *TimedQueue) Add(value interface{}, time2Remove int64) {
  49. newEntry := &timedQueueEntry{
  50. timeSec: time2Remove,
  51. value: value,
  52. }
  53. var removedEntry *timedQueueEntry
  54. queue.access.Lock()
  55. nowSec := time.Now().Unix()
  56. if queue.queue.Len() > 0 && queue.queue[0].timeSec < nowSec {
  57. removedEntry = queue.queue[0]
  58. queue.queue[0] = newEntry
  59. heap.Fix(&queue.queue, 0)
  60. } else {
  61. heap.Push(&queue.queue, newEntry)
  62. }
  63. queue.access.Unlock()
  64. if removedEntry != nil {
  65. queue.removedCallback(removedEntry.value)
  66. }
  67. }
  68. func (queue *TimedQueue) cleanup(tick <-chan time.Time) {
  69. for now := range tick {
  70. nowSec := now.Unix()
  71. removedEntries := make([]*timedQueueEntry, 0, 128)
  72. queue.access.Lock()
  73. changed := false
  74. for i := 0; i < queue.queue.Len(); i++ {
  75. entry := queue.queue[i]
  76. if entry.timeSec < nowSec {
  77. removedEntries = append(removedEntries, entry)
  78. queue.queue.Swap(i, queue.queue.Len()-1)
  79. queue.queue.Pop()
  80. changed = true
  81. }
  82. }
  83. if changed {
  84. heap.Init(&queue.queue)
  85. }
  86. queue.access.Unlock()
  87. for _, entry := range removedEntries {
  88. queue.removedCallback(entry.value)
  89. }
  90. }
  91. }