tcp_hub.go 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. package internet
  2. import (
  3. "net"
  4. "sync"
  5. "v2ray.com/core/common/errors"
  6. "v2ray.com/core/common/log"
  7. v2net "v2ray.com/core/common/net"
  8. "v2ray.com/core/common/retry"
  9. )
  10. var (
  11. ErrClosedConnection = errors.New("Connection already closed.")
  12. KCPListenFunc ListenFunc
  13. TCPListenFunc ListenFunc
  14. WSListenFunc ListenFunc
  15. )
  16. type ListenFunc func(address v2net.Address, port v2net.Port, options ListenOptions) (Listener, error)
  17. type ListenOptions struct {
  18. Stream *StreamConfig
  19. }
  20. type Listener interface {
  21. Accept() (Connection, error)
  22. Close() error
  23. Addr() net.Addr
  24. }
  25. type TCPHub struct {
  26. sync.Mutex
  27. listener Listener
  28. connCallback ConnectionHandler
  29. accepting bool
  30. }
  31. func ListenTCP(address v2net.Address, port v2net.Port, callback ConnectionHandler, settings *StreamConfig) (*TCPHub, error) {
  32. var listener Listener
  33. var err error
  34. options := ListenOptions{
  35. Stream: settings,
  36. }
  37. switch settings.Network {
  38. case v2net.Network_TCP:
  39. listener, err = TCPListenFunc(address, port, options)
  40. case v2net.Network_KCP:
  41. listener, err = KCPListenFunc(address, port, options)
  42. case v2net.Network_WebSocket:
  43. listener, err = WSListenFunc(address, port, options)
  44. default:
  45. log.Error("Internet|Listener: Unknown stream type: ", settings.Network)
  46. err = ErrUnsupportedStreamType
  47. }
  48. if err != nil {
  49. log.Warning("Internet|Listener: Failed to listen on ", address, ":", port)
  50. return nil, err
  51. }
  52. hub := &TCPHub{
  53. listener: listener,
  54. connCallback: callback,
  55. }
  56. go hub.start()
  57. return hub, nil
  58. }
  59. func (v *TCPHub) Close() {
  60. v.accepting = false
  61. v.listener.Close()
  62. }
  63. func (v *TCPHub) start() {
  64. v.accepting = true
  65. for v.accepting {
  66. var newConn Connection
  67. err := retry.ExponentialBackoff(10, 200).On(func() error {
  68. if !v.accepting {
  69. return nil
  70. }
  71. conn, err := v.listener.Accept()
  72. if err != nil {
  73. if v.accepting {
  74. log.Warning("Internet|Listener: Failed to accept new TCP connection: ", err)
  75. }
  76. return err
  77. }
  78. newConn = conn
  79. return nil
  80. })
  81. if err == nil && newConn != nil {
  82. go v.connCallback(newConn)
  83. }
  84. }
  85. }