tcp_hub.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. package internet
  2. import (
  3. "net"
  4. "context"
  5. "v2ray.com/core/app/log"
  6. "v2ray.com/core/common/errors"
  7. v2net "v2ray.com/core/common/net"
  8. "v2ray.com/core/common/retry"
  9. )
  10. var (
  11. transportListenerCache = make(map[TransportProtocol]ListenFunc)
  12. )
  13. func RegisterTransportListener(protocol TransportProtocol, listener ListenFunc) error {
  14. if _, found := transportListenerCache[protocol]; found {
  15. return errors.New("Internet|TCPHub: ", protocol, " listener already registered.")
  16. }
  17. transportListenerCache[protocol] = listener
  18. return nil
  19. }
  20. type ListenFunc func(ctx context.Context, address v2net.Address, port v2net.Port) (Listener, error)
  21. type Listener interface {
  22. Accept() (Connection, error)
  23. Close() error
  24. Addr() net.Addr
  25. }
  26. type TCPHub struct {
  27. listener Listener
  28. connCallback ConnectionHandler
  29. closed chan bool
  30. }
  31. func ListenTCP(address v2net.Address, port v2net.Port, callback ConnectionHandler, settings *StreamConfig) (*TCPHub, error) {
  32. ctx := context.Background()
  33. protocol := settings.GetEffectiveProtocol()
  34. transportSettings, err := settings.GetEffectiveTransportSettings()
  35. if err != nil {
  36. return nil, err
  37. }
  38. ctx = ContextWithTransportSettings(ctx, transportSettings)
  39. if settings != nil && settings.HasSecuritySettings() {
  40. securitySettings, err := settings.GetEffectiveSecuritySettings()
  41. if err != nil {
  42. return nil, err
  43. }
  44. ctx = ContextWithSecuritySettings(ctx, securitySettings)
  45. }
  46. listenFunc := transportListenerCache[protocol]
  47. if listenFunc == nil {
  48. return nil, errors.New("Internet|TCPHub: ", protocol, " listener not registered.")
  49. }
  50. listener, err := listenFunc(ctx, address, port)
  51. if err != nil {
  52. return nil, errors.Base(err).Message("Internet|TCPHub: Failed to listen on address: ", address, ":", port)
  53. }
  54. hub := &TCPHub{
  55. listener: listener,
  56. connCallback: callback,
  57. }
  58. go hub.start()
  59. return hub, nil
  60. }
  61. func (v *TCPHub) Close() {
  62. defer func() {
  63. recover()
  64. }()
  65. select {
  66. case <-v.closed:
  67. return
  68. default:
  69. v.listener.Close()
  70. close(v.closed)
  71. }
  72. }
  73. func (v *TCPHub) start() {
  74. for {
  75. select {
  76. case <-v.closed:
  77. return
  78. default:
  79. }
  80. var newConn Connection
  81. err := retry.ExponentialBackoff(10, 500).On(func() error {
  82. select {
  83. case <-v.closed:
  84. return nil
  85. default:
  86. conn, err := v.listener.Accept()
  87. if err != nil {
  88. return errors.Base(err).RequireUserAction().Message("Internet|Listener: Failed to accept new TCP connection.")
  89. }
  90. newConn = conn
  91. return nil
  92. }
  93. })
  94. if err != nil {
  95. if errors.IsActionRequired(err) {
  96. log.Warning(err)
  97. } else {
  98. log.Info(err)
  99. }
  100. continue
  101. }
  102. if newConn != nil {
  103. go v.connCallback(newConn)
  104. }
  105. }
  106. }