wsconn.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. package ws
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "net"
  7. "sync"
  8. "time"
  9. "github.com/v2ray/v2ray-core/common/log"
  10. "github.com/gorilla/websocket"
  11. )
  12. type wsconn struct {
  13. wsc *websocket.Conn
  14. readBuffer *bufio.Reader
  15. connClosing bool
  16. reusable bool
  17. rlock *sync.Mutex
  18. wlock *sync.Mutex
  19. }
  20. func (ws *wsconn) Read(b []byte) (n int, err error) {
  21. ws.rlock.Lock()
  22. if ws.connClosing {
  23. return 0, io.EOF
  24. }
  25. getNewBuffer := func() error {
  26. _, r, err := ws.wsc.NextReader()
  27. if err != nil {
  28. log.Warning("WS transport: ws connection NewFrameReader return " + err.Error())
  29. ws.connClosing = true
  30. ws.Close()
  31. return err
  32. }
  33. ws.readBuffer = bufio.NewReader(r)
  34. return nil
  35. }
  36. /*It seems golang's support for recursive in anonymous func is yet to complete.
  37. func1:=func(){
  38. func1()
  39. }
  40. won't work, failed to compile for it can't find func1.
  41. Should following workaround panic,
  42. readNext could have been called before the actual defination was made,
  43. This is very unlikely.
  44. */
  45. readNext := func(b []byte) (n int, err error) { panic("Runtime unstable. Please report this bug to developer.") }
  46. readNext = func(b []byte) (n int, err error) {
  47. if ws.readBuffer == nil {
  48. err = getNewBuffer()
  49. if err != nil {
  50. return 0, err
  51. }
  52. }
  53. n, err = ws.readBuffer.Read(b)
  54. if err == nil {
  55. return n, err
  56. }
  57. if err == io.EOF {
  58. ws.readBuffer = nil
  59. if n == 0 {
  60. return readNext(b)
  61. }
  62. return n, nil
  63. }
  64. return n, err
  65. }
  66. n, err = readNext(b)
  67. ws.rlock.Unlock()
  68. return n, err
  69. }
  70. func (ws *wsconn) Write(b []byte) (n int, err error) {
  71. ws.wlock.Lock()
  72. /*
  73. process can crash as websocket report "concurrent write to websocket connection"
  74. even if lock is persent.
  75. This problem should have been resolved.
  76. */
  77. defer func() {
  78. if r := recover(); r != nil {
  79. fmt.Println("WS workaround: recover", r)
  80. ws.wlock.Unlock()
  81. }
  82. }()
  83. if ws.connClosing {
  84. return 0, io.EOF
  85. }
  86. writeWs := func(b []byte) (n int, err error) {
  87. wr, err := ws.wsc.NextWriter(websocket.BinaryMessage)
  88. if err != nil {
  89. log.Warning("WS transport: ws connection NewFrameReader return " + err.Error())
  90. ws.connClosing = true
  91. ws.Close()
  92. return 0, err
  93. }
  94. n, err = wr.Write(b)
  95. if err != nil {
  96. return 0, err
  97. }
  98. err = wr.Close()
  99. if err != nil {
  100. return 0, err
  101. }
  102. return n, err
  103. }
  104. n, err = writeWs(b)
  105. ws.wlock.Unlock()
  106. return n, err
  107. }
  108. func (ws *wsconn) Close() error {
  109. ws.connClosing = true
  110. ws.wlock.Lock()
  111. ws.wsc.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add((time.Second * 5)))
  112. ws.wlock.Unlock()
  113. err := ws.wsc.Close()
  114. return err
  115. }
  116. func (ws *wsconn) LocalAddr() net.Addr {
  117. return ws.wsc.LocalAddr()
  118. }
  119. func (ws *wsconn) RemoteAddr() net.Addr {
  120. return ws.wsc.RemoteAddr()
  121. }
  122. func (ws *wsconn) SetDeadline(t time.Time) error {
  123. return func() error {
  124. errr := ws.SetReadDeadline(t)
  125. errw := ws.SetWriteDeadline(t)
  126. if errr == nil || errw == nil {
  127. return nil
  128. }
  129. if errr != nil {
  130. return errr
  131. }
  132. return errw
  133. }()
  134. }
  135. func (ws *wsconn) SetReadDeadline(t time.Time) error {
  136. return ws.wsc.SetReadDeadline(t)
  137. }
  138. func (ws *wsconn) SetWriteDeadline(t time.Time) error {
  139. return ws.wsc.SetWriteDeadline(t)
  140. }
  141. func (ws *wsconn) setup() {
  142. ws.connClosing = false
  143. ws.rlock = &sync.Mutex{}
  144. ws.wlock = &sync.Mutex{}
  145. ws.pingPong()
  146. }
  147. func (ws *wsconn) Reusable() bool {
  148. return ws.reusable && !ws.connClosing
  149. }
  150. func (ws *wsconn) SetReusable(reusable bool) {
  151. if !effectiveConfig.ConnectionReuse {
  152. return
  153. }
  154. ws.reusable = reusable
  155. }
  156. func (ws *wsconn) pingPong() {
  157. pongRcv := make(chan int, 0)
  158. ws.wsc.SetPongHandler(func(data string) error {
  159. pongRcv <- 0
  160. return nil
  161. })
  162. go func() {
  163. for !ws.connClosing {
  164. ws.wlock.Lock()
  165. ws.wsc.WriteMessage(websocket.PingMessage, nil)
  166. ws.wlock.Unlock()
  167. tick := time.NewTicker(time.Second * 30)
  168. select {
  169. case <-pongRcv:
  170. break
  171. case <-tick.C:
  172. if !ws.connClosing {
  173. log.Debug("WS:Closing as ping is not responded~" + ws.wsc.UnderlyingConn().LocalAddr().String() + "-" + ws.wsc.UnderlyingConn().RemoteAddr().String())
  174. }
  175. ws.Close()
  176. }
  177. <-tick.C
  178. tick.Stop()
  179. }
  180. return
  181. }()
  182. }