connection.go 4.8 KB


  1. package websocket
  2. import (
  3. "context"
  4. "io"
  5. "net"
  6. "time"
  7. "github.com/gorilla/websocket"
  8. "github.com/v2fly/v2ray-core/v4/common/buf"
  9. "github.com/v2fly/v2ray-core/v4/common/errors"
  10. "github.com/v2fly/v2ray-core/v4/common/serial"
  11. )
  12. var _ buf.Writer = (*connection)(nil)
  13. // connection is a wrapper for net.Conn over WebSocket connection.
  14. type connection struct {
  15. conn *websocket.Conn
  16. reader io.Reader
  17. remoteAddr net.Addr
  18. shouldWait bool
  19. delayedDialFinish context.Context
  20. finishedDial context.CancelFunc
  21. dialer DelayedDialer
  22. }
  23. type DelayedDialer interface {
  24. Dial(earlyData []byte) (*websocket.Conn, error)
  25. }
  26. func newConnection(conn *websocket.Conn, remoteAddr net.Addr) *connection {
  27. return &connection{
  28. conn: conn,
  29. remoteAddr: remoteAddr,
  30. }
  31. }
  32. func newConnectionWithEarlyData(conn *websocket.Conn, remoteAddr net.Addr, earlyData io.Reader) *connection {
  33. return &connection{
  34. conn: conn,
  35. remoteAddr: remoteAddr,
  36. reader: earlyData,
  37. }
  38. }
  39. func newConnectionWithDelayedDial(dialer DelayedDialer) *connection {
  40. delayedDialContext, CancellFunc := context.WithCancel(context.Background())
  41. return &connection{
  42. shouldWait: true,
  43. delayedDialFinish: delayedDialContext,
  44. finishedDial: CancellFunc,
  45. dialer: dialer,
  46. }
  47. }
  48. func newRelayedConnectionWithDelayedDial(dialer DelayedDialerForwarded) *connectionForwarder {
  49. delayedDialContext, CancellFunc := context.WithCancel(context.Background())
  50. return &connectionForwarder{
  51. shouldWait: true,
  52. delayedDialFinish: delayedDialContext,
  53. finishedDial: CancellFunc,
  54. dialer: dialer,
  55. }
  56. }
  57. func newRelayedConnection(conn io.ReadWriteCloser) *connectionForwarder {
  58. return &connectionForwarder{
  59. ReadWriteCloser: conn,
  60. shouldWait: false,
  61. }
  62. }
  63. // Read implements net.Conn.Read()
  64. func (c *connection) Read(b []byte) (int, error) {
  65. for {
  66. reader, err := c.getReader()
  67. if err != nil {
  68. return 0, err
  69. }
  70. nBytes, err := reader.Read(b)
  71. if errors.Cause(err) == io.EOF {
  72. c.reader = nil
  73. continue
  74. }
  75. return nBytes, err
  76. }
  77. }
  78. func (c *connection) getReader() (io.Reader, error) {
  79. if c.shouldWait {
  80. <-c.delayedDialFinish.Done()
  81. if c.conn == nil {
  82. return nil, newError("unable to read delayed dial websocket connection as it do not exist")
  83. }
  84. }
  85. if c.reader != nil {
  86. return c.reader, nil
  87. }
  88. _, reader, err := c.conn.NextReader()
  89. if err != nil {
  90. return nil, err
  91. }
  92. c.reader = reader
  93. return reader, nil
  94. }
  95. // Write implements io.Writer.
  96. func (c *connection) Write(b []byte) (int, error) {
  97. if c.shouldWait {
  98. var err error
  99. c.conn, err = c.dialer.Dial(b)
  100. c.finishedDial()
  101. if err != nil {
  102. return 0, newError("Unable to proceed with delayed write").Base(err)
  103. }
  104. c.remoteAddr = c.conn.RemoteAddr()
  105. c.shouldWait = false
  106. return len(b), nil
  107. }
  108. if err := c.conn.WriteMessage(websocket.BinaryMessage, b); err != nil {
  109. return 0, err
  110. }
  111. return len(b), nil
  112. }
  113. func (c *connection) WriteMultiBuffer(mb buf.MultiBuffer) error {
  114. mb = buf.Compact(mb)
  115. mb, err := buf.WriteMultiBuffer(c, mb)
  116. buf.ReleaseMulti(mb)
  117. return err
  118. }
  119. func (c *connection) Close() error {
  120. if c.shouldWait {
  121. <-c.delayedDialFinish.Done()
  122. if c.conn == nil {
  123. return newError("unable to close delayed dial websocket connection as it do not exist")
  124. }
  125. }
  126. var errors []interface{}
  127. if err := c.conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add(time.Second*5)); err != nil {
  128. errors = append(errors, err)
  129. }
  130. if err := c.conn.Close(); err != nil {
  131. errors = append(errors, err)
  132. }
  133. if len(errors) > 0 {
  134. return newError("failed to close connection").Base(newError(serial.Concat(errors...)))
  135. }
  136. return nil
  137. }
  138. func (c *connection) LocalAddr() net.Addr {
  139. if c.shouldWait {
  140. <-c.delayedDialFinish.Done()
  141. if c.conn == nil {
  142. newError("websocket transport is not materialized when LocalAddr() is called").AtWarning().WriteToLog()
  143. return &net.UnixAddr{
  144. Name: "@placeholder",
  145. Net: "unix",
  146. }
  147. }
  148. }
  149. return c.conn.LocalAddr()
  150. }
  151. func (c *connection) RemoteAddr() net.Addr {
  152. return c.remoteAddr
  153. }
  154. func (c *connection) SetDeadline(t time.Time) error {
  155. if err := c.SetReadDeadline(t); err != nil {
  156. return err
  157. }
  158. return c.SetWriteDeadline(t)
  159. }
  160. func (c *connection) SetReadDeadline(t time.Time) error {
  161. if c.shouldWait {
  162. <-c.delayedDialFinish.Done()
  163. if c.conn == nil {
  164. newError("websocket transport is not materialized when SetReadDeadline() is called").AtWarning().WriteToLog()
  165. return nil
  166. }
  167. }
  168. return c.conn.SetReadDeadline(t)
  169. }
  170. func (c *connection) SetWriteDeadline(t time.Time) error {
  171. if c.shouldWait {
  172. <-c.delayedDialFinish.Done()
  173. if c.conn == nil {
  174. newError("websocket transport is not materialized when SetWriteDeadline() is called").AtWarning().WriteToLog()
  175. return nil
  176. }
  177. }
  178. return c.conn.SetWriteDeadline(t)
  179. }