hub.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. package ws
  2. import (
  3. "crypto/tls"
  4. "errors"
  5. "net"
  6. "net/http"
  7. "strconv"
  8. "sync"
  9. "time"
  10. "github.com/gorilla/websocket"
  11. "v2ray.com/core/common/log"
  12. v2net "v2ray.com/core/common/net"
  13. "v2ray.com/core/transport/internet"
  14. )
  15. var (
  16. ErrClosedListener = errors.New("Listener is closed.")
  17. )
  18. type ConnectionWithError struct {
  19. conn net.Conn
  20. err error
  21. }
  22. type WSListener struct {
  23. sync.Mutex
  24. acccepting bool
  25. awaitingConns chan *ConnectionWithError
  26. listener *StoppableListener
  27. tlsConfig *tls.Config
  28. }
  29. func ListenWS(address v2net.Address, port v2net.Port, options internet.ListenOptions) (internet.Listener, error) {
  30. l := &WSListener{
  31. acccepting: true,
  32. awaitingConns: make(chan *ConnectionWithError, 32),
  33. }
  34. if options.Stream != nil && options.Stream.Security == internet.StreamSecurityTypeTLS {
  35. l.tlsConfig = options.Stream.TLSSettings.GetTLSConfig()
  36. }
  37. err := l.listenws(address, port)
  38. return l, err
  39. }
  40. func (wsl *WSListener) listenws(address v2net.Address, port v2net.Port) error {
  41. http.HandleFunc("/"+effectiveConfig.Path, func(w http.ResponseWriter, r *http.Request) {
  42. con, err := wsl.converttovws(w, r)
  43. if err != nil {
  44. log.Warning("WebSocket|Listener: Failed to convert connection: ", err)
  45. return
  46. }
  47. select {
  48. case wsl.awaitingConns <- &ConnectionWithError{
  49. conn: con,
  50. }:
  51. default:
  52. if con != nil {
  53. con.Close()
  54. }
  55. }
  56. return
  57. })
  58. errchan := make(chan error)
  59. listenerfunc := func() error {
  60. ol, err := net.Listen("tcp", address.String()+":"+strconv.Itoa(int(port.Value())))
  61. if err != nil {
  62. return err
  63. }
  64. wsl.listener, err = NewStoppableListener(ol)
  65. if err != nil {
  66. return err
  67. }
  68. return http.Serve(wsl.listener, nil)
  69. }
  70. if wsl.tlsConfig != nil {
  71. listenerfunc = func() error {
  72. var err error
  73. wsl.listener, err = getstopableTLSlistener(wsl.tlsConfig, address.String()+":"+strconv.Itoa(int(port.Value())))
  74. if err != nil {
  75. return err
  76. }
  77. return http.Serve(wsl.listener, nil)
  78. }
  79. }
  80. go func() {
  81. err := listenerfunc()
  82. errchan <- err
  83. return
  84. }()
  85. var err error
  86. select {
  87. case err = <-errchan:
  88. case <-time.After(time.Second * 2):
  89. //Should this listen fail after 2 sec, it could gone untracked.
  90. }
  91. if err != nil {
  92. log.Error("WebSocket|Listener: Failed to serve: ", err)
  93. }
  94. return err
  95. }
  96. func (wsl *WSListener) converttovws(w http.ResponseWriter, r *http.Request) (*wsconn, error) {
  97. var upgrader = websocket.Upgrader{
  98. ReadBufferSize: 65536,
  99. WriteBufferSize: 65536,
  100. }
  101. conn, err := upgrader.Upgrade(w, r, nil)
  102. if err != nil {
  103. return nil, err
  104. }
  105. wrapedConn := &wsconn{wsc: conn, connClosing: false}
  106. wrapedConn.setup()
  107. return wrapedConn, nil
  108. }
  109. func (this *WSListener) Accept() (internet.Connection, error) {
  110. for this.acccepting {
  111. select {
  112. case connErr, open := <-this.awaitingConns:
  113. if !open {
  114. return nil, ErrClosedListener
  115. }
  116. if connErr.err != nil {
  117. return nil, connErr.err
  118. }
  119. return NewConnection("", connErr.conn.(*wsconn), this), nil
  120. case <-time.After(time.Second * 2):
  121. }
  122. }
  123. return nil, ErrClosedListener
  124. }
  125. func (this *WSListener) Recycle(dest string, conn *wsconn) {
  126. this.Lock()
  127. defer this.Unlock()
  128. if !this.acccepting {
  129. return
  130. }
  131. select {
  132. case this.awaitingConns <- &ConnectionWithError{conn: conn}:
  133. default:
  134. conn.Close()
  135. }
  136. }
  137. func (this *WSListener) Addr() net.Addr {
  138. return nil
  139. }
  140. func (this *WSListener) Close() error {
  141. this.Lock()
  142. defer this.Unlock()
  143. this.acccepting = false
  144. this.listener.Stop()
  145. close(this.awaitingConns)
  146. for connErr := range this.awaitingConns {
  147. if connErr.conn != nil {
  148. go connErr.conn.Close()
  149. }
  150. }
  151. return nil
  152. }
  153. func init() {
  154. internet.WSListenFunc = ListenWS
  155. }