server.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402
  1. package h2quic
  2. import (
  3. "crypto/tls"
  4. "errors"
  5. "fmt"
  6. "net"
  7. "net/http"
  8. "runtime"
  9. "strings"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. quic "github.com/lucas-clemente/quic-go"
  14. "github.com/lucas-clemente/quic-go/internal/protocol"
  15. "github.com/lucas-clemente/quic-go/internal/utils"
  16. "github.com/lucas-clemente/quic-go/qerr"
  17. "golang.org/x/net/http2"
  18. "golang.org/x/net/http2/hpack"
  19. )
  20. type streamCreator interface {
  21. quic.Session
  22. GetOrOpenStream(protocol.StreamID) (quic.Stream, error)
  23. }
  24. type remoteCloser interface {
  25. CloseRemote(protocol.ByteCount)
  26. }
  27. // allows mocking of quic.Listen and quic.ListenAddr
  28. var (
  29. quicListen = quic.Listen
  30. quicListenAddr = quic.ListenAddr
  31. )
  32. // Server is a HTTP2 server listening for QUIC connections.
  33. type Server struct {
  34. *http.Server
  35. // By providing a quic.Config, it is possible to set parameters of the QUIC connection.
  36. // If nil, it uses reasonable default values.
  37. QuicConfig *quic.Config
  38. // Private flag for demo, do not use
  39. CloseAfterFirstRequest bool
  40. port uint32 // used atomically
  41. listenerMutex sync.Mutex
  42. listener quic.Listener
  43. closed bool
  44. supportedVersionsAsString string
  45. logger utils.Logger // will be set by Server.serveImpl()
  46. }
  47. // ListenAndServe listens on the UDP address s.Addr and calls s.Handler to handle HTTP/2 requests on incoming connections.
  48. func (s *Server) ListenAndServe() error {
  49. if s.Server == nil {
  50. return errors.New("use of h2quic.Server without http.Server")
  51. }
  52. return s.serveImpl(s.TLSConfig, nil)
  53. }
  54. // ListenAndServeTLS listens on the UDP address s.Addr and calls s.Handler to handle HTTP/2 requests on incoming connections.
  55. func (s *Server) ListenAndServeTLS(certFile, keyFile string) error {
  56. var err error
  57. certs := make([]tls.Certificate, 1)
  58. certs[0], err = tls.LoadX509KeyPair(certFile, keyFile)
  59. if err != nil {
  60. return err
  61. }
  62. // We currently only use the cert-related stuff from tls.Config,
  63. // so we don't need to make a full copy.
  64. config := &tls.Config{
  65. Certificates: certs,
  66. }
  67. return s.serveImpl(config, nil)
  68. }
  69. // Serve an existing UDP connection.
  70. func (s *Server) Serve(conn net.PacketConn) error {
  71. return s.serveImpl(s.TLSConfig, conn)
  72. }
  73. func (s *Server) serveImpl(tlsConfig *tls.Config, conn net.PacketConn) error {
  74. if s.Server == nil {
  75. return errors.New("use of h2quic.Server without http.Server")
  76. }
  77. s.logger = utils.DefaultLogger.WithPrefix("server")
  78. s.listenerMutex.Lock()
  79. if s.closed {
  80. s.listenerMutex.Unlock()
  81. return errors.New("Server is already closed")
  82. }
  83. if s.listener != nil {
  84. s.listenerMutex.Unlock()
  85. return errors.New("ListenAndServe may only be called once")
  86. }
  87. var ln quic.Listener
  88. var err error
  89. if conn == nil {
  90. ln, err = quicListenAddr(s.Addr, tlsConfig, s.QuicConfig)
  91. } else {
  92. ln, err = quicListen(conn, tlsConfig, s.QuicConfig)
  93. }
  94. if err != nil {
  95. s.listenerMutex.Unlock()
  96. return err
  97. }
  98. s.listener = ln
  99. s.listenerMutex.Unlock()
  100. for {
  101. sess, err := ln.Accept()
  102. if err != nil {
  103. return err
  104. }
  105. go s.handleHeaderStream(sess.(streamCreator))
  106. }
  107. }
  108. func (s *Server) handleHeaderStream(session streamCreator) {
  109. stream, err := session.AcceptStream()
  110. if err != nil {
  111. session.CloseWithError(quic.ErrorCode(qerr.InvalidHeadersStreamData), err)
  112. return
  113. }
  114. hpackDecoder := hpack.NewDecoder(4096, nil)
  115. h2framer := http2.NewFramer(nil, stream)
  116. var headerStreamMutex sync.Mutex // Protects concurrent calls to Write()
  117. for {
  118. if err := s.handleRequest(session, stream, &headerStreamMutex, hpackDecoder, h2framer); err != nil {
  119. // QuicErrors must originate from stream.Read() returning an error.
  120. // In this case, the session has already logged the error, so we don't
  121. // need to log it again.
  122. errorCode := qerr.InternalError
  123. if qerr, ok := err.(*qerr.QuicError); !ok {
  124. errorCode = qerr.ErrorCode
  125. s.logger.Errorf("error handling h2 request: %s", err.Error())
  126. }
  127. session.CloseWithError(quic.ErrorCode(errorCode), err)
  128. return
  129. }
  130. }
  131. }
  132. func (s *Server) handleRequest(session streamCreator, headerStream quic.Stream, headerStreamMutex *sync.Mutex, hpackDecoder *hpack.Decoder, h2framer *http2.Framer) error {
  133. h2frame, err := h2framer.ReadFrame()
  134. if err != nil {
  135. return qerr.Error(qerr.HeadersStreamDataDecompressFailure, "cannot read frame")
  136. }
  137. var h2headersFrame *http2.HeadersFrame
  138. switch f := h2frame.(type) {
  139. case *http2.PriorityFrame:
  140. // ignore PRIORITY frames
  141. s.logger.Debugf("Ignoring H2 PRIORITY frame: %#v", f)
  142. return nil
  143. case *http2.HeadersFrame:
  144. h2headersFrame = f
  145. default:
  146. return qerr.Error(qerr.InvalidHeadersStreamData, "expected a header frame")
  147. }
  148. if !h2headersFrame.HeadersEnded() {
  149. return errors.New("http2 header continuation not implemented")
  150. }
  151. headers, err := hpackDecoder.DecodeFull(h2headersFrame.HeaderBlockFragment())
  152. if err != nil {
  153. s.logger.Errorf("invalid http2 headers encoding: %s", err.Error())
  154. return err
  155. }
  156. req, err := requestFromHeaders(headers)
  157. if err != nil {
  158. return err
  159. }
  160. if s.logger.Debug() {
  161. s.logger.Infof("%s %s%s, on data stream %d", req.Method, req.Host, req.RequestURI, h2headersFrame.StreamID)
  162. } else {
  163. s.logger.Infof("%s %s%s", req.Method, req.Host, req.RequestURI)
  164. }
  165. dataStream, err := session.GetOrOpenStream(protocol.StreamID(h2headersFrame.StreamID))
  166. if err != nil {
  167. return err
  168. }
  169. // this can happen if the client immediately closes the data stream after sending the request and the runtime processes the reset before the request
  170. if dataStream == nil {
  171. return nil
  172. }
  173. // handleRequest should be as non-blocking as possible to minimize
  174. // head-of-line blocking. Potentially blocking code is run in a separate
  175. // goroutine, enabling handleRequest to return before the code is executed.
  176. go func() {
  177. streamEnded := h2headersFrame.StreamEnded()
  178. if streamEnded {
  179. dataStream.(remoteCloser).CloseRemote(0)
  180. streamEnded = true
  181. _, _ = dataStream.Read([]byte{0}) // read the eof
  182. }
  183. req = req.WithContext(dataStream.Context())
  184. reqBody := newRequestBody(dataStream)
  185. req.Body = reqBody
  186. req.RemoteAddr = session.RemoteAddr().String()
  187. responseWriter := newResponseWriter(headerStream, headerStreamMutex, dataStream, protocol.StreamID(h2headersFrame.StreamID), s.logger)
  188. handler := s.Handler
  189. if handler == nil {
  190. handler = http.DefaultServeMux
  191. }
  192. panicked := false
  193. func() {
  194. defer func() {
  195. if p := recover(); p != nil {
  196. // Copied from net/http/server.go
  197. const size = 64 << 10
  198. buf := make([]byte, size)
  199. buf = buf[:runtime.Stack(buf, false)]
  200. s.logger.Errorf("http: panic serving: %v\n%s", p, buf)
  201. panicked = true
  202. }
  203. }()
  204. handler.ServeHTTP(responseWriter, req)
  205. }()
  206. if panicked {
  207. responseWriter.WriteHeader(500)
  208. } else {
  209. responseWriter.WriteHeader(200)
  210. }
  211. if responseWriter.dataStream != nil {
  212. if !streamEnded && !reqBody.requestRead {
  213. // in gQUIC, the error code doesn't matter, so just use 0 here
  214. responseWriter.dataStream.CancelRead(0)
  215. }
  216. responseWriter.dataStream.Close()
  217. }
  218. if s.CloseAfterFirstRequest {
  219. time.Sleep(100 * time.Millisecond)
  220. session.Close()
  221. }
  222. }()
  223. return nil
  224. }
  225. // Close the server immediately, aborting requests and sending CONNECTION_CLOSE frames to connected clients.
  226. // Close in combination with ListenAndServe() (instead of Serve()) may race if it is called before a UDP socket is established.
  227. func (s *Server) Close() error {
  228. s.listenerMutex.Lock()
  229. defer s.listenerMutex.Unlock()
  230. s.closed = true
  231. if s.listener != nil {
  232. err := s.listener.Close()
  233. s.listener = nil
  234. return err
  235. }
  236. return nil
  237. }
  238. // CloseGracefully shuts down the server gracefully. The server sends a GOAWAY frame first, then waits for either timeout to trigger, or for all running requests to complete.
  239. // CloseGracefully in combination with ListenAndServe() (instead of Serve()) may race if it is called before a UDP socket is established.
  240. func (s *Server) CloseGracefully(timeout time.Duration) error {
  241. // TODO: implement
  242. return nil
  243. }
  244. // SetQuicHeaders can be used to set the proper headers that announce that this server supports QUIC.
  245. // The values that are set depend on the port information from s.Server.Addr, and currently look like this (if Addr has port 443):
  246. // Alt-Svc: quic=":443"; ma=2592000; v="33,32,31,30"
  247. func (s *Server) SetQuicHeaders(hdr http.Header) error {
  248. port := atomic.LoadUint32(&s.port)
  249. if port == 0 {
  250. // Extract port from s.Server.Addr
  251. _, portStr, err := net.SplitHostPort(s.Server.Addr)
  252. if err != nil {
  253. return err
  254. }
  255. portInt, err := net.LookupPort("tcp", portStr)
  256. if err != nil {
  257. return err
  258. }
  259. port = uint32(portInt)
  260. atomic.StoreUint32(&s.port, port)
  261. }
  262. if s.supportedVersionsAsString == "" {
  263. var versions []string
  264. for _, v := range protocol.SupportedVersions {
  265. versions = append(versions, v.ToAltSvc())
  266. }
  267. s.supportedVersionsAsString = strings.Join(versions, ",")
  268. }
  269. hdr.Add("Alt-Svc", fmt.Sprintf(`quic=":%d"; ma=2592000; v="%s"`, port, s.supportedVersionsAsString))
  270. return nil
  271. }
  272. // ListenAndServeQUIC listens on the UDP network address addr and calls the
  273. // handler for HTTP/2 requests on incoming connections. http.DefaultServeMux is
  274. // used when handler is nil.
  275. func ListenAndServeQUIC(addr, certFile, keyFile string, handler http.Handler) error {
  276. server := &Server{
  277. Server: &http.Server{
  278. Addr: addr,
  279. Handler: handler,
  280. },
  281. }
  282. return server.ListenAndServeTLS(certFile, keyFile)
  283. }
  284. // ListenAndServe listens on the given network address for both, TLS and QUIC
  285. // connetions in parallel. It returns if one of the two returns an error.
  286. // http.DefaultServeMux is used when handler is nil.
  287. // The correct Alt-Svc headers for QUIC are set.
  288. func ListenAndServe(addr, certFile, keyFile string, handler http.Handler) error {
  289. // Load certs
  290. var err error
  291. certs := make([]tls.Certificate, 1)
  292. certs[0], err = tls.LoadX509KeyPair(certFile, keyFile)
  293. if err != nil {
  294. return err
  295. }
  296. // We currently only use the cert-related stuff from tls.Config,
  297. // so we don't need to make a full copy.
  298. config := &tls.Config{
  299. Certificates: certs,
  300. }
  301. // Open the listeners
  302. udpAddr, err := net.ResolveUDPAddr("udp", addr)
  303. if err != nil {
  304. return err
  305. }
  306. udpConn, err := net.ListenUDP("udp", udpAddr)
  307. if err != nil {
  308. return err
  309. }
  310. defer udpConn.Close()
  311. tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
  312. if err != nil {
  313. return err
  314. }
  315. tcpConn, err := net.ListenTCP("tcp", tcpAddr)
  316. if err != nil {
  317. return err
  318. }
  319. defer tcpConn.Close()
  320. tlsConn := tls.NewListener(tcpConn, config)
  321. defer tlsConn.Close()
  322. // Start the servers
  323. httpServer := &http.Server{
  324. Addr: addr,
  325. TLSConfig: config,
  326. }
  327. quicServer := &Server{
  328. Server: httpServer,
  329. }
  330. if handler == nil {
  331. handler = http.DefaultServeMux
  332. }
  333. httpServer.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  334. quicServer.SetQuicHeaders(w.Header())
  335. handler.ServeHTTP(w, r)
  336. })
  337. hErr := make(chan error)
  338. qErr := make(chan error)
  339. go func() {
  340. hErr <- httpServer.Serve(tlsConn)
  341. }()
  342. go func() {
  343. qErr <- quicServer.Serve(udpConn)
  344. }()
  345. select {
  346. case err := <-hErr:
  347. quicServer.Close()
  348. return err
  349. case err := <-qErr:
  350. // Cannot close the HTTP server or wait for requests to complete properly :/
  351. return err
  352. }
  353. }