dialer.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. package httpupgrade
  2. import (
  3. "bufio"
  4. "context"
  5. "encoding/base64"
  6. "io"
  7. "net/http"
  8. "strings"
  9. "github.com/v2fly/v2ray-core/v5/common"
  10. "github.com/v2fly/v2ray-core/v5/common/net"
  11. "github.com/v2fly/v2ray-core/v5/common/session"
  12. "github.com/v2fly/v2ray-core/v5/transport/internet"
  13. "github.com/v2fly/v2ray-core/v5/transport/internet/transportcommon"
  14. )
  15. func dialhttpUpgrade(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (net.Conn, error) {
  16. transportConfiguration := streamSettings.ProtocolSettings.(*Config)
  17. dialer := func(earlyData []byte) (net.Conn, io.Reader, error) {
  18. conn, err := transportcommon.DialWithSecuritySettings(ctx, dest, streamSettings)
  19. if err != nil {
  20. return nil, nil, newError("failed to dial request to ", dest).Base(err)
  21. }
  22. req, err := http.NewRequest("GET", transportConfiguration.GetNormalizedPath(), nil)
  23. if err != nil {
  24. return nil, nil, err
  25. }
  26. req.Header.Set("Connection", "upgrade")
  27. req.Header.Set("Upgrade", "websocket")
  28. req.Host = transportConfiguration.Host
  29. if transportConfiguration.Header != nil {
  30. for _, value := range transportConfiguration.Header {
  31. req.Header.Set(value.Key, value.Value)
  32. }
  33. }
  34. earlyDataSize := len(earlyData)
  35. if earlyDataSize > int(transportConfiguration.MaxEarlyData) {
  36. earlyDataSize = int(transportConfiguration.MaxEarlyData)
  37. }
  38. if earlyData != nil && len(earlyData) > 0 {
  39. if transportConfiguration.EarlyDataHeaderName == "" {
  40. return nil, nil, newError("EarlyDataHeaderName is not set")
  41. }
  42. req.Header.Set(transportConfiguration.EarlyDataHeaderName, base64.URLEncoding.EncodeToString(earlyData))
  43. }
  44. err = req.Write(conn)
  45. if err != nil {
  46. return nil, nil, err
  47. }
  48. if earlyData != nil && len(earlyData[earlyDataSize:]) > 0 {
  49. _, err = conn.Write(earlyData[earlyDataSize:])
  50. if err != nil {
  51. return nil, nil, newError("failed to finish write early data").Base(err)
  52. }
  53. }
  54. bufferedConn := bufio.NewReader(conn)
  55. resp, err := http.ReadResponse(bufferedConn, req) // nolint:bodyclose
  56. if err != nil {
  57. return nil, nil, err
  58. }
  59. if resp.Status == "101 Switching Protocols" &&
  60. strings.ToLower(resp.Header.Get("Upgrade")) == "websocket" &&
  61. strings.ToLower(resp.Header.Get("Connection")) == "upgrade" {
  62. earlyReplyReader := io.LimitReader(bufferedConn, int64(bufferedConn.Buffered()))
  63. return conn, earlyReplyReader, nil
  64. }
  65. return nil, nil, newError("unrecognized reply")
  66. }
  67. if transportConfiguration.MaxEarlyData == 0 {
  68. conn, earlyReplyReader, err := dialer(nil)
  69. if err != nil {
  70. return nil, err
  71. }
  72. remoteAddr := conn.RemoteAddr()
  73. return newConnectionWithPendingRead(conn, remoteAddr, earlyReplyReader), nil
  74. }
  75. return newConnectionWithDelayedDial(dialer), nil
  76. }
  77. func dial(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (internet.Connection, error) {
  78. newError("creating connection to ", dest).WriteToLog(session.ExportIDToError(ctx))
  79. conn, err := dialhttpUpgrade(ctx, dest, streamSettings)
  80. if err != nil {
  81. return nil, newError("failed to dial request to ", dest).Base(err)
  82. }
  83. return internet.Connection(conn), nil
  84. }
  85. func init() {
  86. common.Must(internet.RegisterTransportDialer(protocolName, dial))
  87. }