hub.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. // +build !confonly
  2. package http
  3. import (
  4. "context"
  5. "io"
  6. "net/http"
  7. "strings"
  8. "time"
  9. "golang.org/x/net/http2"
  10. "golang.org/x/net/http2/h2c"
  11. "github.com/v2fly/v2ray-core/v4/common"
  12. "github.com/v2fly/v2ray-core/v4/common/net"
  13. http_proto "github.com/v2fly/v2ray-core/v4/common/protocol/http"
  14. "github.com/v2fly/v2ray-core/v4/common/serial"
  15. "github.com/v2fly/v2ray-core/v4/common/session"
  16. "github.com/v2fly/v2ray-core/v4/common/signal/done"
  17. "github.com/v2fly/v2ray-core/v4/transport/internet"
  18. "github.com/v2fly/v2ray-core/v4/transport/internet/tls"
  19. )
  20. type Listener struct {
  21. server *http.Server
  22. handler internet.ConnHandler
  23. local net.Addr
  24. config *Config
  25. locker *internet.FileLocker // for unix domain socket
  26. }
  27. func (l *Listener) Addr() net.Addr {
  28. return l.local
  29. }
  30. func (l *Listener) Close() error {
  31. if l.locker != nil {
  32. l.locker.Release()
  33. }
  34. return l.server.Close()
  35. }
  36. type flushWriter struct {
  37. w io.Writer
  38. d *done.Instance
  39. }
  40. func (fw flushWriter) Write(p []byte) (n int, err error) {
  41. if fw.d.Done() {
  42. return 0, io.ErrClosedPipe
  43. }
  44. n, err = fw.w.Write(p)
  45. if f, ok := fw.w.(http.Flusher); ok {
  46. f.Flush()
  47. }
  48. return
  49. }
  50. func (l *Listener) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
  51. host := request.Host
  52. if !l.config.isValidHost(host) {
  53. writer.WriteHeader(404)
  54. return
  55. }
  56. path := l.config.getNormalizedPath()
  57. if !strings.HasPrefix(request.URL.Path, path) {
  58. writer.WriteHeader(404)
  59. return
  60. }
  61. writer.Header().Set("Cache-Control", "no-store")
  62. for _, httpHeader := range l.config.Header {
  63. for _, httpHeaderValue := range httpHeader.Value {
  64. writer.Header().Set(httpHeader.Name, httpHeaderValue)
  65. }
  66. }
  67. writer.WriteHeader(200)
  68. if f, ok := writer.(http.Flusher); ok {
  69. f.Flush()
  70. }
  71. remoteAddr := l.Addr()
  72. dest, err := net.ParseDestination(request.RemoteAddr)
  73. if err != nil {
  74. newError("failed to parse request remote addr: ", request.RemoteAddr).Base(err).WriteToLog()
  75. } else {
  76. remoteAddr = &net.TCPAddr{
  77. IP: dest.Address.IP(),
  78. Port: int(dest.Port),
  79. }
  80. }
  81. forwardedAddress := http_proto.ParseXForwardedFor(request.Header)
  82. if len(forwardedAddress) > 0 && forwardedAddress[0].Family().IsIP() {
  83. remoteAddr = &net.TCPAddr{
  84. IP: forwardedAddress[0].IP(),
  85. Port: 0,
  86. }
  87. }
  88. done := done.New()
  89. conn := net.NewConnection(
  90. net.ConnectionOutput(request.Body),
  91. net.ConnectionInput(flushWriter{w: writer, d: done}),
  92. net.ConnectionOnClose(common.ChainedClosable{done, request.Body}),
  93. net.ConnectionLocalAddr(l.Addr()),
  94. net.ConnectionRemoteAddr(remoteAddr),
  95. )
  96. l.handler(conn)
  97. <-done.Wait()
  98. }
  99. func Listen(ctx context.Context, address net.Address, port net.Port, streamSettings *internet.MemoryStreamConfig, handler internet.ConnHandler) (internet.Listener, error) {
  100. httpSettings := streamSettings.ProtocolSettings.(*Config)
  101. var listener *Listener
  102. if port == net.Port(0) { // unix
  103. listener = &Listener{
  104. handler: handler,
  105. local: &net.UnixAddr{
  106. Name: address.Domain(),
  107. Net: "unix",
  108. },
  109. config: httpSettings,
  110. }
  111. } else { // tcp
  112. listener = &Listener{
  113. handler: handler,
  114. local: &net.TCPAddr{
  115. IP: address.IP(),
  116. Port: int(port),
  117. },
  118. config: httpSettings,
  119. }
  120. }
  121. var server *http.Server
  122. config := tls.ConfigFromStreamSettings(streamSettings)
  123. if config == nil {
  124. h2s := &http2.Server{}
  125. server = &http.Server{
  126. Addr: serial.Concat(address, ":", port),
  127. Handler: h2c.NewHandler(listener, h2s),
  128. ReadHeaderTimeout: time.Second * 4,
  129. }
  130. } else {
  131. server = &http.Server{
  132. Addr: serial.Concat(address, ":", port),
  133. TLSConfig: config.GetTLSConfig(tls.WithNextProto("h2")),
  134. Handler: listener,
  135. ReadHeaderTimeout: time.Second * 4,
  136. }
  137. }
  138. if streamSettings.SocketSettings != nil && streamSettings.SocketSettings.AcceptProxyProtocol {
  139. newError("accepting PROXY protocol").AtWarning().WriteToLog(session.ExportIDToError(ctx))
  140. }
  141. listener.server = server
  142. go func() {
  143. var streamListener net.Listener
  144. var err error
  145. if port == net.Port(0) { // unix
  146. streamListener, err = internet.ListenSystem(ctx, &net.UnixAddr{
  147. Name: address.Domain(),
  148. Net: "unix",
  149. }, streamSettings.SocketSettings)
  150. if err != nil {
  151. newError("failed to listen on ", address).Base(err).AtError().WriteToLog(session.ExportIDToError(ctx))
  152. return
  153. }
  154. locker := ctx.Value(address.Domain())
  155. if locker != nil {
  156. listener.locker = locker.(*internet.FileLocker)
  157. }
  158. } else { // tcp
  159. streamListener, err = internet.ListenSystem(ctx, &net.TCPAddr{
  160. IP: address.IP(),
  161. Port: int(port),
  162. }, streamSettings.SocketSettings)
  163. if err != nil {
  164. newError("failed to listen on ", address, ":", port).Base(err).AtError().WriteToLog(session.ExportIDToError(ctx))
  165. return
  166. }
  167. }
  168. if config == nil {
  169. err = server.Serve(streamListener)
  170. if err != nil {
  171. newError("stopping serving H2C").Base(err).WriteToLog(session.ExportIDToError(ctx))
  172. }
  173. } else {
  174. err = server.ServeTLS(streamListener, "", "")
  175. if err != nil {
  176. newError("stopping serving TLS").Base(err).WriteToLog(session.ExportIDToError(ctx))
  177. }
  178. }
  179. }()
  180. return listener, nil
  181. }
  182. func init() {
  183. common.Must(internet.RegisterTransportListener(protocolName, Listen))
  184. }