tcp_hub.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package internet
  2. import (
  3. "net"
  4. "v2ray.com/core/app/log"
  5. "v2ray.com/core/common/errors"
  6. v2net "v2ray.com/core/common/net"
  7. "v2ray.com/core/common/retry"
  8. )
  9. var (
  10. transportListenerCache = make(map[TransportProtocol]ListenFunc)
  11. )
  12. func RegisterTransportListener(protocol TransportProtocol, listener ListenFunc) error {
  13. if _, found := transportListenerCache[protocol]; found {
  14. return errors.New("Internet|TCPHub: ", protocol, " listener already registered.")
  15. }
  16. transportListenerCache[protocol] = listener
  17. return nil
  18. }
  19. type ListenFunc func(address v2net.Address, port v2net.Port, options ListenOptions) (Listener, error)
  20. type ListenOptions struct {
  21. Stream *StreamConfig
  22. RecvOrigDest bool
  23. }
  24. type Listener interface {
  25. Accept() (Connection, error)
  26. Close() error
  27. Addr() net.Addr
  28. }
  29. type TCPHub struct {
  30. listener Listener
  31. connCallback ConnectionHandler
  32. closed chan bool
  33. }
  34. func ListenTCP(address v2net.Address, port v2net.Port, callback ConnectionHandler, settings *StreamConfig) (*TCPHub, error) {
  35. options := ListenOptions{
  36. Stream: settings,
  37. }
  38. protocol := settings.GetEffectiveProtocol()
  39. listenFunc := transportListenerCache[protocol]
  40. if listenFunc == nil {
  41. return nil, errors.New("Internet|TCPHub: ", protocol, " listener not registered.")
  42. }
  43. listener, err := listenFunc(address, port, options)
  44. if err != nil {
  45. return nil, errors.Base(err).Message("Internet|TCPHub: Failed to listen on address: ", address, ":", port)
  46. }
  47. hub := &TCPHub{
  48. listener: listener,
  49. connCallback: callback,
  50. }
  51. go hub.start()
  52. return hub, nil
  53. }
  54. func (v *TCPHub) Close() {
  55. defer func() {
  56. recover()
  57. }()
  58. select {
  59. case <-v.closed:
  60. return
  61. default:
  62. v.listener.Close()
  63. close(v.closed)
  64. }
  65. }
  66. func (v *TCPHub) start() {
  67. for {
  68. select {
  69. case <-v.closed:
  70. return
  71. default:
  72. }
  73. var newConn Connection
  74. err := retry.ExponentialBackoff(10, 500).On(func() error {
  75. select {
  76. case <-v.closed:
  77. return nil
  78. default:
  79. conn, err := v.listener.Accept()
  80. if err != nil {
  81. return errors.Base(err).RequireUserAction().Message("Internet|Listener: Failed to accept new TCP connection.")
  82. }
  83. newConn = conn
  84. return nil
  85. }
  86. })
  87. if err != nil {
  88. if errors.IsActionRequired(err) {
  89. log.Warning(err)
  90. } else {
  91. log.Info(err)
  92. }
  93. continue
  94. }
  95. if newConn != nil {
  96. go v.connCallback(newConn)
  97. }
  98. }
  99. }