tcp_hub.go 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package internet
  2. import (
  3. "net"
  4. "sync"
  5. "v2ray.com/core/common/errors"
  6. "v2ray.com/core/common/log"
  7. v2net "v2ray.com/core/common/net"
  8. "v2ray.com/core/common/retry"
  9. )
  10. var (
  11. ErrClosedConnection = errors.New("Connection already closed.")
  12. networkListenerCache = make(map[v2net.Network]ListenFunc)
  13. )
  14. func RegisterNetworkListener(network v2net.Network, listener ListenFunc) error {
  15. if _, found := networkListenerCache[network]; found {
  16. return errors.New("Internet|TCPHub: ", network, " listener already registered.")
  17. }
  18. networkListenerCache[network] = listener
  19. return nil
  20. }
  21. type ListenFunc func(address v2net.Address, port v2net.Port, options ListenOptions) (Listener, error)
  22. type ListenOptions struct {
  23. Stream *StreamConfig
  24. }
  25. type Listener interface {
  26. Accept() (Connection, error)
  27. Close() error
  28. Addr() net.Addr
  29. }
  30. type TCPHub struct {
  31. sync.Mutex
  32. listener Listener
  33. connCallback ConnectionHandler
  34. accepting bool
  35. }
  36. func ListenTCP(address v2net.Address, port v2net.Port, callback ConnectionHandler, settings *StreamConfig) (*TCPHub, error) {
  37. options := ListenOptions{
  38. Stream: settings,
  39. }
  40. listenFunc := networkListenerCache[settings.Network]
  41. if listenFunc == nil {
  42. return nil, errors.New("Internet|TCPHub: ", settings.Network, " listener not registered.")
  43. }
  44. listener, err := listenFunc(address, port, options)
  45. if err != nil {
  46. return nil, errors.Base(err).Message("Interent|TCPHub: Failed to listen: ")
  47. }
  48. hub := &TCPHub{
  49. listener: listener,
  50. connCallback: callback,
  51. }
  52. go hub.start()
  53. return hub, nil
  54. }
  55. func (v *TCPHub) Close() {
  56. v.accepting = false
  57. v.listener.Close()
  58. }
  59. func (v *TCPHub) start() {
  60. v.accepting = true
  61. for v.accepting {
  62. var newConn Connection
  63. err := retry.ExponentialBackoff(10, 200).On(func() error {
  64. if !v.accepting {
  65. return nil
  66. }
  67. conn, err := v.listener.Accept()
  68. if err != nil {
  69. if v.accepting {
  70. log.Warning("Internet|Listener: Failed to accept new TCP connection: ", err)
  71. }
  72. return err
  73. }
  74. newConn = conn
  75. return nil
  76. })
  77. if err == nil && newConn != nil {
  78. go v.connCallback(newConn)
  79. }
  80. }
  81. }