wsconn.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. package ws
  2. import (
  3. "bufio"
  4. "io"
  5. "net"
  6. "sync"
  7. "time"
  8. "github.com/gorilla/websocket"
  9. "v2ray.com/core/common/errors"
  10. "v2ray.com/core/common/log"
  11. )
  12. type wsconn struct {
  13. wsc *websocket.Conn
  14. readBuffer *bufio.Reader
  15. connClosing bool
  16. reusable bool
  17. rlock *sync.Mutex
  18. wlock *sync.Mutex
  19. config *Config
  20. }
  21. func (ws *wsconn) Read(b []byte) (n int, err error) {
  22. ws.rlock.Lock()
  23. n, err = ws.read(b)
  24. ws.rlock.Unlock()
  25. return n, err
  26. }
  27. func (ws *wsconn) read(b []byte) (n int, err error) {
  28. if ws.connClosing {
  29. return 0, io.EOF
  30. }
  31. n, err = ws.readNext(b)
  32. return n, err
  33. }
  34. func (ws *wsconn) getNewReadBuffer() error {
  35. _, r, err := ws.wsc.NextReader()
  36. if err != nil {
  37. log.Warning("WS transport: ws connection NewFrameReader return " + err.Error())
  38. ws.connClosing = true
  39. ws.Close()
  40. return err
  41. }
  42. ws.readBuffer = bufio.NewReader(r)
  43. return nil
  44. }
  45. func (ws *wsconn) readNext(b []byte) (n int, err error) {
  46. if ws.readBuffer == nil {
  47. err = ws.getNewReadBuffer()
  48. if err != nil {
  49. return 0, err
  50. }
  51. }
  52. n, err = ws.readBuffer.Read(b)
  53. if err == nil {
  54. return n, err
  55. }
  56. if errors.Cause(err) == io.EOF {
  57. ws.readBuffer = nil
  58. if n == 0 {
  59. return ws.readNext(b)
  60. }
  61. return n, nil
  62. }
  63. return n, err
  64. }
  65. func (ws *wsconn) Write(b []byte) (n int, err error) {
  66. ws.wlock.Lock()
  67. if ws.connClosing {
  68. return 0, io.ErrClosedPipe
  69. }
  70. n, err = ws.write(b)
  71. ws.wlock.Unlock()
  72. return n, err
  73. }
  74. func (ws *wsconn) write(b []byte) (n int, err error) {
  75. wr, err := ws.wsc.NextWriter(websocket.BinaryMessage)
  76. if err != nil {
  77. log.Warning("WS transport: ws connection NewFrameReader return " + err.Error())
  78. ws.connClosing = true
  79. ws.Close()
  80. return 0, err
  81. }
  82. n, err = wr.Write(b)
  83. if err != nil {
  84. return 0, err
  85. }
  86. err = wr.Close()
  87. if err != nil {
  88. return 0, err
  89. }
  90. return n, err
  91. }
  92. func (ws *wsconn) Close() error {
  93. ws.connClosing = true
  94. ws.wlock.Lock()
  95. ws.wsc.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add((time.Second * 5)))
  96. ws.wlock.Unlock()
  97. err := ws.wsc.Close()
  98. return err
  99. }
  100. func (ws *wsconn) LocalAddr() net.Addr {
  101. return ws.wsc.LocalAddr()
  102. }
  103. func (ws *wsconn) RemoteAddr() net.Addr {
  104. return ws.wsc.RemoteAddr()
  105. }
  106. func (ws *wsconn) SetDeadline(t time.Time) error {
  107. return func() error {
  108. errr := ws.SetReadDeadline(t)
  109. errw := ws.SetWriteDeadline(t)
  110. if errr == nil || errw == nil {
  111. return nil
  112. }
  113. if errr != nil {
  114. return errr
  115. }
  116. return errw
  117. }()
  118. }
  119. func (ws *wsconn) SetReadDeadline(t time.Time) error {
  120. return ws.wsc.SetReadDeadline(t)
  121. }
  122. func (ws *wsconn) SetWriteDeadline(t time.Time) error {
  123. return ws.wsc.SetWriteDeadline(t)
  124. }
  125. func (ws *wsconn) setup() {
  126. ws.connClosing = false
  127. /*
  128. https://godoc.org/github.com/gorilla/websocket#Conn.NextReader
  129. https://godoc.org/github.com/gorilla/websocket#Conn.NextWriter
  130. Both Read and write access are both exclusive.
  131. And in both case it will need a lock.
  132. */
  133. ws.rlock = &sync.Mutex{}
  134. ws.wlock = &sync.Mutex{}
  135. ws.pingPong()
  136. }
  137. func (ws *wsconn) Reusable() bool {
  138. return ws.reusable && !ws.connClosing
  139. }
  140. func (ws *wsconn) SetReusable(reusable bool) {
  141. if !ws.config.ConnectionReuse.IsEnabled() {
  142. return
  143. }
  144. ws.reusable = reusable
  145. }
  146. func (ws *wsconn) pingPong() {
  147. pongRcv := make(chan int, 0)
  148. ws.wsc.SetPongHandler(func(data string) error {
  149. pongRcv <- 0
  150. return nil
  151. })
  152. go func() {
  153. for !ws.connClosing {
  154. ws.wlock.Lock()
  155. ws.wsc.WriteMessage(websocket.PingMessage, nil)
  156. ws.wlock.Unlock()
  157. tick := time.After(time.Second * 3)
  158. select {
  159. case <-pongRcv:
  160. break
  161. case <-tick:
  162. if !ws.connClosing {
  163. log.Debug("WS:Closing as ping is not responded~" + ws.wsc.UnderlyingConn().LocalAddr().String() + "-" + ws.wsc.UnderlyingConn().RemoteAddr().String())
  164. }
  165. ws.Close()
  166. }
  167. <-time.After(time.Second * 27)
  168. }
  169. return
  170. }()
  171. }