// tcp_hub.go (2.2 KB)

  1. package internet
  2. import (
  3. "errors"
  4. "net"
  5. "sync"
  6. "v2ray.com/core/common/log"
  7. v2net "v2ray.com/core/common/net"
  8. "v2ray.com/core/common/retry"
  9. )
  10. var (
  11. ErrClosedConnection = errors.New("Connection already closed.")
  12. KCPListenFunc ListenFunc
  13. TCPListenFunc ListenFunc
  14. RawTCPListenFunc ListenFunc
  15. WSListenFunc ListenFunc
  16. )
// ListenFunc opens a Listener on the given address and port using the
// supplied options. ListenTCP calls the package-level ListenFunc variable that
// matches the requested stream network type.
type ListenFunc func(address v2net.Address, port v2net.Port, options ListenOptions) (Listener, error)
// ListenOptions bundles the settings handed to a ListenFunc.
type ListenOptions struct {
	// Stream is the stream configuration; ListenTCP fills it with the settings
	// argument it received.
	Stream *StreamConfig
}
// Listener is the transport-independent listener returned by a ListenFunc and
// driven by TCPHub.
type Listener interface {
	// Accept returns the next incoming connection, or an error.
	// NOTE(review): presumably blocks until a connection arrives, like
	// net.Listener.Accept — confirm per implementation.
	Accept() (Connection, error)
	// Close shuts the listener down. TCPHub.Close calls it to stop accepting.
	Close() error
	// Addr returns the listener's network address (presumably the local
	// address it is bound to — confirm per implementation).
	Addr() net.Addr
}
// TCPHub owns a Listener and hands every connection it accepts to a
// ConnectionHandler. Instances are created by ListenTCP and stopped with
// Close.
type TCPHub struct {
	// Embedded mutex; its Lock and Unlock methods are promoted onto TCPHub.
	sync.Mutex
	// listener is the source of incoming connections; Close closes it.
	listener Listener
	// connCallback is invoked in a new goroutine for each accepted connection.
	connCallback ConnectionHandler
	// accepting is the run condition of the accept loop; Close sets it to
	// false.
	accepting bool
}
  32. func ListenTCP(address v2net.Address, port v2net.Port, callback ConnectionHandler, settings *StreamConfig) (*TCPHub, error) {
  33. var listener Listener
  34. var err error
  35. options := ListenOptions{
  36. Stream: settings,
  37. }
  38. switch settings.Network {
  39. case v2net.Network_TCP:
  40. listener, err = TCPListenFunc(address, port, options)
  41. case v2net.Network_KCP:
  42. listener, err = KCPListenFunc(address, port, options)
  43. case v2net.Network_WebSocket:
  44. listener, err = WSListenFunc(address, port, options)
  45. case v2net.Network_RawTCP:
  46. listener, err = RawTCPListenFunc(address, port, options)
  47. default:
  48. log.Error("Internet|Listener: Unknown stream type: ", settings.Network)
  49. err = ErrUnsupportedStreamType
  50. }
  51. if err != nil {
  52. log.Warning("Internet|Listener: Failed to listen on ", address, ":", port)
  53. return nil, err
  54. }
  55. hub := &TCPHub{
  56. listener: listener,
  57. connCallback: callback,
  58. }
  59. go hub.start()
  60. return hub, nil
  61. }
  62. func (this *TCPHub) Close() {
  63. this.accepting = false
  64. this.listener.Close()
  65. }
  66. func (this *TCPHub) start() {
  67. this.accepting = true
  68. for this.accepting {
  69. var newConn Connection
  70. err := retry.ExponentialBackoff(10, 200).On(func() error {
  71. if !this.accepting {
  72. return nil
  73. }
  74. conn, err := this.listener.Accept()
  75. if err != nil {
  76. if this.accepting {
  77. log.Warning("Internet|Listener: Failed to accept new TCP connection: ", err)
  78. }
  79. return err
  80. }
  81. newConn = conn
  82. return nil
  83. })
  84. if err == nil && newConn != nil {
  85. go this.connCallback(newConn)
  86. }
  87. }
  88. }