|  | @@ -41,55 +41,63 @@ func (queue *timedQueueImpl) Pop() interface{} {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  // TimedQueue is a priority queue that entries with oldest timestamp get removed first.
 | 
	
		
			
				|  |  |  type TimedQueue struct {
 | 
	
		
			
				|  |  | -	queue   timedQueueImpl
 | 
	
		
			
				|  |  | -	access  sync.RWMutex
 | 
	
		
			
				|  |  | -	removed chan interface{}
 | 
	
		
			
				|  |  | +	queue           timedQueueImpl
 | 
	
		
			
				|  |  | +	access          sync.Mutex
 | 
	
		
			
				|  |  | +	removedCallback func(interface{})
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -func NewTimedQueue(updateInterval int) *TimedQueue {
 | 
	
		
			
				|  |  | +func NewTimedQueue(updateInterval int, removedCallback func(interface{})) *TimedQueue {
 | 
	
		
			
				|  |  |  	queue := &TimedQueue{
 | 
	
		
			
				|  |  | -		queue:   make([]*timedQueueEntry, 0, 256),
 | 
	
		
			
				|  |  | -		removed: make(chan interface{}, 16),
 | 
	
		
			
				|  |  | -		access:  sync.RWMutex{},
 | 
	
		
			
				|  |  | +		queue:           make([]*timedQueueEntry, 0, 256),
 | 
	
		
			
				|  |  | +		removedCallback: removedCallback,
 | 
	
		
			
				|  |  | +		access:          sync.Mutex{},
 | 
	
		
			
				|  |  |  	}
 | 
	
		
			
				|  |  |  	go queue.cleanup(time.Tick(time.Duration(updateInterval) * time.Second))
 | 
	
		
			
				|  |  |  	return queue
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  func (queue *TimedQueue) Add(value interface{}, time2Remove int64) {
 | 
	
		
			
				|  |  | -	queue.access.Lock()
 | 
	
		
			
				|  |  | -	heap.Push(&queue.queue, &timedQueueEntry{
 | 
	
		
			
				|  |  | +	newEntry := &timedQueueEntry{
 | 
	
		
			
				|  |  |  		timeSec: time2Remove,
 | 
	
		
			
				|  |  |  		value:   value,
 | 
	
		
			
				|  |  | -	})
 | 
	
		
			
				|  |  | +	}
 | 
	
		
			
				|  |  | +	var removedEntry *timedQueueEntry
 | 
	
		
			
				|  |  | +	queue.access.Lock()
 | 
	
		
			
				|  |  | +	nowSec := time.Now().Unix()
 | 
	
		
			
				|  |  | +	if queue.queue.Len() > 0 && queue.queue[0].timeSec < nowSec {
 | 
	
		
			
				|  |  | +		removedEntry = queue.queue[0]
 | 
	
		
			
				|  |  | +		queue.queue[0] = newEntry
 | 
	
		
			
				|  |  | +		heap.Fix(&queue.queue, 0)
 | 
	
		
			
				|  |  | +	} else {
 | 
	
		
			
				|  |  | +		heap.Push(&queue.queue, newEntry)
 | 
	
		
			
				|  |  | +	}
 | 
	
		
			
				|  |  |  	queue.access.Unlock()
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (queue *TimedQueue) RemovedEntries() <-chan interface{} {
 | 
	
		
			
				|  |  | -	return queue.removed
 | 
	
		
			
				|  |  | +	if removedEntry != nil {
 | 
	
		
			
				|  |  | +		queue.removedCallback(removedEntry)
 | 
	
		
			
				|  |  | +	}
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  func (queue *TimedQueue) cleanup(tick <-chan time.Time) {
 | 
	
		
			
				|  |  |  	for now := range tick {
 | 
	
		
			
				|  |  |  		nowSec := now.Unix()
 | 
	
		
			
				|  |  | -		for {
 | 
	
		
			
				|  |  | -			queue.access.RLock()
 | 
	
		
			
				|  |  | -			queueLen := queue.queue.Len()
 | 
	
		
			
				|  |  | -			queue.access.RUnlock()
 | 
	
		
			
				|  |  | -			if queueLen == 0 {
 | 
	
		
			
				|  |  | -				break
 | 
	
		
			
				|  |  | -			}
 | 
	
		
			
				|  |  | -			queue.access.RLock()
 | 
	
		
			
				|  |  | -			entry := queue.queue[0]
 | 
	
		
			
				|  |  | -			queue.access.RUnlock()
 | 
	
		
			
				|  |  | -			if entry.timeSec > nowSec {
 | 
	
		
			
				|  |  | -				break
 | 
	
		
			
				|  |  | +		removedEntries := make([]*timedQueueEntry, 0, 128)
 | 
	
		
			
				|  |  | +		queue.access.Lock()
 | 
	
		
			
				|  |  | +		changed := false
 | 
	
		
			
				|  |  | +		for i := 0; i < queue.queue.Len(); i++ {
 | 
	
		
			
				|  |  | +			entry := queue.queue[i]
 | 
	
		
			
				|  |  | +			if entry.timeSec < nowSec {
 | 
	
		
			
				|  |  | +				removedEntries = append(removedEntries, entry)
 | 
	
		
			
				|  |  | +				queue.queue.Swap(i, queue.queue.Len()-1)
 | 
	
		
			
				|  |  | +				queue.queue.Pop()
 | 
	
		
			
				|  |  | +				changed = true
 | 
	
		
			
				|  |  |  			}
 | 
	
		
			
				|  |  | -			queue.access.Lock()
 | 
	
		
			
				|  |  | -			heap.Pop(&queue.queue)
 | 
	
		
			
				|  |  | -			queue.access.Unlock()
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -			queue.removed <- entry.value
 | 
	
		
			
				|  |  | +		}
 | 
	
		
			
				|  |  | +		if changed {
 | 
	
		
			
				|  |  | +			heap.Init(&queue.queue)
 | 
	
		
			
				|  |  | +		}
 | 
	
		
			
				|  |  | +		queue.access.Unlock()
 | 
	
		
			
				|  |  | +		for _, entry := range removedEntries {
 | 
	
		
			
				|  |  | +			queue.removedCallback(entry.value)
 | 
	
		
			
				|  |  |  		}
 | 
	
		
			
				|  |  |  	}
 | 
	
		
			
				|  |  |  }
 |