|
|
@@ -71,8 +71,7 @@ func (queue *TimedQueue) RemovedEntries() <-chan interface{} {
|
|
|
}
|
|
|
|
|
|
func (queue *TimedQueue) cleanup(tick <-chan time.Time) {
|
|
|
- for {
|
|
|
- now := <-tick
|
|
|
+ for now := range tick {
|
|
|
queue.access.RLock()
|
|
|
queueLen := queue.queue.Len()
|
|
|
queue.access.RUnlock()
|
|
|
@@ -80,16 +79,18 @@ func (queue *TimedQueue) cleanup(tick <-chan time.Time) {
|
|
|
continue
|
|
|
}
|
|
|
nowSec := now.UTC().Unix()
|
|
|
- queue.access.RLock()
|
|
|
- firstEntryTime := queue.queue[0].timeSec
|
|
|
- queue.access.RUnlock()
|
|
|
- if firstEntryTime > nowSec {
|
|
|
- continue
|
|
|
- }
|
|
|
- queue.access.Lock()
|
|
|
- firstEntryValue := heap.Pop(&queue.queue).(*timedQueueEntry).value
|
|
|
- queue.access.Unlock()
|
|
|
+ for {
|
|
|
+ queue.access.RLock()
|
|
|
+ entry := queue.queue[0]
|
|
|
+ queue.access.RUnlock()
|
|
|
+ if entry.timeSec > nowSec {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ queue.access.Lock()
|
|
|
+ heap.Pop(&queue.queue)
|
|
|
+ queue.access.Unlock()
|
|
|
|
|
|
- queue.removed <- firstEntryValue
|
|
|
+ queue.removed <- entry.value
|
|
|
+ }
|
|
|
}
|
|
|
}
|