wsconn.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. package ws
  2. import (
  3. "bufio"
  4. "io"
  5. "net"
  6. "sync"
  7. "time"
  8. "github.com/gorilla/websocket"
  9. "v2ray.com/core/common/log"
  10. )
  11. type wsconn struct {
  12. wsc *websocket.Conn
  13. readBuffer *bufio.Reader
  14. connClosing bool
  15. reusable bool
  16. rlock *sync.Mutex
  17. wlock *sync.Mutex
  18. config *Config
  19. }
  20. func (ws *wsconn) Read(b []byte) (n int, err error) {
  21. ws.rlock.Lock()
  22. n, err = ws.read(b)
  23. ws.rlock.Unlock()
  24. return n, err
  25. }
  26. func (ws *wsconn) read(b []byte) (n int, err error) {
  27. if ws.connClosing {
  28. return 0, io.EOF
  29. }
  30. n, err = ws.readNext(b)
  31. return n, err
  32. }
  33. func (ws *wsconn) getNewReadBuffer() error {
  34. _, r, err := ws.wsc.NextReader()
  35. if err != nil {
  36. log.Warning("WS transport: ws connection NewFrameReader return " + err.Error())
  37. ws.connClosing = true
  38. ws.Close()
  39. return err
  40. }
  41. ws.readBuffer = bufio.NewReader(r)
  42. return nil
  43. }
  44. func (ws *wsconn) readNext(b []byte) (n int, err error) {
  45. if ws.readBuffer == nil {
  46. err = ws.getNewReadBuffer()
  47. if err != nil {
  48. return 0, err
  49. }
  50. }
  51. n, err = ws.readBuffer.Read(b)
  52. if err == nil {
  53. return n, err
  54. }
  55. if err == io.EOF {
  56. ws.readBuffer = nil
  57. if n == 0 {
  58. return ws.readNext(b)
  59. }
  60. return n, nil
  61. }
  62. return n, err
  63. }
  64. func (ws *wsconn) Write(b []byte) (n int, err error) {
  65. ws.wlock.Lock()
  66. if ws.connClosing {
  67. return 0, io.EOF
  68. }
  69. n, err = ws.write(b)
  70. ws.wlock.Unlock()
  71. return n, err
  72. }
  73. func (ws *wsconn) write(b []byte) (n int, err error) {
  74. wr, err := ws.wsc.NextWriter(websocket.BinaryMessage)
  75. if err != nil {
  76. log.Warning("WS transport: ws connection NewFrameReader return " + err.Error())
  77. ws.connClosing = true
  78. ws.Close()
  79. return 0, err
  80. }
  81. n, err = wr.Write(b)
  82. if err != nil {
  83. return 0, err
  84. }
  85. err = wr.Close()
  86. if err != nil {
  87. return 0, err
  88. }
  89. return n, err
  90. }
  91. func (ws *wsconn) Close() error {
  92. ws.connClosing = true
  93. ws.wlock.Lock()
  94. ws.wsc.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add((time.Second * 5)))
  95. ws.wlock.Unlock()
  96. err := ws.wsc.Close()
  97. return err
  98. }
  99. func (ws *wsconn) LocalAddr() net.Addr {
  100. return ws.wsc.LocalAddr()
  101. }
  102. func (ws *wsconn) RemoteAddr() net.Addr {
  103. return ws.wsc.RemoteAddr()
  104. }
  105. func (ws *wsconn) SetDeadline(t time.Time) error {
  106. return func() error {
  107. errr := ws.SetReadDeadline(t)
  108. errw := ws.SetWriteDeadline(t)
  109. if errr == nil || errw == nil {
  110. return nil
  111. }
  112. if errr != nil {
  113. return errr
  114. }
  115. return errw
  116. }()
  117. }
  118. func (ws *wsconn) SetReadDeadline(t time.Time) error {
  119. return ws.wsc.SetReadDeadline(t)
  120. }
  121. func (ws *wsconn) SetWriteDeadline(t time.Time) error {
  122. return ws.wsc.SetWriteDeadline(t)
  123. }
  124. func (ws *wsconn) setup() {
  125. ws.connClosing = false
  126. /*
  127. https://godoc.org/github.com/gorilla/websocket#Conn.NextReader
  128. https://godoc.org/github.com/gorilla/websocket#Conn.NextWriter
  129. Both Read and write access are both exclusive.
  130. And in both case it will need a lock.
  131. */
  132. ws.rlock = &sync.Mutex{}
  133. ws.wlock = &sync.Mutex{}
  134. ws.pingPong()
  135. }
  136. func (ws *wsconn) Reusable() bool {
  137. return ws.reusable && !ws.connClosing
  138. }
  139. func (ws *wsconn) SetReusable(reusable bool) {
  140. if !ws.config.ConnectionReuse {
  141. return
  142. }
  143. ws.reusable = reusable
  144. }
  145. func (ws *wsconn) pingPong() {
  146. pongRcv := make(chan int, 0)
  147. ws.wsc.SetPongHandler(func(data string) error {
  148. pongRcv <- 0
  149. return nil
  150. })
  151. go func() {
  152. for !ws.connClosing {
  153. ws.wlock.Lock()
  154. ws.wsc.WriteMessage(websocket.PingMessage, nil)
  155. ws.wlock.Unlock()
  156. tick := time.After(time.Second * 3)
  157. select {
  158. case <-pongRcv:
  159. break
  160. case <-tick:
  161. if !ws.connClosing {
  162. log.Debug("WS:Closing as ping is not responded~" + ws.wsc.UnderlyingConn().LocalAddr().String() + "-" + ws.wsc.UnderlyingConn().RemoteAddr().String())
  163. }
  164. ws.Close()
  165. }
  166. <-time.After(time.Second * 27)
  167. }
  168. return
  169. }()
  170. }