| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103 |
- package collect
- import (
- "container/heap"
- "sync"
- "time"
- )
- type timedQueueEntry struct {
- timeSec int64
- value interface{}
- }
- type timedQueueImpl []*timedQueueEntry
- func (queue timedQueueImpl) Len() int {
- return len(queue)
- }
- func (queue timedQueueImpl) Less(i, j int) bool {
- return queue[i].timeSec < queue[j].timeSec
- }
- func (queue timedQueueImpl) Swap(i, j int) {
- queue[i], queue[j] = queue[j], queue[i]
- }
- func (queue *timedQueueImpl) Push(value interface{}) {
- entry := value.(*timedQueueEntry)
- *queue = append(*queue, entry)
- }
- func (queue *timedQueueImpl) Pop() interface{} {
- old := *queue
- n := len(old)
- v := old[n-1]
- old[n-1] = nil
- *queue = old[:n-1]
- return v
- }
- // TimedQueue is a priority queue that entries with oldest timestamp get removed first.
- type TimedQueue struct {
- queue timedQueueImpl
- access sync.Mutex
- removedCallback func(interface{})
- }
- func NewTimedQueue(updateInterval int, removedCallback func(interface{})) *TimedQueue {
- queue := &TimedQueue{
- queue: make([]*timedQueueEntry, 0, 256),
- removedCallback: removedCallback,
- access: sync.Mutex{},
- }
- go queue.cleanup(time.Tick(time.Duration(updateInterval) * time.Second))
- return queue
- }
- func (queue *TimedQueue) Add(value interface{}, time2Remove int64) {
- newEntry := &timedQueueEntry{
- timeSec: time2Remove,
- value: value,
- }
- var removedEntry *timedQueueEntry
- queue.access.Lock()
- nowSec := time.Now().Unix()
- if queue.queue.Len() > 0 && queue.queue[0].timeSec < nowSec {
- removedEntry = queue.queue[0]
- queue.queue[0] = newEntry
- heap.Fix(&queue.queue, 0)
- } else {
- heap.Push(&queue.queue, newEntry)
- }
- queue.access.Unlock()
- if removedEntry != nil {
- queue.removedCallback(removedEntry.value)
- }
- }
- func (queue *TimedQueue) cleanup(tick <-chan time.Time) {
- for now := range tick {
- nowSec := now.Unix()
- removedEntries := make([]*timedQueueEntry, 0, 128)
- queue.access.Lock()
- changed := false
- for i := 0; i < queue.queue.Len(); i++ {
- entry := queue.queue[i]
- if entry.timeSec < nowSec {
- removedEntries = append(removedEntries, entry)
- queue.queue.Swap(i, queue.queue.Len()-1)
- queue.queue.Pop()
- changed = true
- }
- }
- if changed {
- heap.Init(&queue.queue)
- }
- queue.access.Unlock()
- for _, entry := range removedEntries {
- queue.removedCallback(entry.value)
- }
- }
- }
|