timed_queue.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. package collect
  2. import (
  3. "container/heap"
  4. "sync"
  5. "time"
  6. )
  7. type timedQueueEntry struct {
  8. timeSec int64
  9. value interface{}
  10. }
  11. type timedQueueImpl []*timedQueueEntry
  12. func (queue timedQueueImpl) Len() int {
  13. return len(queue)
  14. }
  15. func (queue timedQueueImpl) Less(i, j int) bool {
  16. return queue[i].timeSec < queue[j].timeSec
  17. }
  18. func (queue timedQueueImpl) Swap(i, j int) {
  19. tmp := queue[i]
  20. queue[i] = queue[j]
  21. queue[j] = tmp
  22. }
  23. func (queue *timedQueueImpl) Push(value interface{}) {
  24. entry := value.(*timedQueueEntry)
  25. *queue = append(*queue, entry)
  26. }
  27. func (queue *timedQueueImpl) Pop() interface{} {
  28. old := *queue
  29. n := len(old)
  30. v := old[n-1]
  31. old[n-1] = nil
  32. *queue = old[:n-1]
  33. return v
  34. }
  35. type TimedQueue struct {
  36. queue timedQueueImpl
  37. access sync.RWMutex
  38. removed chan interface{}
  39. }
  40. func NewTimedQueue(updateInterval int) *TimedQueue {
  41. queue := &TimedQueue{
  42. queue: make([]*timedQueueEntry, 0, 256),
  43. removed: make(chan interface{}, 16),
  44. access: sync.RWMutex{},
  45. }
  46. go queue.cleanup(time.Tick(time.Duration(updateInterval) * time.Second))
  47. return queue
  48. }
  49. func (queue *TimedQueue) Add(value interface{}, time2Remove int64) {
  50. queue.access.Lock()
  51. heap.Push(&queue.queue, &timedQueueEntry{
  52. timeSec: time2Remove,
  53. value: value,
  54. })
  55. queue.access.Unlock()
  56. }
  57. func (queue *TimedQueue) RemovedEntries() <-chan interface{} {
  58. return queue.removed
  59. }
  60. func (queue *TimedQueue) cleanup(tick <-chan time.Time) {
  61. for now := range tick {
  62. nowSec := now.UTC().Unix()
  63. for {
  64. queue.access.RLock()
  65. queueLen := queue.queue.Len()
  66. queue.access.RUnlock()
  67. if queueLen == 0 {
  68. break
  69. }
  70. queue.access.RLock()
  71. entry := queue.queue[0]
  72. queue.access.RUnlock()
  73. if entry.timeSec > nowSec {
  74. break
  75. }
  76. queue.access.Lock()
  77. heap.Pop(&queue.queue)
  78. queue.access.Unlock()
  79. queue.removed <- entry.value
  80. }
  81. }
  82. }