wsconn.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. package websocket
  2. import (
  3. "bufio"
  4. "io"
  5. "net"
  6. "sync"
  7. "time"
  8. "github.com/gorilla/websocket"
  9. "v2ray.com/core/common/errors"
  10. "v2ray.com/core/common/log"
  11. )
  12. type wsconn struct {
  13. wsc *websocket.Conn
  14. readBuffer *bufio.Reader
  15. connClosing bool
  16. reusable bool
  17. rlock *sync.Mutex
  18. wlock *sync.Mutex
  19. config *Config
  20. }
  21. func (ws *wsconn) Read(b []byte) (n int, err error) {
  22. ws.rlock.Lock()
  23. n, err = ws.read(b)
  24. ws.rlock.Unlock()
  25. return n, err
  26. }
  27. func (ws *wsconn) read(b []byte) (n int, err error) {
  28. if ws.connClosing {
  29. return 0, io.EOF
  30. }
  31. n, err = ws.readNext(b)
  32. return n, err
  33. }
  34. func (ws *wsconn) getNewReadBuffer() error {
  35. _, r, err := ws.wsc.NextReader()
  36. if err != nil {
  37. log.Warning("WS transport: ws connection NewFrameReader return ", err)
  38. ws.connClosing = true
  39. ws.Close()
  40. return err
  41. }
  42. ws.readBuffer = bufio.NewReader(r)
  43. return nil
  44. }
  45. func (ws *wsconn) readNext(b []byte) (n int, err error) {
  46. if ws.readBuffer == nil {
  47. err = ws.getNewReadBuffer()
  48. if err != nil {
  49. return 0, err
  50. }
  51. }
  52. n, err = ws.readBuffer.Read(b)
  53. if err == nil {
  54. return n, err
  55. }
  56. if errors.Cause(err) == io.EOF {
  57. ws.readBuffer = nil
  58. if n == 0 {
  59. return ws.readNext(b)
  60. }
  61. return n, nil
  62. }
  63. return n, err
  64. }
  65. func (ws *wsconn) Write(b []byte) (n int, err error) {
  66. ws.wlock.Lock()
  67. if ws.connClosing {
  68. return 0, io.ErrClosedPipe
  69. }
  70. n, err = ws.write(b)
  71. ws.wlock.Unlock()
  72. return n, err
  73. }
  74. func (ws *wsconn) write(b []byte) (n int, err error) {
  75. wr, err := ws.wsc.NextWriter(websocket.BinaryMessage)
  76. if err != nil {
  77. log.Warning("WS transport: ws connection NewFrameReader return ", err)
  78. ws.connClosing = true
  79. ws.Close()
  80. return 0, err
  81. }
  82. n, err = wr.Write(b)
  83. if err != nil {
  84. return 0, err
  85. }
  86. err = wr.Close()
  87. if err != nil {
  88. return 0, err
  89. }
  90. return n, err
  91. }
  92. func (ws *wsconn) Close() error {
  93. ws.connClosing = true
  94. ws.wlock.Lock()
  95. ws.wsc.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add((time.Second * 5)))
  96. ws.wlock.Unlock()
  97. err := ws.wsc.Close()
  98. return err
  99. }
  100. func (ws *wsconn) LocalAddr() net.Addr {
  101. return ws.wsc.LocalAddr()
  102. }
  103. func (ws *wsconn) RemoteAddr() net.Addr {
  104. return ws.wsc.RemoteAddr()
  105. }
  106. func (ws *wsconn) SetDeadline(t time.Time) error {
  107. if err := ws.SetReadDeadline(t); err != nil {
  108. return err
  109. }
  110. return ws.SetWriteDeadline(t)
  111. }
  112. func (ws *wsconn) SetReadDeadline(t time.Time) error {
  113. return ws.wsc.SetReadDeadline(t)
  114. }
  115. func (ws *wsconn) SetWriteDeadline(t time.Time) error {
  116. return ws.wsc.SetWriteDeadline(t)
  117. }
  118. func (ws *wsconn) setup() {
  119. ws.connClosing = false
  120. /*
  121. https://godoc.org/github.com/gorilla/websocket#Conn.NextReader
  122. https://godoc.org/github.com/gorilla/websocket#Conn.NextWriter
  123. Both Read and write access are both exclusive.
  124. And in both case it will need a lock.
  125. */
  126. ws.rlock = &sync.Mutex{}
  127. ws.wlock = &sync.Mutex{}
  128. ws.pingPong()
  129. }
  130. func (ws *wsconn) Reusable() bool {
  131. return ws.config.IsConnectionReuse() && ws.reusable && !ws.connClosing
  132. }
  133. func (ws *wsconn) SetReusable(reusable bool) {
  134. ws.reusable = reusable
  135. }
  136. func (ws *wsconn) pingPong() {
  137. pongRcv := make(chan int, 0)
  138. ws.wsc.SetPongHandler(func(data string) error {
  139. pongRcv <- 0
  140. return nil
  141. })
  142. go func() {
  143. for !ws.connClosing {
  144. ws.wlock.Lock()
  145. ws.wsc.WriteMessage(websocket.PingMessage, nil)
  146. ws.wlock.Unlock()
  147. tick := time.After(time.Second * 3)
  148. select {
  149. case <-pongRcv:
  150. case <-tick:
  151. if !ws.connClosing {
  152. log.Debug("WS:Closing as ping is not responded~" + ws.wsc.UnderlyingConn().LocalAddr().String() + "-" + ws.wsc.UnderlyingConn().RemoteAddr().String())
  153. }
  154. ws.Close()
  155. }
  156. <-time.After(time.Second * 27)
  157. }
  158. return
  159. }()
  160. }