| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 | 
							- package collect
 
- import (
 
- 	"container/heap"
 
- 	"sync"
 
- 	"time"
 
- )
 
- type timedQueueEntry struct {
 
- 	timeSec int64
 
- 	value   interface{}
 
- }
 
- type timedQueueImpl []*timedQueueEntry
 
- func (queue timedQueueImpl) Len() int {
 
- 	return len(queue)
 
- }
 
- func (queue timedQueueImpl) Less(i, j int) bool {
 
- 	return queue[i].timeSec < queue[j].timeSec
 
- }
 
- func (queue timedQueueImpl) Swap(i, j int) {
 
- 	queue[i], queue[j] = queue[j], queue[i]
 
- }
 
- func (queue *timedQueueImpl) Push(value interface{}) {
 
- 	entry := value.(*timedQueueEntry)
 
- 	*queue = append(*queue, entry)
 
- }
 
- func (queue *timedQueueImpl) Pop() interface{} {
 
- 	old := *queue
 
- 	n := len(old)
 
- 	v := old[n-1]
 
- 	old[n-1] = nil
 
- 	*queue = old[:n-1]
 
- 	return v
 
- }
 
- type TimedQueue struct {
 
- 	queue   timedQueueImpl
 
- 	access  sync.RWMutex
 
- 	removed chan interface{}
 
- }
 
- func NewTimedQueue(updateInterval int) *TimedQueue {
 
- 	queue := &TimedQueue{
 
- 		queue:   make([]*timedQueueEntry, 0, 256),
 
- 		removed: make(chan interface{}, 16),
 
- 		access:  sync.RWMutex{},
 
- 	}
 
- 	go queue.cleanup(time.Tick(time.Duration(updateInterval) * time.Second))
 
- 	return queue
 
- }
 
- func (queue *TimedQueue) Add(value interface{}, time2Remove int64) {
 
- 	queue.access.Lock()
 
- 	heap.Push(&queue.queue, &timedQueueEntry{
 
- 		timeSec: time2Remove,
 
- 		value:   value,
 
- 	})
 
- 	queue.access.Unlock()
 
- }
 
- func (queue *TimedQueue) RemovedEntries() <-chan interface{} {
 
- 	return queue.removed
 
- }
 
- func (queue *TimedQueue) cleanup(tick <-chan time.Time) {
 
- 	for now := range tick {
 
- 		nowSec := now.Unix()
 
- 		for {
 
- 			queue.access.RLock()
 
- 			queueLen := queue.queue.Len()
 
- 			queue.access.RUnlock()
 
- 			if queueLen == 0 {
 
- 				break
 
- 			}
 
- 			queue.access.RLock()
 
- 			entry := queue.queue[0]
 
- 			queue.access.RUnlock()
 
- 			if entry.timeSec > nowSec {
 
- 				break
 
- 			}
 
- 			queue.access.Lock()
 
- 			heap.Pop(&queue.queue)
 
- 			queue.access.Unlock()
 
- 			queue.removed <- entry.value
 
- 		}
 
- 	}
 
- }
 
 
  |