tcp.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. package hub
  2. import (
  3. "crypto/tls"
  4. "errors"
  5. "net"
  6. "sync"
  7. "github.com/v2ray/v2ray-core/common/log"
  8. v2net "github.com/v2ray/v2ray-core/common/net"
  9. "github.com/v2ray/v2ray-core/proxy"
  10. "github.com/v2ray/v2ray-core/transport"
  11. )
  12. var (
  13. ErrorClosedConnection = errors.New("Connection already closed.")
  14. )
  15. type TCPHub struct {
  16. sync.Mutex
  17. listener net.Listener
  18. connCallback ConnectionHandler
  19. accepting bool
  20. }
  21. func ListenTCP(address v2net.Address, port v2net.Port, callback ConnectionHandler, tlsConfig *tls.Config) (*TCPHub, error) {
  22. listener, err := net.ListenTCP("tcp", &net.TCPAddr{
  23. IP: address.IP(),
  24. Port: int(port),
  25. Zone: "",
  26. })
  27. if err != nil {
  28. return nil, err
  29. }
  30. var hub *TCPHub
  31. if tlsConfig != nil {
  32. tlsListener := tls.NewListener(listener, tlsConfig)
  33. hub = &TCPHub{
  34. listener: tlsListener,
  35. connCallback: callback,
  36. }
  37. } else {
  38. hub = &TCPHub{
  39. listener: listener,
  40. connCallback: callback,
  41. }
  42. }
  43. go hub.start()
  44. return hub, nil
  45. }
  46. func ListenKCPhub(address v2net.Address, port v2net.Port, callback ConnectionHandler, tlsConfig *tls.Config) (*TCPHub, error) {
  47. listener, err := ListenKCP(address, port)
  48. if err != nil {
  49. return nil, err
  50. }
  51. var hub *TCPHub
  52. if tlsConfig != nil {
  53. tlsListener := tls.NewListener(listener, tlsConfig)
  54. hub = &TCPHub{
  55. listener: tlsListener,
  56. connCallback: callback,
  57. }
  58. } else {
  59. hub = &TCPHub{
  60. listener: listener,
  61. connCallback: callback,
  62. }
  63. }
  64. go hub.start()
  65. return hub, nil
  66. }
  67. func ListenTCP6(address v2net.Address, port v2net.Port, callback ConnectionHandler, proxyMeta *proxy.InboundHandlerMeta, tlsConfig *tls.Config) (*TCPHub, error) {
  68. if proxyMeta.KcpSupported && transport.IsKcpEnabled() {
  69. return ListenKCPhub(address, port, callback, tlsConfig)
  70. } else {
  71. return ListenTCP(address, port, callback, tlsConfig)
  72. }
  73. return nil, errors.New("ListenTCP6: Not Implemented")
  74. }
  75. func (this *TCPHub) Close() {
  76. this.accepting = false
  77. this.listener.Close()
  78. }
  79. func (this *TCPHub) start() {
  80. this.accepting = true
  81. for this.accepting {
  82. conn, err := this.listener.Accept()
  83. if err != nil {
  84. if this.accepting {
  85. log.Warning("Listener: Failed to accept new TCP connection: ", err)
  86. }
  87. continue
  88. }
  89. go this.connCallback(&Connection{
  90. dest: conn.RemoteAddr().String(),
  91. conn: conn,
  92. listener: this,
  93. })
  94. }
  95. }
  96. // @Private
  97. func (this *TCPHub) Recycle(dest string, conn net.Conn) {
  98. if this.accepting {
  99. go this.connCallback(&Connection{
  100. dest: dest,
  101. conn: conn,
  102. listener: this,
  103. })
  104. }
  105. }