timed_queue.go 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. package collect
  2. import (
  3. "container/heap"
  4. "sync"
  5. "time"
  6. )
  7. type timedQueueEntry struct {
  8. timeSec int64
  9. value interface{}
  10. }
  11. type timedQueueImpl []*timedQueueEntry
  12. func (queue timedQueueImpl) Len() int {
  13. return len(queue)
  14. }
  15. func (queue timedQueueImpl) Less(i, j int) bool {
  16. return queue[i].timeSec < queue[j].timeSec
  17. }
  18. func (queue timedQueueImpl) Swap(i, j int) {
  19. tmp := queue[i]
  20. queue[i] = queue[j]
  21. queue[j] = tmp
  22. }
  23. func (queue *timedQueueImpl) Push(value interface{}) {
  24. entry := value.(*timedQueueEntry)
  25. *queue = append(*queue, entry)
  26. }
  27. func (queue *timedQueueImpl) Pop() interface{} {
  28. old := *queue
  29. n := len(old)
  30. v := old[n-1]
  31. *queue = old[:n-1]
  32. return v
  33. }
  34. type TimedQueue struct {
  35. queue timedQueueImpl
  36. access sync.Mutex
  37. removed chan interface{}
  38. }
  39. func NewTimedQueue(updateInterval int) *TimedQueue {
  40. queue := &TimedQueue{
  41. queue: make([]*timedQueueEntry, 0, 256),
  42. removed: make(chan interface{}, 16),
  43. access: sync.Mutex{},
  44. }
  45. go queue.cleanup(time.Tick(time.Duration(updateInterval) * time.Second))
  46. return queue
  47. }
  48. func (queue *TimedQueue) Add(value interface{}, time2Remove int64) {
  49. queue.access.Lock()
  50. heap.Push(&queue.queue, &timedQueueEntry{
  51. timeSec: time2Remove,
  52. value: value,
  53. })
  54. queue.access.Unlock()
  55. }
  56. func (queue *TimedQueue) RemovedEntries() <-chan interface{} {
  57. return queue.removed
  58. }
  59. func (queue *TimedQueue) cleanup(tick <-chan time.Time) {
  60. for {
  61. now := <-tick
  62. if queue.queue.Len() == 0 {
  63. continue
  64. }
  65. nowSec := now.UTC().Unix()
  66. entry := queue.queue[0]
  67. if entry.timeSec > nowSec {
  68. continue
  69. }
  70. queue.access.Lock()
  71. entry = heap.Pop(&queue.queue).(*timedQueueEntry)
  72. queue.access.Unlock()
  73. queue.removed <- entry.value
  74. }
  75. }