hub.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. package tcp
  2. import (
  3. "errors"
  4. "net"
  5. "sync"
  6. "time"
  7. v2net "github.com/v2ray/v2ray-core/common/net"
  8. "github.com/v2ray/v2ray-core/transport/internet"
  9. )
  10. var (
  11. ErrClosedListener = errors.New("Listener is closed.")
  12. )
  13. type ConnectionWithError struct {
  14. conn net.Conn
  15. err error
  16. }
  17. type TCPListener struct {
  18. sync.Mutex
  19. acccepting bool
  20. listener *net.TCPListener
  21. awaitingConns chan *ConnectionWithError
  22. }
  23. func ListenTCP(address v2net.Address, port v2net.Port) (internet.Listener, error) {
  24. listener, err := net.ListenTCP("tcp", &net.TCPAddr{
  25. IP: address.IP(),
  26. Port: int(port),
  27. })
  28. if err != nil {
  29. return nil, err
  30. }
  31. l := &TCPListener{
  32. acccepting: true,
  33. listener: listener,
  34. awaitingConns: make(chan *ConnectionWithError, 32),
  35. }
  36. go l.KeepAccepting()
  37. return l, nil
  38. }
  39. func (this *TCPListener) Accept() (internet.Connection, error) {
  40. for this.acccepting {
  41. select {
  42. case connErr, open := <-this.awaitingConns:
  43. if !open {
  44. return nil, ErrClosedListener
  45. }
  46. if connErr.err != nil {
  47. return nil, connErr.err
  48. }
  49. return NewConnection("", connErr.conn, this), nil
  50. case <-time.After(time.Second * 2):
  51. }
  52. }
  53. return nil, ErrClosedListener
  54. }
  55. func (this *TCPListener) KeepAccepting() {
  56. for this.acccepting {
  57. conn, err := this.listener.Accept()
  58. this.Lock()
  59. if !this.acccepting {
  60. this.Unlock()
  61. break
  62. }
  63. select {
  64. case this.awaitingConns <- &ConnectionWithError{
  65. conn: conn,
  66. err: err,
  67. }:
  68. default:
  69. if conn != nil {
  70. conn.Close()
  71. }
  72. }
  73. this.Unlock()
  74. }
  75. }
  76. func (this *TCPListener) Recycle(dest string, conn net.Conn) {
  77. this.Lock()
  78. defer this.Unlock()
  79. if !this.acccepting {
  80. return
  81. }
  82. select {
  83. case this.awaitingConns <- &ConnectionWithError{conn: conn}:
  84. default:
  85. conn.Close()
  86. }
  87. }
  88. func (this *TCPListener) Addr() net.Addr {
  89. return this.listener.Addr()
  90. }
  91. func (this *TCPListener) Close() error {
  92. this.Lock()
  93. defer this.Unlock()
  94. this.acccepting = false
  95. this.listener.Close()
  96. close(this.awaitingConns)
  97. for connErr := range this.awaitingConns {
  98. if connErr.conn != nil {
  99. go connErr.conn.Close()
  100. }
  101. }
  102. return nil
  103. }
  104. type RawTCPListener struct {
  105. accepting bool
  106. listener *net.TCPListener
  107. }
  108. func (this *RawTCPListener) Accept() (internet.Connection, error) {
  109. conn, err := this.listener.AcceptTCP()
  110. if err != nil {
  111. return nil, err
  112. }
  113. return &RawConnection{
  114. TCPConn: *conn,
  115. }, nil
  116. }
  117. func (this *RawTCPListener) Addr() net.Addr {
  118. return this.listener.Addr()
  119. }
  120. func (this *RawTCPListener) Close() error {
  121. this.accepting = false
  122. this.listener.Close()
  123. return nil
  124. }
  125. func ListenRawTCP(address v2net.Address, port v2net.Port) (internet.Listener, error) {
  126. listener, err := net.ListenTCP("tcp", &net.TCPAddr{
  127. IP: address.IP(),
  128. Port: int(port),
  129. })
  130. if err != nil {
  131. return nil, err
  132. }
  133. return &RawTCPListener{
  134. accepting: true,
  135. listener: listener,
  136. }, nil
  137. }
  138. func init() {
  139. internet.TCPListenFunc = ListenTCP
  140. internet.RawTCPListenFunc = ListenRawTCP
  141. }