hub.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. package httpupgrade
  2. import (
  3. "bufio"
  4. "bytes"
  5. "context"
  6. "encoding/base64"
  7. "net/http"
  8. "strings"
  9. "github.com/v2fly/v2ray-core/v5/common"
  10. "github.com/v2fly/v2ray-core/v5/common/net"
  11. "github.com/v2fly/v2ray-core/v5/transport/internet"
  12. "github.com/v2fly/v2ray-core/v5/transport/internet/transportcommon"
  13. )
  14. type server struct {
  15. config *Config
  16. addConn internet.ConnHandler
  17. innnerListener net.Listener
  18. }
  19. func (s *server) Close() error {
  20. return s.innnerListener.Close()
  21. }
  22. func (s *server) Addr() net.Addr {
  23. return nil
  24. }
  25. func (s *server) Handle(conn net.Conn) (internet.Connection, error) {
  26. connReader := bufio.NewReader(conn)
  27. req, err := http.ReadRequest(connReader)
  28. if err != nil {
  29. return nil, err
  30. }
  31. connection := strings.ToLower(req.Header.Get("Connection"))
  32. upgrade := strings.ToLower(req.Header.Get("Upgrade"))
  33. if connection != "upgrade" || upgrade != "websocket" {
  34. _ = conn.Close()
  35. return nil, newError("unrecognized request")
  36. }
  37. resp := &http.Response{
  38. Status: "101 Switching Protocols",
  39. StatusCode: 101,
  40. Proto: "HTTP/1.1",
  41. ProtoMajor: 1,
  42. ProtoMinor: 1,
  43. Header: http.Header{},
  44. }
  45. resp.Header.Set("Connection", "upgrade")
  46. resp.Header.Set("Upgrade", "websocket")
  47. err = resp.Write(conn)
  48. if err != nil {
  49. _ = conn.Close()
  50. return nil, err
  51. }
  52. if s.config.MaxEarlyData != 0 {
  53. if s.config.EarlyDataHeaderName == "" {
  54. return nil, newError("EarlyDataHeaderName is not set")
  55. }
  56. earlyData := req.Header.Get(s.config.EarlyDataHeaderName)
  57. if earlyData != "" {
  58. earlyDataBytes, err := base64.URLEncoding.DecodeString(earlyData)
  59. if err != nil {
  60. return nil, err
  61. }
  62. return newConnectionWithPendingRead(conn, conn.RemoteAddr(), bytes.NewReader(earlyDataBytes)), nil
  63. }
  64. }
  65. return internet.Connection(conn), nil
  66. }
  67. func (s *server) keepAccepting() {
  68. for {
  69. conn, err := s.innnerListener.Accept()
  70. if err != nil {
  71. return
  72. }
  73. handledConn, err := s.Handle(conn)
  74. if err != nil {
  75. newError("failed to handle request").Base(err).WriteToLog()
  76. continue
  77. }
  78. s.addConn(handledConn)
  79. }
  80. }
  81. func listenHTTPUpgrade(ctx context.Context, address net.Address, port net.Port, streamSettings *internet.MemoryStreamConfig, addConn internet.ConnHandler) (internet.Listener, error) {
  82. transportConfiguration := streamSettings.ProtocolSettings.(*Config)
  83. serverInstance := &server{config: transportConfiguration, addConn: addConn}
  84. listener, err := transportcommon.ListenWithSecuritySettings(ctx, address, port, streamSettings)
  85. if err != nil {
  86. return nil, newError("failed to listen on ", address, ":", port).Base(err)
  87. }
  88. serverInstance.innnerListener = listener
  89. go serverInstance.keepAccepting()
  90. return serverInstance, nil
  91. }
  92. func init() {
  93. common.Must(internet.RegisterTransportListener(protocolName, listenHTTPUpgrade))
  94. }