pool.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. package internal
  2. import (
  3. "net"
  4. "sync"
  5. "time"
  6. )
  7. // ConnectionRecyler is the interface for recycling connections.
  8. type ConnectionRecyler interface {
  9. // Put returns a connection back to a connection pool.
  10. Put(ConnectionID, net.Conn)
  11. }
  12. type NoOpConnectionRecyler struct{}
  13. func (NoOpConnectionRecyler) Put(ConnectionID, net.Conn) {}
  14. // ExpiringConnection is a connection that will expire in certain time.
  15. type ExpiringConnection struct {
  16. conn net.Conn
  17. expire time.Time
  18. }
  19. // Expired returns true if the connection has expired.
  20. func (ec *ExpiringConnection) Expired() bool {
  21. return ec.expire.Before(time.Now())
  22. }
  23. // Pool is a connection pool.
  24. type Pool struct {
  25. sync.Mutex
  26. connsByDest map[ConnectionID][]*ExpiringConnection
  27. cleanupToken chan bool
  28. }
  29. // NewConnectionPool creates a new Pool.
  30. func NewConnectionPool() *Pool {
  31. p := &Pool{
  32. connsByDest: make(map[ConnectionID][]*ExpiringConnection),
  33. cleanupToken: make(chan bool, 1),
  34. }
  35. p.cleanupToken <- true
  36. return p
  37. }
  38. // Get returns a connection with matching connection ID. Nil if not found.
  39. func (p *Pool) Get(id ConnectionID) net.Conn {
  40. p.Lock()
  41. defer p.Unlock()
  42. list, found := p.connsByDest[id]
  43. if !found {
  44. return nil
  45. }
  46. connIdx := -1
  47. for idx, conn := range list {
  48. if !conn.Expired() {
  49. connIdx = idx
  50. break
  51. }
  52. }
  53. if connIdx == -1 {
  54. return nil
  55. }
  56. listLen := len(list)
  57. conn := list[connIdx]
  58. if connIdx != listLen-1 {
  59. list[connIdx] = list[listLen-1]
  60. }
  61. list = list[:listLen-1]
  62. p.connsByDest[id] = list
  63. return conn.conn
  64. }
  65. func (p *Pool) cleanup() {
  66. defer func() {
  67. p.cleanupToken <- true
  68. }()
  69. for len(p.connsByDest) > 0 {
  70. time.Sleep(time.Second * 5)
  71. expiredConns := make([]net.Conn, 0, 16)
  72. p.Lock()
  73. for dest, list := range p.connsByDest {
  74. validConns := make([]*ExpiringConnection, 0, len(list))
  75. for _, conn := range list {
  76. if conn.Expired() {
  77. expiredConns = append(expiredConns, conn.conn)
  78. } else {
  79. validConns = append(validConns, conn)
  80. }
  81. }
  82. if len(validConns) != len(list) {
  83. p.connsByDest[dest] = validConns
  84. }
  85. }
  86. p.Unlock()
  87. for _, conn := range expiredConns {
  88. conn.Close()
  89. }
  90. }
  91. }
  92. // Put implements ConnectionRecyler.Put().
  93. func (p *Pool) Put(id ConnectionID, conn net.Conn) {
  94. expiringConn := &ExpiringConnection{
  95. conn: conn,
  96. expire: time.Now().Add(time.Second * 4),
  97. }
  98. p.Lock()
  99. defer p.Unlock()
  100. list, found := p.connsByDest[id]
  101. if !found {
  102. list = []*ExpiringConnection{expiringConn}
  103. } else {
  104. list = append(list, expiringConn)
  105. }
  106. p.connsByDest[id] = list
  107. select {
  108. case <-p.cleanupToken:
  109. go p.cleanup()
  110. default:
  111. }
  112. }