| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 | 
							- package collect
 
- import (
 
- 	"container/heap"
 
- 	"sync"
 
- 	"time"
 
- )
 
// timedQueueEntry is one element of a timedQueue: a payload paired with the
// time, in seconds, that determines its position in the queue.
type timedQueueEntry struct {
	timeSec int64       // ordering key; smaller values come out of the heap first
	value   interface{} // payload carried by the entry
}

// timedQueue is a slice of entries that implements heap.Interface from
// container/heap, ordered as a min-heap on timeSec. It performs no locking
// of its own.
type timedQueue []*timedQueueEntry

// Len reports the number of entries currently in the queue.
func (queue timedQueue) Len() int {
	return len(queue)
}

// Less reports whether entry i has a strictly smaller timeSec than entry j.
func (queue timedQueue) Less(i, j int) bool {
	return queue[i].timeSec < queue[j].timeSec
}

// Swap exchanges the entries at indexes i and j.
func (queue timedQueue) Swap(i, j int) {
	queue[i], queue[j] = queue[j], queue[i]
}

// Push appends value to the end of the queue. value must be a
// *timedQueueEntry; any other dynamic type makes the type assertion panic.
// Use heap.Push rather than calling this directly so the heap order is kept.
func (queue *timedQueue) Push(value interface{}) {
	*queue = append(*queue, value.(*timedQueueEntry))
}

// Pop removes the last entry of the queue and returns it. Use heap.Pop
// rather than calling this directly: heap.Pop first moves the minimum entry
// to the end of the slice.
func (queue *timedQueue) Pop() interface{} {
	old := *queue
	last := len(old) - 1
	entry := old[last]
	// Clear the vacated slot: the backing array outlives the shrunken slice,
	// and a pointer left behind would keep the entry reachable.
	old[last] = nil
	*queue = old[:last]
	return entry
}
 
- type TimedStringMap struct {
 
- 	timedQueue
 
- 	queueMutex sync.Mutex
 
- 	dataMutext sync.RWMutex
 
- 	data       map[string]interface{}
 
- 	interval   int
 
- }
 
- func NewTimedStringMap(updateInterval int) *TimedStringMap {
 
- 	m := &TimedStringMap{
 
- 		timedQueue: make([]*timedQueueEntry, 0, 1024),
 
- 		queueMutex: sync.Mutex{},
 
- 		dataMutext: sync.RWMutex{},
 
- 		data:       make(map[string]interface{}, 1024),
 
- 		interval:   updateInterval,
 
- 	}
 
- 	m.initialize()
 
- 	return m
 
- }
 
- func (m *TimedStringMap) initialize() {
 
- 	go m.cleanup(time.Tick(time.Duration(m.interval) * time.Second))
 
- }
 
- func (m *TimedStringMap) cleanup(tick <-chan time.Time) {
 
- 	for {
 
- 		now := <-tick
 
- 		nowSec := now.UTC().Unix()
 
- 		if m.timedQueue.Len() == 0 {
 
- 			continue
 
- 		}
 
- 		for m.timedQueue.Len() > 0 {
 
- 			entry := m.timedQueue[0]
 
- 			if entry.timeSec > nowSec {
 
- 				break
 
- 			}
 
- 			m.queueMutex.Lock()
 
- 			entry = heap.Pop(&m.timedQueue).(*timedQueueEntry)
 
- 			m.queueMutex.Unlock()
 
- 			m.Remove(entry.value.(string))
 
- 		}
 
- 	}
 
- }
 
- func (m *TimedStringMap) Get(key string) (interface{}, bool) {
 
- 	m.dataMutext.RLock()
 
- 	value, ok := m.data[key]
 
- 	m.dataMutext.RUnlock()
 
- 	return value, ok
 
- }
 
- func (m *TimedStringMap) Set(key string, value interface{}, time2Delete int64) {
 
- 	m.dataMutext.Lock()
 
- 	m.data[key] = value
 
- 	m.dataMutext.Unlock()
 
- 	m.queueMutex.Lock()
 
- 	heap.Push(&m.timedQueue, &timedQueueEntry{
 
- 		timeSec: time2Delete,
 
- 		value:   key,
 
- 	})
 
- 	m.queueMutex.Unlock()
 
- }
 
- func (m *TimedStringMap) Remove(key string) {
 
- 	m.dataMutext.Lock()
 
- 	delete(m.data, key)
 
- 	m.dataMutext.Unlock()
 
- }
 
 
  |