wsconn.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. package ws
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "net"
  7. "sync"
  8. "time"
  9. "github.com/v2ray/v2ray-core/common/log"
  10. "github.com/gorilla/websocket"
  11. )
  12. type wsconn struct {
  13. wsc *websocket.Conn
  14. readBuffer *bufio.Reader
  15. connClosing bool
  16. reusable bool
  17. retloc *sync.Cond
  18. rlock *sync.Mutex
  19. wlock *sync.Mutex
  20. }
  21. func (ws *wsconn) Read(b []byte) (n int, err error) {
  22. ws.rlock.Lock()
  23. //defer ws.rlock.Unlock()
  24. //ws.checkifRWAfterClosing()
  25. if ws.connClosing {
  26. return 0, io.EOF
  27. }
  28. getNewBuffer := func() error {
  29. _, r, err := ws.wsc.NextReader()
  30. if err != nil {
  31. log.Warning("WS transport: ws connection NewFrameReader return " + err.Error())
  32. ws.connClosing = true
  33. ws.Close()
  34. return err
  35. }
  36. ws.readBuffer = bufio.NewReader(r)
  37. return nil
  38. }
  39. /*It seems golang's support for recursive in anonymous func it yet to complete.
  40. func1:=func(){
  41. func1()
  42. }
  43. won't work, failed to compile for it can't find func1.
  44. Should following work around panic,
  45. readNext could have been called before the actual defination was made,
  46. This is very unlikely.
  47. */
  48. readNext := func(b []byte) (n int, err error) { panic("Runtime unstable. Please report this bug to developer.") }
  49. readNext = func(b []byte) (n int, err error) {
  50. if ws.readBuffer == nil {
  51. err = getNewBuffer()
  52. if err != nil {
  53. //ws.Close()
  54. return 0, err
  55. }
  56. }
  57. n, err = ws.readBuffer.Read(b)
  58. if err == nil {
  59. return n, err
  60. }
  61. if err == io.EOF {
  62. ws.readBuffer = nil
  63. if n == 0 {
  64. return readNext(b)
  65. }
  66. return n, nil
  67. }
  68. //ws.Close()
  69. return n, err
  70. }
  71. n, err = readNext(b)
  72. ws.rlock.Unlock()
  73. return n, err
  74. }
  75. func (ws *wsconn) Write(b []byte) (n int, err error) {
  76. ws.wlock.Lock()
  77. defer func() {
  78. if r := recover(); r != nil {
  79. fmt.Println("WS workaround: recover", r)
  80. ws.wlock.Unlock()
  81. }
  82. }()
  83. //defer
  84. //ws.checkifRWAfterClosing()
  85. if ws.connClosing {
  86. return 0, io.EOF
  87. }
  88. writeWs := func(b []byte) (n int, err error) {
  89. wr, err := ws.wsc.NextWriter(websocket.BinaryMessage)
  90. if err != nil {
  91. log.Warning("WS transport: ws connection NewFrameReader return " + err.Error())
  92. ws.connClosing = true
  93. ws.Close()
  94. return 0, err
  95. }
  96. n, err = wr.Write(b)
  97. if err != nil {
  98. //ws.Close()
  99. return 0, err
  100. }
  101. err = wr.Close()
  102. if err != nil {
  103. //ws.Close()
  104. return 0, err
  105. }
  106. return n, err
  107. }
  108. n, err = writeWs(b)
  109. ws.wlock.Unlock()
  110. return n, err
  111. }
  112. func (ws *wsconn) Close() error {
  113. ws.connClosing = true
  114. ws.wsc.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add((time.Second * 5)))
  115. err := ws.wsc.Close()
  116. ws.retloc.Broadcast()
  117. return err
  118. }
  119. func (ws *wsconn) LocalAddr() net.Addr {
  120. return ws.wsc.LocalAddr()
  121. }
  122. func (ws *wsconn) RemoteAddr() net.Addr {
  123. return ws.wsc.RemoteAddr()
  124. }
  125. func (ws *wsconn) SetDeadline(t time.Time) error {
  126. return func() error {
  127. errr := ws.SetReadDeadline(t)
  128. errw := ws.SetWriteDeadline(t)
  129. if errr == nil || errw == nil {
  130. return nil
  131. }
  132. if errr != nil {
  133. return errr
  134. }
  135. return errw
  136. }()
  137. }
  138. func (ws *wsconn) SetReadDeadline(t time.Time) error {
  139. return ws.wsc.SetReadDeadline(t)
  140. }
  141. func (ws *wsconn) SetWriteDeadline(t time.Time) error {
  142. return ws.wsc.SetWriteDeadline(t)
  143. }
  144. func (ws *wsconn) checkifRWAfterClosing() {
  145. if ws.connClosing {
  146. log.Error("WS transport: Read or Write After Conn have been marked closing, this can be dangerous.")
  147. //panic("WS transport: Read or Write After Conn have been marked closing. Please report this crash to developer.")
  148. }
  149. }
  150. func (ws *wsconn) setup() {
  151. ws.connClosing = false
  152. ws.rlock = &sync.Mutex{}
  153. ws.wlock = &sync.Mutex{}
  154. initConnectedCond := func() {
  155. rsl := &sync.Mutex{}
  156. ws.retloc = sync.NewCond(rsl)
  157. }
  158. initConnectedCond()
  159. ws.pingPong()
  160. }
  161. func (ws *wsconn) Reusable() bool {
  162. return ws.reusable && !ws.connClosing
  163. }
  164. func (ws *wsconn) SetReusable(reusable bool) {
  165. if !effectiveConfig.ConnectionReuse {
  166. return
  167. }
  168. ws.reusable = reusable
  169. }
  170. func (ws *wsconn) pingPong() {
  171. pongRcv := make(chan int, 0)
  172. ws.wsc.SetPongHandler(func(data string) error {
  173. pongRcv <- 0
  174. return nil
  175. })
  176. go func() {
  177. for !ws.connClosing {
  178. ws.wsc.WriteMessage(websocket.PingMessage, nil)
  179. tick := time.NewTicker(time.Second * 3)
  180. select {
  181. case <-pongRcv:
  182. //log.Debug("WS:Pong~" + ws.wsc.UnderlyingConn().RemoteAddr().String())
  183. break
  184. case <-tick.C:
  185. log.Debug("WS:Closing as ping is not responded~" + ws.wsc.UnderlyingConn().LocalAddr().String() + "-" + ws.wsc.UnderlyingConn().RemoteAddr().String())
  186. ws.Close()
  187. }
  188. <-tick.C
  189. tick.Stop()
  190. }
  191. return
  192. }()
  193. }