hub.go 5.2 KB


  1. //go:build !confonly
  2. // +build !confonly
  3. package http
  4. import (
  5. "context"
  6. "io"
  7. "net/http"
  8. "strings"
  9. "time"
  10. "golang.org/x/net/http2"
  11. "golang.org/x/net/http2/h2c"
  12. "github.com/v2fly/v2ray-core/v4/common"
  13. "github.com/v2fly/v2ray-core/v4/common/net"
  14. http_proto "github.com/v2fly/v2ray-core/v4/common/protocol/http"
  15. "github.com/v2fly/v2ray-core/v4/common/serial"
  16. "github.com/v2fly/v2ray-core/v4/common/session"
  17. "github.com/v2fly/v2ray-core/v4/common/signal/done"
  18. "github.com/v2fly/v2ray-core/v4/transport/internet"
  19. "github.com/v2fly/v2ray-core/v4/transport/internet/tls"
  20. )
  21. type Listener struct {
  22. server *http.Server
  23. handler internet.ConnHandler
  24. local net.Addr
  25. config *Config
  26. locker *internet.FileLocker // for unix domain socket
  27. }
  28. func (l *Listener) Addr() net.Addr {
  29. return l.local
  30. }
  31. func (l *Listener) Close() error {
  32. if l.locker != nil {
  33. l.locker.Release()
  34. }
  35. return l.server.Close()
  36. }
  37. type flushWriter struct {
  38. w io.Writer
  39. d *done.Instance
  40. }
  41. func (fw flushWriter) Write(p []byte) (n int, err error) {
  42. if fw.d.Done() {
  43. return 0, io.ErrClosedPipe
  44. }
  45. n, err = fw.w.Write(p)
  46. if f, ok := fw.w.(http.Flusher); ok {
  47. f.Flush()
  48. }
  49. return
  50. }
  51. func (l *Listener) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
  52. host := request.Host
  53. if !l.config.isValidHost(host) {
  54. writer.WriteHeader(404)
  55. return
  56. }
  57. path := l.config.getNormalizedPath()
  58. if !strings.HasPrefix(request.URL.Path, path) {
  59. writer.WriteHeader(404)
  60. return
  61. }
  62. writer.Header().Set("Cache-Control", "no-store")
  63. for _, httpHeader := range l.config.Header {
  64. for _, httpHeaderValue := range httpHeader.Value {
  65. writer.Header().Set(httpHeader.Name, httpHeaderValue)
  66. }
  67. }
  68. writer.WriteHeader(200)
  69. if f, ok := writer.(http.Flusher); ok {
  70. f.Flush()
  71. }
  72. remoteAddr := l.Addr()
  73. dest, err := net.ParseDestination(request.RemoteAddr)
  74. if err != nil {
  75. newError("failed to parse request remote addr: ", request.RemoteAddr).Base(err).WriteToLog()
  76. } else {
  77. remoteAddr = &net.TCPAddr{
  78. IP: dest.Address.IP(),
  79. Port: int(dest.Port),
  80. }
  81. }
  82. forwardedAddress := http_proto.ParseXForwardedFor(request.Header)
  83. if len(forwardedAddress) > 0 && forwardedAddress[0].Family().IsIP() {
  84. remoteAddr = &net.TCPAddr{
  85. IP: forwardedAddress[0].IP(),
  86. Port: 0,
  87. }
  88. }
  89. done := done.New()
  90. conn := net.NewConnection(
  91. net.ConnectionOutput(request.Body),
  92. net.ConnectionInput(flushWriter{w: writer, d: done}),
  93. net.ConnectionOnClose(common.ChainedClosable{done, request.Body}),
  94. net.ConnectionLocalAddr(l.Addr()),
  95. net.ConnectionRemoteAddr(remoteAddr),
  96. )
  97. l.handler(conn)
  98. <-done.Wait()
  99. }
  100. func Listen(ctx context.Context, address net.Address, port net.Port, streamSettings *internet.MemoryStreamConfig, handler internet.ConnHandler) (internet.Listener, error) {
  101. httpSettings := streamSettings.ProtocolSettings.(*Config)
  102. var listener *Listener
  103. if port == net.Port(0) { // unix
  104. listener = &Listener{
  105. handler: handler,
  106. local: &net.UnixAddr{
  107. Name: address.Domain(),
  108. Net: "unix",
  109. },
  110. config: httpSettings,
  111. }
  112. } else { // tcp
  113. listener = &Listener{
  114. handler: handler,
  115. local: &net.TCPAddr{
  116. IP: address.IP(),
  117. Port: int(port),
  118. },
  119. config: httpSettings,
  120. }
  121. }
  122. var server *http.Server
  123. config := tls.ConfigFromStreamSettings(streamSettings)
  124. if config == nil {
  125. h2s := &http2.Server{}
  126. server = &http.Server{
  127. Addr: serial.Concat(address, ":", port),
  128. Handler: h2c.NewHandler(listener, h2s),
  129. ReadHeaderTimeout: time.Second * 4,
  130. }
  131. } else {
  132. server = &http.Server{
  133. Addr: serial.Concat(address, ":", port),
  134. TLSConfig: config.GetTLSConfig(tls.WithNextProto("h2")),
  135. Handler: listener,
  136. ReadHeaderTimeout: time.Second * 4,
  137. }
  138. }
  139. if streamSettings.SocketSettings != nil && streamSettings.SocketSettings.AcceptProxyProtocol {
  140. newError("accepting PROXY protocol").AtWarning().WriteToLog(session.ExportIDToError(ctx))
  141. }
  142. listener.server = server
  143. go func() {
  144. var streamListener net.Listener
  145. var err error
  146. if port == net.Port(0) { // unix
  147. streamListener, err = internet.ListenSystem(ctx, &net.UnixAddr{
  148. Name: address.Domain(),
  149. Net: "unix",
  150. }, streamSettings.SocketSettings)
  151. if err != nil {
  152. newError("failed to listen on ", address).Base(err).AtError().WriteToLog(session.ExportIDToError(ctx))
  153. return
  154. }
  155. locker := ctx.Value(address.Domain())
  156. if locker != nil {
  157. listener.locker = locker.(*internet.FileLocker)
  158. }
  159. } else { // tcp
  160. streamListener, err = internet.ListenSystem(ctx, &net.TCPAddr{
  161. IP: address.IP(),
  162. Port: int(port),
  163. }, streamSettings.SocketSettings)
  164. if err != nil {
  165. newError("failed to listen on ", address, ":", port).Base(err).AtError().WriteToLog(session.ExportIDToError(ctx))
  166. return
  167. }
  168. }
  169. if config == nil {
  170. err = server.Serve(streamListener)
  171. if err != nil {
  172. newError("stopping serving H2C").Base(err).WriteToLog(session.ExportIDToError(ctx))
  173. }
  174. } else {
  175. err = server.ServeTLS(streamListener, "", "")
  176. if err != nil {
  177. newError("stopping serving TLS").Base(err).WriteToLog(session.ExportIDToError(ctx))
  178. }
  179. }
  180. }()
  181. return listener, nil
  182. }
  183. func init() {
  184. common.Must(internet.RegisterTransportListener(protocolName, Listen))
  185. }