wsconn.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. package ws
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "net"
  7. "sync"
  8. "time"
  9. "github.com/v2ray/v2ray-core/common/log"
  10. "github.com/gorilla/websocket"
  11. )
  12. type wsconn struct {
  13. wsc *websocket.Conn
  14. readBuffer *bufio.Reader
  15. connClosing bool
  16. reusable bool
  17. rlock *sync.Mutex
  18. wlock *sync.Mutex
  19. }
  20. func (ws *wsconn) Read(b []byte) (n int, err error) {
  21. ws.rlock.Lock()
  22. n, err = ws.read(b)
  23. ws.rlock.Unlock()
  24. return n, err
  25. }
  26. func (ws *wsconn) read(b []byte) (n int, err error) {
  27. if ws.connClosing {
  28. return 0, io.EOF
  29. }
  30. n, err = ws.readNext(b)
  31. return n, err
  32. }
  33. func (ws *wsconn) getNewReadBuffer() error {
  34. _, r, err := ws.wsc.NextReader()
  35. if err != nil {
  36. log.Warning("WS transport: ws connection NewFrameReader return " + err.Error())
  37. ws.connClosing = true
  38. ws.Close()
  39. return err
  40. }
  41. ws.readBuffer = bufio.NewReader(r)
  42. return nil
  43. }
  44. func (ws *wsconn) readNext(b []byte) (n int, err error) {
  45. if ws.readBuffer == nil {
  46. err = ws.getNewReadBuffer()
  47. if err != nil {
  48. return 0, err
  49. }
  50. }
  51. n, err = ws.readBuffer.Read(b)
  52. if err == nil {
  53. return n, err
  54. }
  55. if err == io.EOF {
  56. ws.readBuffer = nil
  57. if n == 0 {
  58. return ws.readNext(b)
  59. }
  60. return n, nil
  61. }
  62. return n, err
  63. }
  64. func (ws *wsconn) Write(b []byte) (n int, err error) {
  65. ws.wlock.Lock()
  66. /*
  67. process can crash as websocket report "concurrent write to websocket connection"
  68. even if lock is persent.
  69. This problem should have been resolved.
  70. */
  71. defer func() {
  72. if r := recover(); r != nil {
  73. fmt.Println("WS workaround: recover", r)
  74. ws.wlock.Unlock()
  75. }
  76. }()
  77. if ws.connClosing {
  78. return 0, io.EOF
  79. }
  80. writeWs := func(b []byte) (n int, err error) {
  81. wr, err := ws.wsc.NextWriter(websocket.BinaryMessage)
  82. if err != nil {
  83. log.Warning("WS transport: ws connection NewFrameReader return " + err.Error())
  84. ws.connClosing = true
  85. ws.Close()
  86. return 0, err
  87. }
  88. n, err = wr.Write(b)
  89. if err != nil {
  90. return 0, err
  91. }
  92. err = wr.Close()
  93. if err != nil {
  94. return 0, err
  95. }
  96. return n, err
  97. }
  98. n, err = writeWs(b)
  99. ws.wlock.Unlock()
  100. return n, err
  101. }
  102. func (ws *wsconn) Close() error {
  103. ws.connClosing = true
  104. ws.wlock.Lock()
  105. ws.wsc.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add((time.Second * 5)))
  106. ws.wlock.Unlock()
  107. err := ws.wsc.Close()
  108. return err
  109. }
  110. func (ws *wsconn) LocalAddr() net.Addr {
  111. return ws.wsc.LocalAddr()
  112. }
  113. func (ws *wsconn) RemoteAddr() net.Addr {
  114. return ws.wsc.RemoteAddr()
  115. }
  116. func (ws *wsconn) SetDeadline(t time.Time) error {
  117. return func() error {
  118. errr := ws.SetReadDeadline(t)
  119. errw := ws.SetWriteDeadline(t)
  120. if errr == nil || errw == nil {
  121. return nil
  122. }
  123. if errr != nil {
  124. return errr
  125. }
  126. return errw
  127. }()
  128. }
  129. func (ws *wsconn) SetReadDeadline(t time.Time) error {
  130. return ws.wsc.SetReadDeadline(t)
  131. }
  132. func (ws *wsconn) SetWriteDeadline(t time.Time) error {
  133. return ws.wsc.SetWriteDeadline(t)
  134. }
  135. func (ws *wsconn) setup() {
  136. ws.connClosing = false
  137. /*
  138. https://godoc.org/github.com/gorilla/websocket#Conn.NextReader
  139. https://godoc.org/github.com/gorilla/websocket#Conn.NextWriter
  140. Both Read and write access are exclusive.
  141. And in both case it will need a lock.
  142. */
  143. ws.rlock = &sync.Mutex{}
  144. ws.wlock = &sync.Mutex{}
  145. ws.pingPong()
  146. }
  147. func (ws *wsconn) Reusable() bool {
  148. return ws.reusable && !ws.connClosing
  149. }
  150. func (ws *wsconn) SetReusable(reusable bool) {
  151. if !effectiveConfig.ConnectionReuse {
  152. return
  153. }
  154. ws.reusable = reusable
  155. }
  156. func (ws *wsconn) pingPong() {
  157. pongRcv := make(chan int, 0)
  158. ws.wsc.SetPongHandler(func(data string) error {
  159. pongRcv <- 0
  160. return nil
  161. })
  162. go func() {
  163. for !ws.connClosing {
  164. ws.wlock.Lock()
  165. ws.wsc.WriteMessage(websocket.PingMessage, nil)
  166. ws.wlock.Unlock()
  167. tick := time.NewTicker(time.Second * 30)
  168. select {
  169. case <-pongRcv:
  170. break
  171. case <-tick.C:
  172. if !ws.connClosing {
  173. log.Debug("WS:Closing as ping is not responded~" + ws.wsc.UnderlyingConn().LocalAddr().String() + "-" + ws.wsc.UnderlyingConn().RemoteAddr().String())
  174. }
  175. ws.Close()
  176. }
  177. <-tick.C
  178. tick.Stop()
  179. }
  180. return
  181. }()
  182. }