| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105 | package collectimport (	"container/heap"	"sync"	"time")type timedQueueEntry struct {	timeSec int64	value   interface{}}type timedQueueImpl []*timedQueueEntryfunc (queue timedQueueImpl) Len() int {	return len(queue)}func (queue timedQueueImpl) Less(i, j int) bool {	return queue[i].timeSec < queue[j].timeSec}func (queue timedQueueImpl) Swap(i, j int) {	queue[i], queue[j] = queue[j], queue[i]}func (queue *timedQueueImpl) Push(value interface{}) {	entry := value.(*timedQueueEntry)	*queue = append(*queue, entry)}func (queue *timedQueueImpl) Pop() interface{} {	old := *queue	n := len(old)	v := old[n-1]	old[n-1] = nil	*queue = old[:n-1]	return v}// TimedQueue is a priority queue that entries with oldest timestamp get removed first.type TimedQueue struct {	queue           timedQueueImpl	access          sync.Mutex	removedCallback func(interface{})}func NewTimedQueue(updateInterval int, removedCallback func(interface{})) *TimedQueue {	queue := &TimedQueue{		queue:           make([]*timedQueueEntry, 0, 256),		removedCallback: removedCallback,		access:          sync.Mutex{},	}	go queue.cleanup(time.Tick(time.Duration(updateInterval) * time.Second))	return queue}func (queue *TimedQueue) Add(value interface{}, time2Remove int64) {	newEntry := &timedQueueEntry{		timeSec: time2Remove,		value:   value,	}	var removedEntry *timedQueueEntry	queue.access.Lock()	nowSec := time.Now().Unix()	if queue.queue.Len() > 0 && queue.queue[0].timeSec < nowSec {		removedEntry = queue.queue[0]		queue.queue[0] = newEntry		heap.Fix(&queue.queue, 0)	} else {		heap.Push(&queue.queue, newEntry)	}	queue.access.Unlock()	if removedEntry != nil {		queue.removedCallback(removedEntry.value)	}}func (queue *TimedQueue) cleanup(tick <-chan time.Time) {	for now := range tick {		nowSec := now.Unix()		removedEntries := make([]*timedQueueEntry, 0, 128)		queue.access.Lock()		changed := false		for i := 0; i < queue.queue.Len(); i++ {			entry := queue.queue[i]			if entry.timeSec < nowSec {				removedEntries = append(removedEntries, entry)				queue.queue.Swap(i, queue.queue.Len()-1)				queue.queue.Pop()				changed = true			} else {				break			}		}		if changed {			heap.Init(&queue.queue)		}		queue.access.Unlock()		for _, entry := range removedEntries {			queue.removedCallback(entry.value)		}	}}
 |