timed_queue.go 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package collect
  2. import (
  3. "container/heap"
  4. "sync"
  5. "time"
  6. )
  7. type timedQueueEntry struct {
  8. timeSec int64
  9. value interface{}
  10. }
  11. type timedQueueImpl []*timedQueueEntry
  12. func (queue timedQueueImpl) Len() int {
  13. return len(queue)
  14. }
  15. func (queue timedQueueImpl) Less(i, j int) bool {
  16. return queue[i].timeSec < queue[j].timeSec
  17. }
  18. func (queue timedQueueImpl) Swap(i, j int) {
  19. queue[i], queue[j] = queue[j], queue[i]
  20. }
  21. func (queue *timedQueueImpl) Push(value interface{}) {
  22. entry := value.(*timedQueueEntry)
  23. *queue = append(*queue, entry)
  24. }
  25. func (queue *timedQueueImpl) Pop() interface{} {
  26. old := *queue
  27. n := len(old)
  28. v := old[n-1]
  29. old[n-1] = nil
  30. *queue = old[:n-1]
  31. return v
  32. }
  33. type TimedQueue struct {
  34. queue timedQueueImpl
  35. access sync.RWMutex
  36. removed chan interface{}
  37. }
  38. func NewTimedQueue(updateInterval int) *TimedQueue {
  39. queue := &TimedQueue{
  40. queue: make([]*timedQueueEntry, 0, 256),
  41. removed: make(chan interface{}, 16),
  42. access: sync.RWMutex{},
  43. }
  44. go queue.cleanup(time.Tick(time.Duration(updateInterval) * time.Second))
  45. return queue
  46. }
  47. func (queue *TimedQueue) Add(value interface{}, time2Remove int64) {
  48. queue.access.Lock()
  49. heap.Push(&queue.queue, &timedQueueEntry{
  50. timeSec: time2Remove,
  51. value: value,
  52. })
  53. queue.access.Unlock()
  54. }
  55. func (queue *TimedQueue) RemovedEntries() <-chan interface{} {
  56. return queue.removed
  57. }
  58. func (queue *TimedQueue) cleanup(tick <-chan time.Time) {
  59. for now := range tick {
  60. nowSec := now.Unix()
  61. for {
  62. queue.access.RLock()
  63. queueLen := queue.queue.Len()
  64. queue.access.RUnlock()
  65. if queueLen == 0 {
  66. break
  67. }
  68. queue.access.RLock()
  69. entry := queue.queue[0]
  70. queue.access.RUnlock()
  71. if entry.timeSec > nowSec {
  72. break
  73. }
  74. queue.access.Lock()
  75. heap.Pop(&queue.queue)
  76. queue.access.Unlock()
  77. queue.removed <- entry.value
  78. }
  79. }
  80. }