tcp_hub.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. package internet
  2. import (
  3. "net"
  4. "sync"
  5. "v2ray.com/core/common/errors"
  6. v2net "v2ray.com/core/common/net"
  7. "v2ray.com/core/common/retry"
  8. )
  9. var (
  10. transportListenerCache = make(map[TransportProtocol]ListenFunc)
  11. )
  12. func RegisterTransportListener(protocol TransportProtocol, listener ListenFunc) error {
  13. if _, found := transportListenerCache[protocol]; found {
  14. return errors.New("Internet|TCPHub: ", protocol, " listener already registered.")
  15. }
  16. transportListenerCache[protocol] = listener
  17. return nil
  18. }
  19. type ListenFunc func(address v2net.Address, port v2net.Port, options ListenOptions) (Listener, error)
  20. type ListenOptions struct {
  21. Stream *StreamConfig
  22. RecvOrigDest bool
  23. }
  24. type Listener interface {
  25. Accept() (Connection, error)
  26. Close() error
  27. Addr() net.Addr
  28. }
  29. type TCPHub struct {
  30. sync.Mutex
  31. listener Listener
  32. connCallback ConnectionHandler
  33. accepting bool
  34. }
  35. func ListenTCP(address v2net.Address, port v2net.Port, callback ConnectionHandler, settings *StreamConfig) (*TCPHub, error) {
  36. options := ListenOptions{
  37. Stream: settings,
  38. }
  39. protocol := settings.GetEffectiveProtocol()
  40. listenFunc := transportListenerCache[protocol]
  41. if listenFunc == nil {
  42. return nil, errors.New("Internet|TCPHub: ", protocol, " listener not registered.")
  43. }
  44. listener, err := listenFunc(address, port, options)
  45. if err != nil {
  46. return nil, errors.Base(err).Message("Interent|TCPHub: Failed to listen on address: ", address, ":", port)
  47. }
  48. hub := &TCPHub{
  49. listener: listener,
  50. connCallback: callback,
  51. }
  52. go hub.start()
  53. return hub, nil
  54. }
  55. func (v *TCPHub) Close() {
  56. v.accepting = false
  57. v.listener.Close()
  58. }
  59. func (v *TCPHub) start() {
  60. v.accepting = true
  61. for v.accepting {
  62. var newConn Connection
  63. err := retry.ExponentialBackoff(10, 500).On(func() error {
  64. if !v.accepting {
  65. return nil
  66. }
  67. conn, err := v.listener.Accept()
  68. if err != nil {
  69. return errors.Base(err).Message("Internet|Listener: Failed to accept new TCP connection.")
  70. }
  71. newConn = conn
  72. return nil
  73. })
  74. if err == nil && newConn != nil {
  75. go v.connCallback(newConn)
  76. }
  77. }
  78. }