wsconn.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. package websocket
  2. import (
  3. "bufio"
  4. "io"
  5. "net"
  6. "sync"
  7. "time"
  8. "github.com/gorilla/websocket"
  9. "v2ray.com/core/app/log"
  10. "v2ray.com/core/common/errors"
  11. )
  12. type wsconn struct {
  13. wsc *websocket.Conn
  14. readBuffer *bufio.Reader
  15. connClosing bool
  16. rlock *sync.Mutex
  17. wlock *sync.Mutex
  18. }
  19. func (ws *wsconn) Read(b []byte) (n int, err error) {
  20. ws.rlock.Lock()
  21. n, err = ws.read(b)
  22. ws.rlock.Unlock()
  23. return n, err
  24. }
  25. func (ws *wsconn) read(b []byte) (n int, err error) {
  26. if ws.connClosing {
  27. return 0, io.EOF
  28. }
  29. n, err = ws.readNext(b)
  30. return n, err
  31. }
  32. func (ws *wsconn) getNewReadBuffer() error {
  33. _, r, err := ws.wsc.NextReader()
  34. if err != nil {
  35. log.Warning("WebSocket|Connection: Failed to get reader.", err)
  36. ws.connClosing = true
  37. ws.Close()
  38. return err
  39. }
  40. ws.readBuffer = bufio.NewReader(r)
  41. return nil
  42. }
  43. func (ws *wsconn) readNext(b []byte) (n int, err error) {
  44. if ws.readBuffer == nil {
  45. err = ws.getNewReadBuffer()
  46. if err != nil {
  47. return 0, err
  48. }
  49. }
  50. n, err = ws.readBuffer.Read(b)
  51. if err == nil {
  52. return n, err
  53. }
  54. if errors.Cause(err) == io.EOF {
  55. ws.readBuffer = nil
  56. if n == 0 {
  57. return ws.readNext(b)
  58. }
  59. return n, nil
  60. }
  61. return n, err
  62. }
  63. func (ws *wsconn) Write(b []byte) (n int, err error) {
  64. ws.wlock.Lock()
  65. if ws.connClosing {
  66. return 0, io.ErrClosedPipe
  67. }
  68. n, err = ws.write(b)
  69. ws.wlock.Unlock()
  70. return n, err
  71. }
  72. func (ws *wsconn) write(b []byte) (n int, err error) {
  73. wr, err := ws.wsc.NextWriter(websocket.BinaryMessage)
  74. if err != nil {
  75. log.Warning("WebSocket|Connection: Failed to get writer.", err)
  76. ws.connClosing = true
  77. ws.Close()
  78. return 0, err
  79. }
  80. n, err = wr.Write(b)
  81. if err != nil {
  82. return 0, err
  83. }
  84. err = wr.Close()
  85. if err != nil {
  86. return 0, err
  87. }
  88. return n, err
  89. }
  90. func (ws *wsconn) Close() error {
  91. ws.connClosing = true
  92. ws.wlock.Lock()
  93. ws.wsc.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add((time.Second * 5)))
  94. ws.wlock.Unlock()
  95. err := ws.wsc.Close()
  96. return err
  97. }
  98. func (ws *wsconn) LocalAddr() net.Addr {
  99. return ws.wsc.LocalAddr()
  100. }
  101. func (ws *wsconn) RemoteAddr() net.Addr {
  102. return ws.wsc.RemoteAddr()
  103. }
  104. func (ws *wsconn) SetDeadline(t time.Time) error {
  105. if err := ws.SetReadDeadline(t); err != nil {
  106. return err
  107. }
  108. return ws.SetWriteDeadline(t)
  109. }
  110. func (ws *wsconn) SetReadDeadline(t time.Time) error {
  111. return ws.wsc.SetReadDeadline(t)
  112. }
  113. func (ws *wsconn) SetWriteDeadline(t time.Time) error {
  114. return ws.wsc.SetWriteDeadline(t)
  115. }
  116. func (ws *wsconn) setup() {
  117. ws.connClosing = false
  118. /*
  119. https://godoc.org/github.com/gorilla/websocket#Conn.NextReader
  120. https://godoc.org/github.com/gorilla/websocket#Conn.NextWriter
  121. Both Read and write access are both exclusive.
  122. And in both case it will need a lock.
  123. */
  124. ws.rlock = &sync.Mutex{}
  125. ws.wlock = &sync.Mutex{}
  126. ws.pingPong()
  127. }
  128. func (ws *wsconn) pingPong() {
  129. pongRcv := make(chan int, 1)
  130. ws.wsc.SetPongHandler(func(data string) error {
  131. pongRcv <- 0
  132. return nil
  133. })
  134. go func() {
  135. for !ws.connClosing {
  136. ws.wlock.Lock()
  137. ws.wsc.WriteMessage(websocket.PingMessage, nil)
  138. ws.wlock.Unlock()
  139. tick := time.After(time.Second * 3)
  140. select {
  141. case <-pongRcv:
  142. case <-tick:
  143. if !ws.connClosing {
  144. log.Debug("WS:Closing as ping is not responded~" + ws.wsc.UnderlyingConn().LocalAddr().String() + "-" + ws.wsc.UnderlyingConn().RemoteAddr().String())
  145. }
  146. ws.Close()
  147. }
  148. <-time.After(time.Second * 27)
  149. }
  150. return
  151. }()
  152. }