hub.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package http
  2. import (
  3. "context"
  4. "io"
  5. "net/http"
  6. "strings"
  7. "time"
  8. "v2ray.com/core/common"
  9. "v2ray.com/core/common/net"
  10. "v2ray.com/core/common/serial"
  11. "v2ray.com/core/common/session"
  12. "v2ray.com/core/common/signal/done"
  13. "v2ray.com/core/transport/internet"
  14. "v2ray.com/core/transport/internet/tls"
  15. )
  16. type Listener struct {
  17. server *http.Server
  18. handler internet.ConnHandler
  19. local net.Addr
  20. config Config
  21. }
  22. func (l *Listener) Addr() net.Addr {
  23. return l.local
  24. }
  25. func (l *Listener) Close() error {
  26. return l.server.Close()
  27. }
  28. type flushWriter struct {
  29. w io.Writer
  30. d *done.Instance
  31. }
  32. func (fw flushWriter) Write(p []byte) (n int, err error) {
  33. if fw.d.Done() {
  34. return 0, io.ErrClosedPipe
  35. }
  36. n, err = fw.w.Write(p)
  37. if f, ok := fw.w.(http.Flusher); ok {
  38. f.Flush()
  39. }
  40. return
  41. }
  42. func (l *Listener) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
  43. host := request.Host
  44. if !l.config.isValidHost(host) {
  45. writer.WriteHeader(404)
  46. return
  47. }
  48. path := l.config.getNormalizedPath()
  49. if !strings.HasPrefix(request.URL.Path, path) {
  50. writer.WriteHeader(404)
  51. return
  52. }
  53. writer.Header().Set("Cache-Control", "no-store")
  54. writer.WriteHeader(200)
  55. if f, ok := writer.(http.Flusher); ok {
  56. f.Flush()
  57. }
  58. remoteAddr := l.Addr()
  59. dest, err := net.ParseDestination(request.RemoteAddr)
  60. if err != nil {
  61. newError("failed to parse request remote addr: ", request.RemoteAddr).Base(err).WriteToLog()
  62. } else {
  63. remoteAddr = &net.TCPAddr{
  64. IP: dest.Address.IP(),
  65. Port: int(dest.Port),
  66. }
  67. }
  68. done := done.New()
  69. conn := net.NewConnection(
  70. net.ConnectionOutput(request.Body),
  71. net.ConnectionInput(flushWriter{w: writer, d: done}),
  72. net.ConnectionOnClose(common.ChainedClosable{done, request.Body}),
  73. net.ConnectionLocalAddr(l.Addr()),
  74. net.ConnectionRemoteAddr(remoteAddr),
  75. )
  76. l.handler(conn)
  77. <-done.Wait()
  78. }
  79. func Listen(ctx context.Context, address net.Address, port net.Port, streamSettings *internet.MemoryStreamConfig, handler internet.ConnHandler) (internet.Listener, error) {
  80. httpSettings := streamSettings.ProtocolSettings.(*Config)
  81. listener := &Listener{
  82. handler: handler,
  83. local: &net.TCPAddr{
  84. IP: address.IP(),
  85. Port: int(port),
  86. },
  87. config: *httpSettings,
  88. }
  89. config := tls.ConfigFromStreamSettings(streamSettings)
  90. if config == nil {
  91. return nil, newError("TLS must be enabled for http transport.").AtWarning()
  92. }
  93. server := &http.Server{
  94. Addr: serial.Concat(address, ":", port),
  95. TLSConfig: config.GetTLSConfig(tls.WithNextProto("h2")),
  96. Handler: listener,
  97. ReadHeaderTimeout: time.Second * 4,
  98. }
  99. listener.server = server
  100. go func() {
  101. tcpListener, err := internet.ListenSystem(ctx, &net.TCPAddr{
  102. IP: address.IP(),
  103. Port: int(port),
  104. }, streamSettings.SocketSettings)
  105. if err != nil {
  106. newError("failed to listen on", address, ":", port).Base(err).WriteToLog(session.ExportIDToError(ctx))
  107. return
  108. }
  109. err = server.ServeTLS(tcpListener, "", "")
  110. if err != nil {
  111. newError("stoping serving TLS").Base(err).WriteToLog(session.ExportIDToError(ctx))
  112. }
  113. }()
  114. return listener, nil
  115. }
  116. func init() {
  117. common.Must(internet.RegisterTransportListener(protocolName, Listen))
  118. }