userset.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package user
  2. import (
  3. "sync"
  4. "time"
  5. "github.com/v2ray/v2ray-core/common/collect"
  6. )
  7. const (
  8. updateIntervalSec = 10
  9. cacheDurationSec = 120
  10. )
  11. type UserSet interface {
  12. AddUser(user User) error
  13. GetUser(timeHash []byte) (*ID, int64, bool)
  14. }
  15. type TimedUserSet struct {
  16. validUserIds []ID
  17. userHash map[string]indexTimePair
  18. userHashDeleteQueue *collect.TimedQueue
  19. access sync.RWMutex
  20. }
  21. type indexTimePair struct {
  22. index int
  23. timeSec int64
  24. }
  25. func NewTimedUserSet() UserSet {
  26. tus := &TimedUserSet{
  27. validUserIds: make([]ID, 0, 16),
  28. userHash: make(map[string]indexTimePair, 512),
  29. userHashDeleteQueue: collect.NewTimedQueue(updateIntervalSec),
  30. access: sync.RWMutex{},
  31. }
  32. go tus.updateUserHash(time.Tick(updateIntervalSec * time.Second))
  33. go tus.removeEntries(tus.userHashDeleteQueue.RemovedEntries())
  34. return tus
  35. }
  36. func (us *TimedUserSet) removeEntries(entries <-chan interface{}) {
  37. for {
  38. entry := <-entries
  39. us.access.Lock()
  40. delete(us.userHash, entry.(string))
  41. us.access.Unlock()
  42. }
  43. }
  44. func (us *TimedUserSet) generateNewHashes(lastSec, nowSec int64, idx int, id ID) {
  45. idHash := NewTimeHash(HMACHash{})
  46. for lastSec < nowSec+cacheDurationSec {
  47. idHash := idHash.Hash(id.Bytes[:], lastSec)
  48. us.access.Lock()
  49. us.userHash[string(idHash)] = indexTimePair{idx, lastSec}
  50. us.access.Unlock()
  51. us.userHashDeleteQueue.Add(string(idHash), lastSec+2*cacheDurationSec)
  52. lastSec++
  53. }
  54. }
  55. func (us *TimedUserSet) updateUserHash(tick <-chan time.Time) {
  56. now := time.Now().UTC()
  57. lastSec := now.Unix()
  58. for {
  59. now := <-tick
  60. nowSec := now.UTC().Unix()
  61. for idx, id := range us.validUserIds {
  62. us.generateNewHashes(lastSec, nowSec, idx, id)
  63. }
  64. lastSec = nowSec
  65. }
  66. }
  67. func (us *TimedUserSet) AddUser(user User) error {
  68. id := user.Id
  69. idx := len(us.validUserIds)
  70. us.validUserIds = append(us.validUserIds, id)
  71. nowSec := time.Now().UTC().Unix()
  72. lastSec := nowSec - cacheDurationSec
  73. us.generateNewHashes(lastSec, nowSec, idx, id)
  74. return nil
  75. }
  76. func (us TimedUserSet) GetUser(userHash []byte) (*ID, int64, bool) {
  77. defer us.access.RUnlock()
  78. us.access.RLock()
  79. pair, found := us.userHash[string(userHash)]
  80. if found {
  81. return &us.validUserIds[pair.index], pair.timeSec, true
  82. }
  83. return nil, 0, false
  84. }