tcp_hub.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. package internet
  2. import (
  3. "net"
  4. "v2ray.com/core/app/log"
  5. "v2ray.com/core/common/errors"
  6. v2net "v2ray.com/core/common/net"
  7. "v2ray.com/core/common/retry"
  8. )
  9. var (
  10. transportListenerCache = make(map[TransportProtocol]ListenFunc)
  11. )
  12. func RegisterTransportListener(protocol TransportProtocol, listener ListenFunc) error {
  13. if _, found := transportListenerCache[protocol]; found {
  14. return errors.New("Internet|TCPHub: ", protocol, " listener already registered.")
  15. }
  16. transportListenerCache[protocol] = listener
  17. return nil
  18. }
  19. type ListenFunc func(address v2net.Address, port v2net.Port, options ListenOptions) (Listener, error)
  20. type ListenOptions struct {
  21. Stream *StreamConfig
  22. RecvOrigDest bool
  23. }
  24. type Listener interface {
  25. Accept() (Connection, error)
  26. Close() error
  27. Addr() net.Addr
  28. }
  29. type TCPHub struct {
  30. listener Listener
  31. connCallback ConnectionHandler
  32. closed chan bool
  33. }
  34. func ListenTCP(address v2net.Address, port v2net.Port, callback ConnectionHandler, settings *StreamConfig) (*TCPHub, error) {
  35. options := ListenOptions{
  36. Stream: settings,
  37. }
  38. protocol := settings.GetEffectiveProtocol()
  39. listenFunc := transportListenerCache[protocol]
  40. if listenFunc == nil {
  41. return nil, errors.New("Internet|TCPHub: ", protocol, " listener not registered.")
  42. }
  43. listener, err := listenFunc(address, port, options)
  44. if err != nil {
  45. return nil, errors.Base(err).Message("Internet|TCPHub: Failed to listen on address: ", address, ":", port)
  46. }
  47. hub := &TCPHub{
  48. listener: listener,
  49. connCallback: callback,
  50. }
  51. go hub.start()
  52. return hub, nil
  53. }
  54. func (v *TCPHub) Close() {
  55. select {
  56. case <-v.closed:
  57. return
  58. default:
  59. v.listener.Close()
  60. }
  61. }
  62. func (v *TCPHub) start() {
  63. for {
  64. select {
  65. case <-v.closed:
  66. return
  67. default:
  68. }
  69. var newConn Connection
  70. err := retry.ExponentialBackoff(10, 500).On(func() error {
  71. select {
  72. case <-v.closed:
  73. return nil
  74. default:
  75. conn, err := v.listener.Accept()
  76. if err != nil {
  77. return errors.Base(err).RequireUserAction().Message("Internet|Listener: Failed to accept new TCP connection.")
  78. }
  79. newConn = conn
  80. return nil
  81. }
  82. })
  83. if err != nil {
  84. if errors.IsActionRequired(err) {
  85. log.Warning(err)
  86. } else {
  87. log.Info(err)
  88. }
  89. continue
  90. }
  91. if newConn != nil {
  92. go v.connCallback(newConn)
  93. }
  94. }
  95. }