hub.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. package tcp
  2. import (
  3. "crypto/tls"
  4. "errors"
  5. "net"
  6. "sync"
  7. "time"
  8. v2net "v2ray.com/core/common/net"
  9. "v2ray.com/core/transport/internet"
  10. )
  // ErrClosedListener is the error Accept returns once the listener has been
  // closed.
  var ErrClosedListener = errors.New("Listener is closed.")
  // ConnectionWithError is the element type of the queue between the accept
  // loop and Accept. It carries the result of one accept attempt, or a
  // connection that was handed back through Recycle.
  type ConnectionWithError struct {
  	// conn is the queued connection; nil when the accept attempt produced none.
  	conn net.Conn
  	// err is the error returned by the accept attempt; left unset for
  	// recycled connections.
  	err error
  }
  18. type TCPListener struct {
  19. sync.Mutex
  20. acccepting bool
  21. listener *net.TCPListener
  22. awaitingConns chan *ConnectionWithError
  23. tlsConfig *tls.Config
  24. }
  25. func ListenTCP(address v2net.Address, port v2net.Port, options internet.ListenOptions) (internet.Listener, error) {
  26. listener, err := net.ListenTCP("tcp", &net.TCPAddr{
  27. IP: address.IP(),
  28. Port: int(port),
  29. })
  30. if err != nil {
  31. return nil, err
  32. }
  33. l := &TCPListener{
  34. acccepting: true,
  35. listener: listener,
  36. awaitingConns: make(chan *ConnectionWithError, 32),
  37. }
  38. if options.Stream != nil && options.Stream.Security == internet.StreamSecurityTypeTLS {
  39. l.tlsConfig = options.Stream.TLSSettings.GetTLSConfig()
  40. }
  41. go l.KeepAccepting()
  42. return l, nil
  43. }
  44. func (this *TCPListener) Accept() (internet.Connection, error) {
  45. for this.acccepting {
  46. select {
  47. case connErr, open := <-this.awaitingConns:
  48. if !open {
  49. return nil, ErrClosedListener
  50. }
  51. if connErr.err != nil {
  52. return nil, connErr.err
  53. }
  54. conn := connErr.conn
  55. if this.tlsConfig != nil {
  56. conn = tls.Server(conn, this.tlsConfig)
  57. }
  58. return NewConnection("", conn, this), nil
  59. case <-time.After(time.Second * 2):
  60. }
  61. }
  62. return nil, ErrClosedListener
  63. }
  64. func (this *TCPListener) KeepAccepting() {
  65. for this.acccepting {
  66. conn, err := this.listener.Accept()
  67. this.Lock()
  68. if !this.acccepting {
  69. this.Unlock()
  70. break
  71. }
  72. select {
  73. case this.awaitingConns <- &ConnectionWithError{
  74. conn: conn,
  75. err: err,
  76. }:
  77. default:
  78. if conn != nil {
  79. conn.Close()
  80. }
  81. }
  82. this.Unlock()
  83. }
  84. }
  85. func (this *TCPListener) Recycle(dest string, conn net.Conn) {
  86. this.Lock()
  87. defer this.Unlock()
  88. if !this.acccepting {
  89. return
  90. }
  91. select {
  92. case this.awaitingConns <- &ConnectionWithError{conn: conn}:
  93. default:
  94. conn.Close()
  95. }
  96. }
  97. func (this *TCPListener) Addr() net.Addr {
  98. return this.listener.Addr()
  99. }
  100. func (this *TCPListener) Close() error {
  101. this.Lock()
  102. defer this.Unlock()
  103. this.acccepting = false
  104. this.listener.Close()
  105. close(this.awaitingConns)
  106. for connErr := range this.awaitingConns {
  107. if connErr.conn != nil {
  108. go connErr.conn.Close()
  109. }
  110. }
  111. return nil
  112. }
  // RawTCPListener wraps a *net.TCPListener and returns accepted connections
  // directly, without the queueing done by TCPListener.
  type RawTCPListener struct {
  	// accepting is set to true by ListenRawTCP and to false by Close.
  	accepting bool
  	// listener is the underlying TCP socket listener.
  	listener *net.TCPListener
  }
  117. func (this *RawTCPListener) Accept() (internet.Connection, error) {
  118. conn, err := this.listener.AcceptTCP()
  119. if err != nil {
  120. return nil, err
  121. }
  122. return &RawConnection{
  123. TCPConn: *conn,
  124. }, nil
  125. }
  126. func (this *RawTCPListener) Addr() net.Addr {
  127. return this.listener.Addr()
  128. }
  129. func (this *RawTCPListener) Close() error {
  130. this.accepting = false
  131. this.listener.Close()
  132. return nil
  133. }
  134. func ListenRawTCP(address v2net.Address, port v2net.Port, options internet.ListenOptions) (internet.Listener, error) {
  135. listener, err := net.ListenTCP("tcp", &net.TCPAddr{
  136. IP: address.IP(),
  137. Port: int(port),
  138. })
  139. if err != nil {
  140. return nil, err
  141. }
  142. // TODO: handle listen options
  143. return &RawTCPListener{
  144. accepting: true,
  145. listener: listener,
  146. }, nil
  147. }
  148. func init() {
  149. internet.TCPListenFunc = ListenTCP
  150. internet.RawTCPListenFunc = ListenRawTCP
  151. }