connection.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. package httpupgrade
  2. import (
  3. "context"
  4. "github.com/v2fly/v2ray-core/v5/common/buf"
  5. "github.com/v2fly/v2ray-core/v5/common/net"
  6. "github.com/v2fly/v2ray-core/v5/common/serial"
  7. "io"
  8. "time"
  9. )
  10. type connection struct {
  11. conn net.Conn
  12. reader io.Reader
  13. remoteAddr net.Addr
  14. shouldWait bool
  15. delayedDialFinish context.Context
  16. finishedDial context.CancelFunc
  17. dialer delayedDialer
  18. }
  // delayedDialer establishes the underlying connection on demand. It receives
  // the bytes of the triggering Write as earlyData and returns the new
  // connection, an optional reader that Read consumes before the connection
  // itself, and any dial error.
  type delayedDialer func(
  	earlyData []byte,
  ) (conn net.Conn, earlyReply io.Reader, err error)
  20. func newConnectionWithPendingRead(conn net.Conn, remoteAddr net.Addr, earlyReplyReader io.Reader) *connection {
  21. return &connection{
  22. conn: conn,
  23. remoteAddr: remoteAddr,
  24. reader: earlyReplyReader,
  25. }
  26. }
  27. func newConnectionWithDelayedDial(dialer delayedDialer) *connection {
  28. ctx, cancel := context.WithCancel(context.Background())
  29. return &connection{
  30. shouldWait: true,
  31. delayedDialFinish: ctx,
  32. finishedDial: cancel,
  33. dialer: dialer,
  34. }
  35. }
  36. // Read implements net.Conn.Read()
  37. func (c *connection) Read(b []byte) (int, error) {
  38. if c.shouldWait {
  39. <-c.delayedDialFinish.Done()
  40. if c.conn == nil {
  41. return 0, newError("unable to read delayed dial websocket connection as it do not exist")
  42. }
  43. }
  44. if c.reader != nil {
  45. n, err := c.reader.Read(b)
  46. if err == io.EOF {
  47. c.reader = nil
  48. return c.conn.Read(b)
  49. }
  50. return n, err
  51. }
  52. return c.conn.Read(b)
  53. }
  54. // Write implements io.Writer.
  55. func (c *connection) Write(b []byte) (int, error) {
  56. if c.shouldWait {
  57. var err error
  58. var earlyReply io.Reader
  59. c.conn, earlyReply, err = c.dialer(b)
  60. if earlyReply != nil {
  61. c.reader = earlyReply
  62. }
  63. c.finishedDial()
  64. if err != nil {
  65. return 0, newError("Unable to proceed with delayed write").Base(err)
  66. }
  67. c.remoteAddr = c.conn.RemoteAddr()
  68. c.shouldWait = false
  69. return len(b), nil
  70. }
  71. return c.conn.Write(b)
  72. }
  73. func (c *connection) WriteMultiBuffer(mb buf.MultiBuffer) error {
  74. mb = buf.Compact(mb)
  75. mb, err := buf.WriteMultiBuffer(c, mb)
  76. buf.ReleaseMulti(mb)
  77. return err
  78. }
  79. func (c *connection) Close() error {
  80. if c.shouldWait {
  81. <-c.delayedDialFinish.Done()
  82. if c.conn == nil {
  83. return newError("unable to close delayed dial websocket connection as it do not exist")
  84. }
  85. }
  86. var closeErrors []interface{}
  87. if err := c.conn.Close(); err != nil {
  88. closeErrors = append(closeErrors, err)
  89. }
  90. if len(closeErrors) > 0 {
  91. return newError("failed to close connection").Base(newError(serial.Concat(closeErrors...)))
  92. }
  93. return nil
  94. }
  95. func (c *connection) LocalAddr() net.Addr {
  96. if c.shouldWait {
  97. <-c.delayedDialFinish.Done()
  98. if c.conn == nil {
  99. newError("websocket transport is not materialized when LocalAddr() is called").AtWarning().WriteToLog()
  100. return &net.UnixAddr{
  101. Name: "@placeholder",
  102. Net: "unix",
  103. }
  104. }
  105. }
  106. return c.conn.LocalAddr()
  107. }
  108. func (c *connection) RemoteAddr() net.Addr {
  109. return c.remoteAddr
  110. }
  111. func (c *connection) SetDeadline(t time.Time) error {
  112. if err := c.SetReadDeadline(t); err != nil {
  113. return err
  114. }
  115. return c.SetWriteDeadline(t)
  116. }
  117. func (c *connection) SetReadDeadline(t time.Time) error {
  118. if c.shouldWait {
  119. <-c.delayedDialFinish.Done()
  120. if c.conn == nil {
  121. newError("httpupgrade transport is not materialized when SetReadDeadline() is called").AtWarning().WriteToLog()
  122. return nil
  123. }
  124. }
  125. return c.conn.SetReadDeadline(t)
  126. }
  127. func (c *connection) SetWriteDeadline(t time.Time) error {
  128. if c.shouldWait {
  129. <-c.delayedDialFinish.Done()
  130. if c.conn == nil {
  131. newError("httpupgrade transport is not materialized when SetWriteDeadline() is called").AtWarning().WriteToLog()
  132. return nil
  133. }
  134. }
  135. return c.conn.SetWriteDeadline(t)
  136. }