wsconn.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. package ws
  2. import (
  3. "bufio"
  4. "io"
  5. "net"
  6. "sync"
  7. "time"
  8. "github.com/v2ray/v2ray-core/common/log"
  9. "github.com/gorilla/websocket"
  10. )
  11. type wsconn struct {
  12. wsc *websocket.Conn
  13. readBuffer *bufio.Reader
  14. connClosing bool
  15. reusable bool
  16. rlock *sync.Mutex
  17. wlock *sync.Mutex
  18. }
  19. func (ws *wsconn) Read(b []byte) (n int, err error) {
  20. ws.rlock.Lock()
  21. n, err = ws.read(b)
  22. ws.rlock.Unlock()
  23. return n, err
  24. }
  25. func (ws *wsconn) read(b []byte) (n int, err error) {
  26. if ws.connClosing {
  27. return 0, io.EOF
  28. }
  29. n, err = ws.readNext(b)
  30. return n, err
  31. }
  32. func (ws *wsconn) getNewReadBuffer() error {
  33. _, r, err := ws.wsc.NextReader()
  34. if err != nil {
  35. log.Warning("WS transport: ws connection NewFrameReader return " + err.Error())
  36. ws.connClosing = true
  37. ws.Close()
  38. return err
  39. }
  40. ws.readBuffer = bufio.NewReader(r)
  41. return nil
  42. }
  43. func (ws *wsconn) readNext(b []byte) (n int, err error) {
  44. if ws.readBuffer == nil {
  45. err = ws.getNewReadBuffer()
  46. if err != nil {
  47. return 0, err
  48. }
  49. }
  50. n, err = ws.readBuffer.Read(b)
  51. if err == nil {
  52. return n, err
  53. }
  54. if err == io.EOF {
  55. ws.readBuffer = nil
  56. if n == 0 {
  57. return ws.readNext(b)
  58. }
  59. return n, nil
  60. }
  61. return n, err
  62. }
  63. func (ws *wsconn) Write(b []byte) (n int, err error) {
  64. ws.wlock.Lock()
  65. if ws.connClosing {
  66. return 0, io.EOF
  67. }
  68. writeWs := func(b []byte) (n int, err error) {
  69. wr, err := ws.wsc.NextWriter(websocket.BinaryMessage)
  70. if err != nil {
  71. log.Warning("WS transport: ws connection NewFrameReader return " + err.Error())
  72. ws.connClosing = true
  73. ws.Close()
  74. return 0, err
  75. }
  76. n, err = wr.Write(b)
  77. if err != nil {
  78. return 0, err
  79. }
  80. err = wr.Close()
  81. if err != nil {
  82. return 0, err
  83. }
  84. return n, err
  85. }
  86. n, err = writeWs(b)
  87. ws.wlock.Unlock()
  88. return n, err
  89. }
  90. func (ws *wsconn) Close() error {
  91. ws.connClosing = true
  92. ws.wlock.Lock()
  93. ws.wsc.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add((time.Second * 5)))
  94. ws.wlock.Unlock()
  95. err := ws.wsc.Close()
  96. return err
  97. }
  98. func (ws *wsconn) LocalAddr() net.Addr {
  99. return ws.wsc.LocalAddr()
  100. }
  101. func (ws *wsconn) RemoteAddr() net.Addr {
  102. return ws.wsc.RemoteAddr()
  103. }
  104. func (ws *wsconn) SetDeadline(t time.Time) error {
  105. return func() error {
  106. errr := ws.SetReadDeadline(t)
  107. errw := ws.SetWriteDeadline(t)
  108. if errr == nil || errw == nil {
  109. return nil
  110. }
  111. if errr != nil {
  112. return errr
  113. }
  114. return errw
  115. }()
  116. }
  117. func (ws *wsconn) SetReadDeadline(t time.Time) error {
  118. return ws.wsc.SetReadDeadline(t)
  119. }
  120. func (ws *wsconn) SetWriteDeadline(t time.Time) error {
  121. return ws.wsc.SetWriteDeadline(t)
  122. }
  123. func (ws *wsconn) setup() {
  124. ws.connClosing = false
  125. /*
  126. https://godoc.org/github.com/gorilla/websocket#Conn.NextReader
  127. https://godoc.org/github.com/gorilla/websocket#Conn.NextWriter
  128. Both Read and write access are exclusive.
  129. And in both case it will need a lock.
  130. */
  131. ws.rlock = &sync.Mutex{}
  132. ws.wlock = &sync.Mutex{}
  133. ws.pingPong()
  134. }
  135. func (ws *wsconn) Reusable() bool {
  136. return ws.reusable && !ws.connClosing
  137. }
  138. func (ws *wsconn) SetReusable(reusable bool) {
  139. if !effectiveConfig.ConnectionReuse {
  140. return
  141. }
  142. ws.reusable = reusable
  143. }
  144. func (ws *wsconn) pingPong() {
  145. pongRcv := make(chan int, 0)
  146. ws.wsc.SetPongHandler(func(data string) error {
  147. pongRcv <- 0
  148. return nil
  149. })
  150. go func() {
  151. for !ws.connClosing {
  152. ws.wlock.Lock()
  153. ws.wsc.WriteMessage(websocket.PingMessage, nil)
  154. ws.wlock.Unlock()
  155. tick := time.NewTicker(time.Second * 30)
  156. select {
  157. case <-pongRcv:
  158. break
  159. case <-tick.C:
  160. if !ws.connClosing {
  161. log.Debug("WS:Closing as ping is not responded~" + ws.wsc.UnderlyingConn().LocalAddr().String() + "-" + ws.wsc.UnderlyingConn().RemoteAddr().String())
  162. }
  163. ws.Close()
  164. }
  165. <-tick.C
  166. tick.Stop()
  167. }
  168. return
  169. }()
  170. }