wsconn.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. package ws
  2. import (
  3. "bufio"
  4. "io"
  5. "net"
  6. "sync"
  7. "time"
  8. "github.com/v2ray/v2ray-core/common/log"
  9. "github.com/gorilla/websocket"
  10. )
  11. type wsconn struct {
  12. wsc *websocket.Conn
  13. readBuffer *bufio.Reader
  14. connClosing bool
  15. reusable bool
  16. retloc *sync.Cond
  17. rlock *sync.Mutex
  18. wlock *sync.Mutex
  19. }
  20. func (ws *wsconn) Read(b []byte) (n int, err error) {
  21. //defer ws.rlock.Unlock()
  22. //ws.checkifRWAfterClosing()
  23. if ws.connClosing {
  24. return 0, io.EOF
  25. }
  26. getNewBuffer := func() error {
  27. _, r, err := ws.wsc.NextReader()
  28. if err != nil {
  29. log.Warning("WS transport: ws connection NewFrameReader return " + err.Error())
  30. ws.connClosing = true
  31. ws.Close()
  32. return err
  33. }
  34. ws.readBuffer = bufio.NewReader(r)
  35. return nil
  36. }
  37. readNext := func(b []byte) (n int, err error) {
  38. if ws.readBuffer == nil {
  39. err = getNewBuffer()
  40. if err != nil {
  41. //ws.Close()
  42. return 0, err
  43. }
  44. }
  45. n, err = ws.readBuffer.Read(b)
  46. if err == nil {
  47. return n, err
  48. }
  49. if err == io.EOF {
  50. ws.readBuffer = nil
  51. if n == 0 {
  52. return ws.Read(b)
  53. }
  54. return n, nil
  55. }
  56. //ws.Close()
  57. return n, err
  58. }
  59. n, err = readNext(b)
  60. return n, err
  61. }
  62. func (ws *wsconn) Write(b []byte) (n int, err error) {
  63. //defer
  64. //ws.checkifRWAfterClosing()
  65. if ws.connClosing {
  66. return 0, io.EOF
  67. }
  68. writeWs := func(b []byte) (n int, err error) {
  69. wr, err := ws.wsc.NextWriter(websocket.BinaryMessage)
  70. if err != nil {
  71. log.Warning("WS transport: ws connection NewFrameReader return " + err.Error())
  72. ws.connClosing = true
  73. ws.Close()
  74. return 0, err
  75. }
  76. n, err = wr.Write(b)
  77. if err != nil {
  78. //ws.Close()
  79. return 0, err
  80. }
  81. err = wr.Close()
  82. if err != nil {
  83. //ws.Close()
  84. return 0, err
  85. }
  86. return n, err
  87. }
  88. n, err = writeWs(b)
  89. return n, err
  90. }
  91. func (ws *wsconn) Close() error {
  92. ws.connClosing = true
  93. err := ws.wsc.Close()
  94. ws.retloc.Broadcast()
  95. return err
  96. }
  97. func (ws *wsconn) LocalAddr() net.Addr {
  98. return ws.wsc.LocalAddr()
  99. }
  100. func (ws *wsconn) RemoteAddr() net.Addr {
  101. return ws.wsc.RemoteAddr()
  102. }
  103. func (ws *wsconn) SetDeadline(t time.Time) error {
  104. return func() error {
  105. errr := ws.SetReadDeadline(t)
  106. errw := ws.SetWriteDeadline(t)
  107. if errr == nil || errw == nil {
  108. return nil
  109. }
  110. if errr != nil {
  111. return errr
  112. }
  113. return errw
  114. }()
  115. }
  116. func (ws *wsconn) SetReadDeadline(t time.Time) error {
  117. return ws.wsc.SetReadDeadline(t)
  118. }
  119. func (ws *wsconn) SetWriteDeadline(t time.Time) error {
  120. return ws.wsc.SetWriteDeadline(t)
  121. }
  122. func (ws *wsconn) checkifRWAfterClosing() {
  123. if ws.connClosing {
  124. log.Error("WS transport: Read or Write After Conn have been marked closing, this can be dangerous.")
  125. //panic("WS transport: Read or Write After Conn have been marked closing. Please report this crash to developer.")
  126. }
  127. }
  128. func (ws *wsconn) setup() {
  129. ws.connClosing = false
  130. ws.rlock = &sync.Mutex{}
  131. ws.wlock = &sync.Mutex{}
  132. initConnectedCond := func() {
  133. rsl := &sync.Mutex{}
  134. ws.retloc = sync.NewCond(rsl)
  135. }
  136. initConnectedCond()
  137. //ws.pingPong()
  138. }
  139. func (ws *wsconn) Reusable() bool {
  140. return ws.reusable && !ws.connClosing
  141. }
  142. func (ws *wsconn) SetReusable(reusable bool) {
  143. if !effectiveConfig.ConnectionReuse {
  144. return
  145. }
  146. ws.reusable = reusable
  147. }
  148. func (ws *wsconn) pingPong() {
  149. pongRcv := make(chan int, 0)
  150. ws.wsc.SetPongHandler(func(data string) error {
  151. pongRcv <- 0
  152. return nil
  153. })
  154. go func() {
  155. for !ws.connClosing {
  156. ws.wsc.WriteMessage(websocket.PingMessage, nil)
  157. tick := time.NewTicker(time.Second * 3)
  158. select {
  159. case <-pongRcv:
  160. break
  161. case <-tick.C:
  162. ws.Close()
  163. }
  164. <-tick.C
  165. tick.Stop()
  166. }
  167. return
  168. }()
  169. }