tcp_hub.go 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package internet
  2. import (
  3. "net"
  4. "sync"
  5. "v2ray.com/core/common/errors"
  6. "v2ray.com/core/common/log"
  7. v2net "v2ray.com/core/common/net"
  8. "v2ray.com/core/common/retry"
  9. )
  10. var (
  11. transportListenerCache = make(map[TransportProtocol]ListenFunc)
  12. )
  13. func RegisterTransportListener(protocol TransportProtocol, listener ListenFunc) error {
  14. if _, found := transportListenerCache[protocol]; found {
  15. return errors.New("Internet|TCPHub: ", protocol, " listener already registered.")
  16. }
  17. transportListenerCache[protocol] = listener
  18. return nil
  19. }
  20. type ListenFunc func(address v2net.Address, port v2net.Port, options ListenOptions) (Listener, error)
  21. type ListenOptions struct {
  22. Stream *StreamConfig
  23. RecvOrigDest bool
  24. }
  25. type Listener interface {
  26. Accept() (Connection, error)
  27. Close() error
  28. Addr() net.Addr
  29. }
  30. type TCPHub struct {
  31. sync.Mutex
  32. listener Listener
  33. connCallback ConnectionHandler
  34. accepting bool
  35. }
  36. func ListenTCP(address v2net.Address, port v2net.Port, callback ConnectionHandler, settings *StreamConfig) (*TCPHub, error) {
  37. options := ListenOptions{
  38. Stream: settings,
  39. }
  40. protocol := settings.GetEffectiveProtocol()
  41. listenFunc := transportListenerCache[protocol]
  42. if listenFunc == nil {
  43. return nil, errors.New("Internet|TCPHub: ", protocol, " listener not registered.")
  44. }
  45. listener, err := listenFunc(address, port, options)
  46. if err != nil {
  47. return nil, errors.Base(err).Message("Interent|TCPHub: Failed to listen on address: ", address, ":", port)
  48. }
  49. hub := &TCPHub{
  50. listener: listener,
  51. connCallback: callback,
  52. }
  53. go hub.start()
  54. return hub, nil
  55. }
  56. func (v *TCPHub) Close() {
  57. v.accepting = false
  58. v.listener.Close()
  59. }
  60. func (v *TCPHub) start() {
  61. v.accepting = true
  62. for v.accepting {
  63. var newConn Connection
  64. err := retry.ExponentialBackoff(10, 200).On(func() error {
  65. if !v.accepting {
  66. return nil
  67. }
  68. conn, err := v.listener.Accept()
  69. if err != nil {
  70. if v.accepting {
  71. log.Warning("Internet|Listener: Failed to accept new TCP connection: ", err)
  72. }
  73. return err
  74. }
  75. newConn = conn
  76. return nil
  77. })
  78. if err == nil && newConn != nil {
  79. go v.connCallback(newConn)
  80. }
  81. }
  82. }