connection_cache.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. package hub
  2. import (
  3. "net"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. )
  // Once runs a callback at most once per arming. Unlike sync.Once it can be
  // re-armed with Reset, after which the next Do runs its callback again.
  type Once struct {
  	m    sync.Mutex // held by Do for the whole callback, and by Reset
  	done uint32     // 1 after a callback has finished, 0 while armed
  }

  // Do invokes f if no callback has completed since the Once was created or
  // last Reset; otherwise it returns without calling f.
  //
  // The mutex is held while f runs, so concurrent callers block until f
  // returns, and f must not call Do or Reset on the same Once.
  func (o *Once) Do(f func()) {
  	// Fast path: a callback has already completed, no locking needed.
  	if atomic.LoadUint32(&o.done) == 1 {
  		return
  	}

  	o.m.Lock()
  	defer o.m.Unlock()

  	// Re-check under the lock: another caller may have finished meanwhile.
  	if o.done != 0 {
  		return
  	}

  	// Deferred so the flag is set even when f panics. It runs before the
  	// deferred Unlock above because defers execute in reverse order.
  	defer func() {
  		atomic.StoreUint32(&o.done, 1)
  	}()
  	f()
  }

  // Reset re-arms the Once so that the next call to Do runs its callback.
  // It blocks while a Do callback is in progress.
  func (o *Once) Reset() {
  	o.m.Lock()
  	atomic.StoreUint32(&o.done, 0)
  	o.m.Unlock()
  }
  // AwaitingConnection pairs a parked connection with the instant after which
  // it is treated as expired.
  type AwaitingConnection struct {
  	conn   net.Conn  // the parked connection
  	expire time.Time // the connection counts as expired once this instant has passed
  }

  // Expired reports whether the expiry instant lies strictly before the
  // current time.
  func (this *AwaitingConnection) Expired() bool {
  	now := time.Now()
  	return now.After(this.expire)
  }
  35. type ConnectionCache struct {
  36. sync.Mutex
  37. cache map[string][]*AwaitingConnection
  38. cleanupOnce Once
  39. }
  40. func NewConnectionCache() *ConnectionCache {
  41. return &ConnectionCache{
  42. cache: make(map[string][]*AwaitingConnection),
  43. }
  44. }
  45. func (this *ConnectionCache) Cleanup() {
  46. defer this.cleanupOnce.Reset()
  47. for len(this.cache) > 0 {
  48. time.Sleep(time.Second * 4)
  49. this.Lock()
  50. for key, value := range this.cache {
  51. size := len(value)
  52. changed := false
  53. for i := 0; i < size; {
  54. if value[i].Expired() {
  55. value[i].conn.Close()
  56. value[i] = value[size-1]
  57. size--
  58. changed = true
  59. } else {
  60. i++
  61. }
  62. }
  63. if changed {
  64. for i := size; i < len(value); i++ {
  65. value[i] = nil
  66. }
  67. value = value[:size]
  68. this.cache[key] = value
  69. }
  70. }
  71. this.Unlock()
  72. }
  73. }
  74. func (this *ConnectionCache) Recycle(dest string, conn net.Conn) {
  75. this.Lock()
  76. defer this.Unlock()
  77. aconn := &AwaitingConnection{
  78. conn: conn,
  79. expire: time.Now().Add(time.Second * 4),
  80. }
  81. var list []*AwaitingConnection
  82. if v, found := this.cache[dest]; found {
  83. v = append(v, aconn)
  84. list = v
  85. } else {
  86. list = []*AwaitingConnection{aconn}
  87. }
  88. this.cache[dest] = list
  89. go this.cleanupOnce.Do(this.Cleanup)
  90. }
  91. func FindFirstValid(list []*AwaitingConnection) int {
  92. for idx, conn := range list {
  93. if !conn.Expired() {
  94. return idx
  95. }
  96. conn.conn.Close()
  97. }
  98. return -1
  99. }
  100. func (this *ConnectionCache) Get(dest string) net.Conn {
  101. this.Lock()
  102. defer this.Unlock()
  103. list, found := this.cache[dest]
  104. if !found {
  105. return nil
  106. }
  107. firstValid := FindFirstValid(list)
  108. if firstValid == -1 {
  109. delete(this.cache, dest)
  110. return nil
  111. }
  112. res := list[firstValid].conn
  113. list = list[firstValid+1:]
  114. this.cache[dest] = list
  115. return res
  116. }