hub.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. package tcp
  2. import (
  3. "crypto/tls"
  4. "net"
  5. "sync"
  6. "time"
  7. "v2ray.com/core/common"
  8. "v2ray.com/core/common/errors"
  9. "v2ray.com/core/common/log"
  10. v2net "v2ray.com/core/common/net"
  11. "v2ray.com/core/transport/internet"
  12. "v2ray.com/core/transport/internet/internal"
  13. v2tls "v2ray.com/core/transport/internet/tls"
  14. )
  15. var (
  16. ErrClosedListener = errors.New("Listener is closed.")
  17. )
  18. type ConnectionWithError struct {
  19. conn net.Conn
  20. err error
  21. }
  22. type TCPListener struct {
  23. sync.Mutex
  24. acccepting bool
  25. listener *net.TCPListener
  26. awaitingConns chan *ConnectionWithError
  27. tlsConfig *tls.Config
  28. authConfig internet.ConnectionAuthenticator
  29. config *Config
  30. }
  31. func ListenTCP(address v2net.Address, port v2net.Port, options internet.ListenOptions) (internet.Listener, error) {
  32. listener, err := net.ListenTCP("tcp", &net.TCPAddr{
  33. IP: address.IP(),
  34. Port: int(port),
  35. })
  36. if err != nil {
  37. return nil, err
  38. }
  39. networkSettings, err := options.Stream.GetEffectiveTransportSettings()
  40. if err != nil {
  41. return nil, err
  42. }
  43. tcpSettings := networkSettings.(*Config)
  44. l := &TCPListener{
  45. acccepting: true,
  46. listener: listener,
  47. awaitingConns: make(chan *ConnectionWithError, 32),
  48. config: tcpSettings,
  49. }
  50. if options.Stream != nil && options.Stream.HasSecuritySettings() {
  51. securitySettings, err := options.Stream.GetEffectiveSecuritySettings()
  52. if err != nil {
  53. log.Error("TCP: Failed to get security config: ", err)
  54. return nil, err
  55. }
  56. tlsConfig, ok := securitySettings.(*v2tls.Config)
  57. if ok {
  58. l.tlsConfig = tlsConfig.GetTLSConfig()
  59. }
  60. }
  61. if tcpSettings.HeaderSettings != nil {
  62. headerConfig, err := tcpSettings.HeaderSettings.GetInstance()
  63. if err != nil {
  64. return nil, errors.Base(err).Message("Internet|TCP: Invalid header settings.")
  65. }
  66. auth, err := internet.CreateConnectionAuthenticator(tcpSettings.HeaderSettings.Type, headerConfig)
  67. if err != nil {
  68. return nil, errors.Base(err).Message("Internet|TCP: Invalid header settings.")
  69. }
  70. l.authConfig = auth
  71. }
  72. go l.KeepAccepting()
  73. return l, nil
  74. }
  75. func (v *TCPListener) Accept() (internet.Connection, error) {
  76. for v.acccepting {
  77. select {
  78. case connErr, open := <-v.awaitingConns:
  79. if !open {
  80. return nil, ErrClosedListener
  81. }
  82. if connErr.err != nil {
  83. return nil, connErr.err
  84. }
  85. conn := connErr.conn
  86. return internal.NewConnection(internal.ConnectionID{}, conn, v, internal.ReuseConnection(v.config.IsConnectionReuse())), nil
  87. case <-time.After(time.Second * 2):
  88. }
  89. }
  90. return nil, ErrClosedListener
  91. }
  92. func (v *TCPListener) KeepAccepting() {
  93. for v.acccepting {
  94. conn, err := v.listener.Accept()
  95. v.Lock()
  96. if !v.acccepting {
  97. v.Unlock()
  98. break
  99. }
  100. if v.tlsConfig != nil {
  101. conn = tls.Server(conn, v.tlsConfig)
  102. }
  103. if v.authConfig != nil {
  104. conn = v.authConfig.Server(conn)
  105. }
  106. select {
  107. case v.awaitingConns <- &ConnectionWithError{
  108. conn: conn,
  109. err: err,
  110. }:
  111. default:
  112. if conn != nil {
  113. conn.Close()
  114. }
  115. }
  116. v.Unlock()
  117. }
  118. }
  119. func (v *TCPListener) Put(id internal.ConnectionID, conn net.Conn) {
  120. v.Lock()
  121. defer v.Unlock()
  122. if !v.acccepting {
  123. return
  124. }
  125. select {
  126. case v.awaitingConns <- &ConnectionWithError{conn: conn}:
  127. default:
  128. conn.Close()
  129. }
  130. }
  131. func (v *TCPListener) Addr() net.Addr {
  132. return v.listener.Addr()
  133. }
  134. func (v *TCPListener) Close() error {
  135. v.Lock()
  136. defer v.Unlock()
  137. v.acccepting = false
  138. v.listener.Close()
  139. close(v.awaitingConns)
  140. for connErr := range v.awaitingConns {
  141. if connErr.conn != nil {
  142. go connErr.conn.Close()
  143. }
  144. }
  145. return nil
  146. }
  147. func init() {
  148. common.Must(internet.RegisterTransportListener(internet.TransportProtocol_TCP, ListenTCP))
  149. }