wsconn.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. package ws
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "net"
  7. "sync"
  8. "time"
  9. "github.com/v2ray/v2ray-core/common/log"
  10. "github.com/gorilla/websocket"
  11. )
  12. type wsconn struct {
  13. wsc *websocket.Conn
  14. readBuffer *bufio.Reader
  15. connClosing bool
  16. reusable bool
  17. rlock *sync.Mutex
  18. wlock *sync.Mutex
  19. }
  20. func (ws *wsconn) Read(b []byte) (n int, err error) {
  21. ws.rlock.Lock()
  22. //defer ws.rlock.Unlock()
  23. //ws.checkifRWAfterClosing()
  24. if ws.connClosing {
  25. return 0, io.EOF
  26. }
  27. getNewBuffer := func() error {
  28. _, r, err := ws.wsc.NextReader()
  29. if err != nil {
  30. log.Warning("WS transport: ws connection NewFrameReader return " + err.Error())
  31. ws.connClosing = true
  32. ws.Close()
  33. return err
  34. }
  35. ws.readBuffer = bufio.NewReader(r)
  36. return nil
  37. }
  38. /*It seems golang's support for recursive in anonymous func it yet to complete.
  39. func1:=func(){
  40. func1()
  41. }
  42. won't work, failed to compile for it can't find func1.
  43. Should following work around panic,
  44. readNext could have been called before the actual defination was made,
  45. This is very unlikely.
  46. */
  47. readNext := func(b []byte) (n int, err error) { panic("Runtime unstable. Please report this bug to developer.") }
  48. readNext = func(b []byte) (n int, err error) {
  49. if ws.readBuffer == nil {
  50. err = getNewBuffer()
  51. if err != nil {
  52. return 0, err
  53. }
  54. }
  55. n, err = ws.readBuffer.Read(b)
  56. if err == nil {
  57. return n, err
  58. }
  59. if err == io.EOF {
  60. ws.readBuffer = nil
  61. if n == 0 {
  62. return readNext(b)
  63. }
  64. return n, nil
  65. }
  66. return n, err
  67. }
  68. n, err = readNext(b)
  69. ws.rlock.Unlock()
  70. return n, err
  71. }
  72. func (ws *wsconn) Write(b []byte) (n int, err error) {
  73. ws.wlock.Lock()
  74. /*
  75. process can crash as websocket report "concurrent write to websocket connection"
  76. even if lock is persent.
  77. It is yet to know how to prevent this but a workaround is the only choice.
  78. */
  79. defer func() {
  80. if r := recover(); r != nil {
  81. fmt.Println("WS workaround: recover", r)
  82. ws.wlock.Unlock()
  83. }
  84. }()
  85. if ws.connClosing {
  86. return 0, io.EOF
  87. }
  88. writeWs := func(b []byte) (n int, err error) {
  89. wr, err := ws.wsc.NextWriter(websocket.BinaryMessage)
  90. if err != nil {
  91. log.Warning("WS transport: ws connection NewFrameReader return " + err.Error())
  92. ws.connClosing = true
  93. ws.Close()
  94. return 0, err
  95. }
  96. n, err = wr.Write(b)
  97. if err != nil {
  98. return 0, err
  99. }
  100. err = wr.Close()
  101. if err != nil {
  102. return 0, err
  103. }
  104. return n, err
  105. }
  106. n, err = writeWs(b)
  107. ws.wlock.Unlock()
  108. return n, err
  109. }
  110. func (ws *wsconn) Close() error {
  111. ws.connClosing = true
  112. ws.wlock.Lock()
  113. ws.wsc.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add((time.Second * 5)))
  114. ws.wlock.Unlock()
  115. err := ws.wsc.Close()
  116. return err
  117. }
  118. func (ws *wsconn) LocalAddr() net.Addr {
  119. return ws.wsc.LocalAddr()
  120. }
  121. func (ws *wsconn) RemoteAddr() net.Addr {
  122. return ws.wsc.RemoteAddr()
  123. }
  124. func (ws *wsconn) SetDeadline(t time.Time) error {
  125. return func() error {
  126. errr := ws.SetReadDeadline(t)
  127. errw := ws.SetWriteDeadline(t)
  128. if errr == nil || errw == nil {
  129. return nil
  130. }
  131. if errr != nil {
  132. return errr
  133. }
  134. return errw
  135. }()
  136. }
  137. func (ws *wsconn) SetReadDeadline(t time.Time) error {
  138. return ws.wsc.SetReadDeadline(t)
  139. }
  140. func (ws *wsconn) SetWriteDeadline(t time.Time) error {
  141. return ws.wsc.SetWriteDeadline(t)
  142. }
  143. func (ws *wsconn) setup() {
  144. ws.connClosing = false
  145. ws.rlock = &sync.Mutex{}
  146. ws.wlock = &sync.Mutex{}
  147. ws.pingPong()
  148. }
  149. func (ws *wsconn) Reusable() bool {
  150. return ws.reusable && !ws.connClosing
  151. }
  152. func (ws *wsconn) SetReusable(reusable bool) {
  153. if !effectiveConfig.ConnectionReuse {
  154. return
  155. }
  156. ws.reusable = reusable
  157. }
  158. func (ws *wsconn) pingPong() {
  159. pongRcv := make(chan int, 0)
  160. ws.wsc.SetPongHandler(func(data string) error {
  161. pongRcv <- 0
  162. return nil
  163. })
  164. go func() {
  165. for !ws.connClosing {
  166. ws.wlock.Lock()
  167. ws.wsc.WriteMessage(websocket.PingMessage, nil)
  168. ws.wlock.Unlock()
  169. tick := time.NewTicker(time.Second * 3)
  170. select {
  171. case <-pongRcv:
  172. break
  173. case <-tick.C:
  174. log.Debug("WS:Closing as ping is not responded~" + ws.wsc.UnderlyingConn().LocalAddr().String() + "-" + ws.wsc.UnderlyingConn().RemoteAddr().String())
  175. ws.Close()
  176. }
  177. <-tick.C
  178. tick.Stop()
  179. }
  180. return
  181. }()
  182. }