hub.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. package tcp
  2. import (
  3. "crypto/tls"
  4. "net"
  5. "sync"
  6. "time"
  7. "v2ray.com/core/common"
  8. "v2ray.com/core/common/errors"
  9. "v2ray.com/core/common/log"
  10. v2net "v2ray.com/core/common/net"
  11. "v2ray.com/core/transport/internet"
  12. "v2ray.com/core/transport/internet/internal"
  13. v2tls "v2ray.com/core/transport/internet/tls"
  14. )
  15. var (
  16. ErrClosedListener = errors.New("Listener is closed.")
  17. )
  18. type ConnectionWithError struct {
  19. conn net.Conn
  20. err error
  21. }
  22. type TCPListener struct {
  23. sync.Mutex
  24. acccepting bool
  25. listener *net.TCPListener
  26. awaitingConns chan *ConnectionWithError
  27. tlsConfig *tls.Config
  28. authConfig internet.ConnectionAuthenticator
  29. config *Config
  30. }
  31. func ListenTCP(address v2net.Address, port v2net.Port, options internet.ListenOptions) (internet.Listener, error) {
  32. listener, err := net.ListenTCP("tcp", &net.TCPAddr{
  33. IP: address.IP(),
  34. Port: int(port),
  35. })
  36. if err != nil {
  37. return nil, err
  38. }
  39. log.Info("TCP|Listener: Listening on ", address, ":", port)
  40. networkSettings, err := options.Stream.GetEffectiveTransportSettings()
  41. if err != nil {
  42. return nil, err
  43. }
  44. tcpSettings := networkSettings.(*Config)
  45. l := &TCPListener{
  46. acccepting: true,
  47. listener: listener,
  48. awaitingConns: make(chan *ConnectionWithError, 32),
  49. config: tcpSettings,
  50. }
  51. if options.Stream != nil && options.Stream.HasSecuritySettings() {
  52. securitySettings, err := options.Stream.GetEffectiveSecuritySettings()
  53. if err != nil {
  54. log.Error("TCP: Failed to get security config: ", err)
  55. return nil, err
  56. }
  57. tlsConfig, ok := securitySettings.(*v2tls.Config)
  58. if ok {
  59. l.tlsConfig = tlsConfig.GetTLSConfig()
  60. }
  61. }
  62. if tcpSettings.HeaderSettings != nil {
  63. headerConfig, err := tcpSettings.HeaderSettings.GetInstance()
  64. if err != nil {
  65. return nil, errors.Base(err).Message("Internet|TCP: Invalid header settings.")
  66. }
  67. auth, err := internet.CreateConnectionAuthenticator(headerConfig)
  68. if err != nil {
  69. return nil, errors.Base(err).Message("Internet|TCP: Invalid header settings.")
  70. }
  71. l.authConfig = auth
  72. }
  73. go l.KeepAccepting()
  74. return l, nil
  75. }
  76. func (v *TCPListener) Accept() (internet.Connection, error) {
  77. for v.acccepting {
  78. select {
  79. case connErr, open := <-v.awaitingConns:
  80. if !open {
  81. return nil, ErrClosedListener
  82. }
  83. if connErr.err != nil {
  84. return nil, connErr.err
  85. }
  86. conn := connErr.conn
  87. return internal.NewConnection(internal.ConnectionID{}, conn, v, internal.ReuseConnection(v.config.IsConnectionReuse())), nil
  88. case <-time.After(time.Second * 2):
  89. }
  90. }
  91. return nil, ErrClosedListener
  92. }
  93. func (v *TCPListener) KeepAccepting() {
  94. for v.acccepting {
  95. conn, err := v.listener.Accept()
  96. v.Lock()
  97. if !v.acccepting {
  98. v.Unlock()
  99. break
  100. }
  101. if v.tlsConfig != nil {
  102. conn = tls.Server(conn, v.tlsConfig)
  103. }
  104. if v.authConfig != nil {
  105. conn = v.authConfig.Server(conn)
  106. }
  107. select {
  108. case v.awaitingConns <- &ConnectionWithError{
  109. conn: conn,
  110. err: err,
  111. }:
  112. default:
  113. if conn != nil {
  114. conn.Close()
  115. }
  116. }
  117. v.Unlock()
  118. }
  119. }
  120. func (v *TCPListener) Put(id internal.ConnectionID, conn net.Conn) {
  121. v.Lock()
  122. defer v.Unlock()
  123. if !v.acccepting {
  124. return
  125. }
  126. select {
  127. case v.awaitingConns <- &ConnectionWithError{conn: conn}:
  128. default:
  129. conn.Close()
  130. }
  131. }
  132. func (v *TCPListener) Addr() net.Addr {
  133. return v.listener.Addr()
  134. }
  135. func (v *TCPListener) Close() error {
  136. v.Lock()
  137. defer v.Unlock()
  138. v.acccepting = false
  139. v.listener.Close()
  140. close(v.awaitingConns)
  141. for connErr := range v.awaitingConns {
  142. if connErr.conn != nil {
  143. connErr.conn.Close()
  144. }
  145. }
  146. return nil
  147. }
  148. func init() {
  149. common.Must(internet.RegisterTransportListener(internet.TransportProtocol_TCP, ListenTCP))
  150. }