server.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542
  1. package quic
  2. import (
  3. "bytes"
  4. "crypto/tls"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "net"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. "v2ray.com/core/external/github.com/lucas-clemente/quic-go/internal/handshake"
  13. "v2ray.com/core/external/github.com/lucas-clemente/quic-go/internal/protocol"
  14. "v2ray.com/core/external/github.com/lucas-clemente/quic-go/internal/qerr"
  15. "v2ray.com/core/external/github.com/lucas-clemente/quic-go/internal/utils"
  16. "v2ray.com/core/external/github.com/lucas-clemente/quic-go/internal/wire"
  17. )
  18. // packetHandler handles packets
  19. type packetHandler interface {
  20. handlePacket(*receivedPacket)
  21. io.Closer
  22. destroy(error)
  23. GetPerspective() protocol.Perspective
  24. }
  25. type unknownPacketHandler interface {
  26. handlePacket(*receivedPacket)
  27. closeWithError(error) error
  28. }
  29. type packetHandlerManager interface {
  30. Add(protocol.ConnectionID, packetHandler)
  31. Retire(protocol.ConnectionID)
  32. Remove(protocol.ConnectionID)
  33. SetServer(unknownPacketHandler)
  34. CloseServer()
  35. }
  36. type quicSession interface {
  37. Session
  38. handlePacket(*receivedPacket)
  39. GetVersion() protocol.VersionNumber
  40. run() error
  41. destroy(error)
  42. closeForRecreating() protocol.PacketNumber
  43. closeRemote(error)
  44. }
  45. type sessionRunner interface {
  46. onHandshakeComplete(Session)
  47. retireConnectionID(protocol.ConnectionID)
  48. removeConnectionID(protocol.ConnectionID)
  49. }
  50. type runner struct {
  51. onHandshakeCompleteImpl func(Session)
  52. retireConnectionIDImpl func(protocol.ConnectionID)
  53. removeConnectionIDImpl func(protocol.ConnectionID)
  54. }
  55. func (r *runner) onHandshakeComplete(s Session) { r.onHandshakeCompleteImpl(s) }
  56. func (r *runner) retireConnectionID(c protocol.ConnectionID) { r.retireConnectionIDImpl(c) }
  57. func (r *runner) removeConnectionID(c protocol.ConnectionID) { r.removeConnectionIDImpl(c) }
  58. var _ sessionRunner = &runner{}
  59. // A Listener of QUIC
  60. type server struct {
  61. mutex sync.Mutex
  62. tlsConf *tls.Config
  63. config *Config
  64. conn net.PacketConn
  65. // If the server is started with ListenAddr, we create a packet conn.
  66. // If it is started with Listen, we take a packet conn as a parameter.
  67. createdPacketConn bool
  68. cookieGenerator *handshake.CookieGenerator
  69. sessionHandler packetHandlerManager
  70. // set as a member, so they can be set in the tests
  71. newSession func(connection, sessionRunner, protocol.ConnectionID /* original connection ID */, protocol.ConnectionID /* destination connection ID */, protocol.ConnectionID /* source connection ID */, *Config, *tls.Config, *handshake.TransportParameters, utils.Logger, protocol.VersionNumber) (quicSession, error)
  72. serverError error
  73. errorChan chan struct{}
  74. closed bool
  75. sessionQueue chan Session
  76. sessionQueueLen int32 // to be used as an atomic
  77. sessionRunner sessionRunner
  78. logger utils.Logger
  79. }
  80. var _ Listener = &server{}
  81. var _ unknownPacketHandler = &server{}
  82. // ListenAddr creates a QUIC server listening on a given address.
  83. // The tls.Config must not be nil and must contain a certificate configuration.
  84. // The quic.Config may be nil, in that case the default values will be used.
  85. func ListenAddr(addr string, tlsConf *tls.Config, config *Config) (Listener, error) {
  86. udpAddr, err := net.ResolveUDPAddr("udp", addr)
  87. if err != nil {
  88. return nil, err
  89. }
  90. conn, err := net.ListenUDP("udp", udpAddr)
  91. if err != nil {
  92. return nil, err
  93. }
  94. serv, err := listen(conn, tlsConf, config)
  95. if err != nil {
  96. return nil, err
  97. }
  98. serv.createdPacketConn = true
  99. return serv, nil
  100. }
  101. // Listen listens for QUIC connections on a given net.PacketConn.
  102. // A single PacketConn only be used for a single call to Listen.
  103. // The PacketConn can be used for simultaneous calls to Dial.
  104. // QUIC connection IDs are used for demultiplexing the different connections.
  105. // The tls.Config must not be nil and must contain a certificate configuration.
  106. // The quic.Config may be nil, in that case the default values will be used.
  107. func Listen(conn net.PacketConn, tlsConf *tls.Config, config *Config) (Listener, error) {
  108. return listen(conn, tlsConf, config)
  109. }
  110. func listen(conn net.PacketConn, tlsConf *tls.Config, config *Config) (*server, error) {
  111. // TODO(#1655): only require that tls.Config.Certificates or tls.Config.GetCertificate is set
  112. if tlsConf == nil || len(tlsConf.Certificates) == 0 {
  113. return nil, errors.New("quic: Certificates not set in tls.Config")
  114. }
  115. config = populateServerConfig(config)
  116. for _, v := range config.Versions {
  117. if !protocol.IsValidVersion(v) {
  118. return nil, fmt.Errorf("%s is not a valid QUIC version", v)
  119. }
  120. }
  121. sessionHandler, err := getMultiplexer().AddConn(conn, config.ConnectionIDLength)
  122. if err != nil {
  123. return nil, err
  124. }
  125. s := &server{
  126. conn: conn,
  127. tlsConf: tlsConf,
  128. config: config,
  129. sessionHandler: sessionHandler,
  130. sessionQueue: make(chan Session),
  131. errorChan: make(chan struct{}),
  132. newSession: newSession,
  133. logger: utils.DefaultLogger.WithPrefix("server"),
  134. }
  135. if err := s.setup(); err != nil {
  136. return nil, err
  137. }
  138. sessionHandler.SetServer(s)
  139. s.logger.Debugf("Listening for %s connections on %s", conn.LocalAddr().Network(), conn.LocalAddr().String())
  140. return s, nil
  141. }
  142. func (s *server) setup() error {
  143. s.sessionRunner = &runner{
  144. onHandshakeCompleteImpl: func(sess Session) {
  145. go func() {
  146. atomic.AddInt32(&s.sessionQueueLen, 1)
  147. defer atomic.AddInt32(&s.sessionQueueLen, -1)
  148. select {
  149. case s.sessionQueue <- sess:
  150. // blocks until the session is accepted
  151. case <-sess.Context().Done():
  152. // don't pass sessions that were already closed to Accept()
  153. }
  154. }()
  155. },
  156. retireConnectionIDImpl: s.sessionHandler.Retire,
  157. removeConnectionIDImpl: s.sessionHandler.Remove,
  158. }
  159. cookieGenerator, err := handshake.NewCookieGenerator()
  160. if err != nil {
  161. return err
  162. }
  163. s.cookieGenerator = cookieGenerator
  164. return nil
  165. }
  166. var defaultAcceptCookie = func(clientAddr net.Addr, cookie *Cookie) bool {
  167. if cookie == nil {
  168. return false
  169. }
  170. if time.Now().After(cookie.SentTime.Add(protocol.CookieExpiryTime)) {
  171. return false
  172. }
  173. var sourceAddr string
  174. if udpAddr, ok := clientAddr.(*net.UDPAddr); ok {
  175. sourceAddr = udpAddr.IP.String()
  176. } else {
  177. sourceAddr = clientAddr.String()
  178. }
  179. return sourceAddr == cookie.RemoteAddr
  180. }
  181. // populateServerConfig populates fields in the quic.Config with their default values, if none are set
  182. // it may be called with nil
  183. func populateServerConfig(config *Config) *Config {
  184. if config == nil {
  185. config = &Config{}
  186. }
  187. versions := config.Versions
  188. if len(versions) == 0 {
  189. versions = protocol.SupportedVersions
  190. }
  191. vsa := defaultAcceptCookie
  192. if config.AcceptCookie != nil {
  193. vsa = config.AcceptCookie
  194. }
  195. handshakeTimeout := protocol.DefaultHandshakeTimeout
  196. if config.HandshakeTimeout != 0 {
  197. handshakeTimeout = config.HandshakeTimeout
  198. }
  199. idleTimeout := protocol.DefaultIdleTimeout
  200. if config.IdleTimeout != 0 {
  201. idleTimeout = config.IdleTimeout
  202. }
  203. maxReceiveStreamFlowControlWindow := config.MaxReceiveStreamFlowControlWindow
  204. if maxReceiveStreamFlowControlWindow == 0 {
  205. maxReceiveStreamFlowControlWindow = protocol.DefaultMaxReceiveStreamFlowControlWindow
  206. }
  207. maxReceiveConnectionFlowControlWindow := config.MaxReceiveConnectionFlowControlWindow
  208. if maxReceiveConnectionFlowControlWindow == 0 {
  209. maxReceiveConnectionFlowControlWindow = protocol.DefaultMaxReceiveConnectionFlowControlWindow
  210. }
  211. maxIncomingStreams := config.MaxIncomingStreams
  212. if maxIncomingStreams == 0 {
  213. maxIncomingStreams = protocol.DefaultMaxIncomingStreams
  214. } else if maxIncomingStreams < 0 {
  215. maxIncomingStreams = 0
  216. }
  217. maxIncomingUniStreams := config.MaxIncomingUniStreams
  218. if maxIncomingUniStreams == 0 {
  219. maxIncomingUniStreams = protocol.DefaultMaxIncomingUniStreams
  220. } else if maxIncomingUniStreams < 0 {
  221. maxIncomingUniStreams = 0
  222. }
  223. connIDLen := config.ConnectionIDLength
  224. if connIDLen == 0 {
  225. connIDLen = protocol.DefaultConnectionIDLength
  226. }
  227. return &Config{
  228. Versions: versions,
  229. HandshakeTimeout: handshakeTimeout,
  230. IdleTimeout: idleTimeout,
  231. AcceptCookie: vsa,
  232. KeepAlive: config.KeepAlive,
  233. MaxReceiveStreamFlowControlWindow: maxReceiveStreamFlowControlWindow,
  234. MaxReceiveConnectionFlowControlWindow: maxReceiveConnectionFlowControlWindow,
  235. MaxIncomingStreams: maxIncomingStreams,
  236. MaxIncomingUniStreams: maxIncomingUniStreams,
  237. ConnectionIDLength: connIDLen,
  238. }
  239. }
  240. // Accept returns newly openend sessions
  241. func (s *server) Accept() (Session, error) {
  242. var sess Session
  243. select {
  244. case sess = <-s.sessionQueue:
  245. return sess, nil
  246. case <-s.errorChan:
  247. return nil, s.serverError
  248. }
  249. }
  250. // Close the server
  251. func (s *server) Close() error {
  252. s.mutex.Lock()
  253. defer s.mutex.Unlock()
  254. if s.closed {
  255. return nil
  256. }
  257. return s.closeWithMutex()
  258. }
  259. func (s *server) closeWithMutex() error {
  260. s.sessionHandler.CloseServer()
  261. if s.serverError == nil {
  262. s.serverError = errors.New("server closed")
  263. }
  264. var err error
  265. // If the server was started with ListenAddr, we created the packet conn.
  266. // We need to close it in order to make the go routine reading from that conn return.
  267. if s.createdPacketConn {
  268. err = s.conn.Close()
  269. }
  270. s.closed = true
  271. close(s.errorChan)
  272. return err
  273. }
  274. func (s *server) closeWithError(e error) error {
  275. s.mutex.Lock()
  276. defer s.mutex.Unlock()
  277. if s.closed {
  278. return nil
  279. }
  280. s.serverError = e
  281. return s.closeWithMutex()
  282. }
  283. // Addr returns the server's network address
  284. func (s *server) Addr() net.Addr {
  285. return s.conn.LocalAddr()
  286. }
  287. func (s *server) handlePacket(p *receivedPacket) {
  288. hdr := p.hdr
  289. // send a Version Negotiation Packet if the client is speaking a different protocol version
  290. if !protocol.IsSupportedVersion(s.config.Versions, hdr.Version) {
  291. go s.sendVersionNegotiationPacket(p)
  292. return
  293. }
  294. if hdr.Type == protocol.PacketTypeInitial {
  295. go s.handleInitial(p)
  296. return
  297. }
  298. // TODO(#943): send Stateless Reset
  299. p.buffer.Release()
  300. }
  301. func (s *server) handleInitial(p *receivedPacket) {
  302. s.logger.Debugf("<- Received Initial packet.")
  303. sess, connID, err := s.handleInitialImpl(p)
  304. if err != nil {
  305. p.buffer.Release()
  306. s.logger.Errorf("Error occurred handling initial packet: %s", err)
  307. return
  308. }
  309. if sess == nil { // a retry was done, or the connection attempt was rejected
  310. p.buffer.Release()
  311. return
  312. }
  313. // Don't put the packet buffer back if a new session was created.
  314. // The session will handle the packet and take of that.
  315. serverSession := newServerSession(sess, s.config, s.logger)
  316. s.sessionHandler.Add(connID, serverSession)
  317. }
  318. func (s *server) handleInitialImpl(p *receivedPacket) (quicSession, protocol.ConnectionID, error) {
  319. hdr := p.hdr
  320. if len(hdr.Token) == 0 && hdr.DestConnectionID.Len() < protocol.MinConnectionIDLenInitial {
  321. return nil, nil, errors.New("dropping Initial packet with too short connection ID")
  322. }
  323. if len(p.data) < protocol.MinInitialPacketSize {
  324. return nil, nil, errors.New("dropping too small Initial packet")
  325. }
  326. var cookie *Cookie
  327. var origDestConnectionID protocol.ConnectionID
  328. if len(hdr.Token) > 0 {
  329. c, err := s.cookieGenerator.DecodeToken(hdr.Token)
  330. if err == nil {
  331. cookie = &Cookie{
  332. RemoteAddr: c.RemoteAddr,
  333. SentTime: c.SentTime,
  334. }
  335. origDestConnectionID = c.OriginalDestConnectionID
  336. }
  337. }
  338. if !s.config.AcceptCookie(p.remoteAddr, cookie) {
  339. // Log the Initial packet now.
  340. // If no Retry is sent, the packet will be logged by the session.
  341. (&wire.ExtendedHeader{Header: *p.hdr}).Log(s.logger)
  342. return nil, nil, s.sendRetry(p.remoteAddr, hdr)
  343. }
  344. if queueLen := atomic.LoadInt32(&s.sessionQueueLen); queueLen >= protocol.MaxAcceptQueueSize {
  345. s.logger.Debugf("Rejecting new connection. Server currently busy. Accept queue length: %d (max %d)", queueLen, protocol.MaxAcceptQueueSize)
  346. return nil, nil, s.sendServerBusy(p.remoteAddr, hdr)
  347. }
  348. connID, err := protocol.GenerateConnectionID(s.config.ConnectionIDLength)
  349. if err != nil {
  350. return nil, nil, err
  351. }
  352. s.logger.Debugf("Changing connection ID to %s.", connID)
  353. sess, err := s.createNewSession(
  354. p.remoteAddr,
  355. origDestConnectionID,
  356. hdr.DestConnectionID,
  357. hdr.SrcConnectionID,
  358. connID,
  359. hdr.Version,
  360. )
  361. if err != nil {
  362. return nil, nil, err
  363. }
  364. sess.handlePacket(p)
  365. return sess, connID, nil
  366. }
  367. func (s *server) createNewSession(
  368. remoteAddr net.Addr,
  369. origDestConnID protocol.ConnectionID,
  370. clientDestConnID protocol.ConnectionID,
  371. destConnID protocol.ConnectionID,
  372. srcConnID protocol.ConnectionID,
  373. version protocol.VersionNumber,
  374. ) (quicSession, error) {
  375. params := &handshake.TransportParameters{
  376. InitialMaxStreamDataBidiLocal: protocol.InitialMaxStreamData,
  377. InitialMaxStreamDataBidiRemote: protocol.InitialMaxStreamData,
  378. InitialMaxStreamDataUni: protocol.InitialMaxStreamData,
  379. InitialMaxData: protocol.InitialMaxData,
  380. IdleTimeout: s.config.IdleTimeout,
  381. MaxBidiStreams: uint64(s.config.MaxIncomingStreams),
  382. MaxUniStreams: uint64(s.config.MaxIncomingUniStreams),
  383. DisableMigration: true,
  384. // TODO(#855): generate a real token
  385. StatelessResetToken: bytes.Repeat([]byte{42}, 16),
  386. OriginalConnectionID: origDestConnID,
  387. }
  388. sess, err := s.newSession(
  389. &conn{pconn: s.conn, currentAddr: remoteAddr},
  390. s.sessionRunner,
  391. clientDestConnID,
  392. destConnID,
  393. srcConnID,
  394. s.config,
  395. s.tlsConf,
  396. params,
  397. s.logger,
  398. version,
  399. )
  400. if err != nil {
  401. return nil, err
  402. }
  403. go sess.run()
  404. return sess, nil
  405. }
  406. func (s *server) sendRetry(remoteAddr net.Addr, hdr *wire.Header) error {
  407. token, err := s.cookieGenerator.NewToken(remoteAddr, hdr.DestConnectionID)
  408. if err != nil {
  409. return err
  410. }
  411. connID, err := protocol.GenerateConnectionID(s.config.ConnectionIDLength)
  412. if err != nil {
  413. return err
  414. }
  415. replyHdr := &wire.ExtendedHeader{}
  416. replyHdr.IsLongHeader = true
  417. replyHdr.Type = protocol.PacketTypeRetry
  418. replyHdr.Version = hdr.Version
  419. replyHdr.SrcConnectionID = connID
  420. replyHdr.DestConnectionID = hdr.SrcConnectionID
  421. replyHdr.OrigDestConnectionID = hdr.DestConnectionID
  422. replyHdr.Token = token
  423. s.logger.Debugf("Changing connection ID to %s.\n-> Sending Retry", connID)
  424. replyHdr.Log(s.logger)
  425. buf := &bytes.Buffer{}
  426. if err := replyHdr.Write(buf, hdr.Version); err != nil {
  427. return err
  428. }
  429. if _, err := s.conn.WriteTo(buf.Bytes(), remoteAddr); err != nil {
  430. s.logger.Debugf("Error sending Retry: %s", err)
  431. }
  432. return nil
  433. }
  434. func (s *server) sendServerBusy(remoteAddr net.Addr, hdr *wire.Header) error {
  435. sealer, _, err := handshake.NewInitialAEAD(hdr.DestConnectionID, protocol.PerspectiveServer)
  436. if err != nil {
  437. return err
  438. }
  439. packetBuffer := getPacketBuffer()
  440. defer packetBuffer.Release()
  441. buf := bytes.NewBuffer(packetBuffer.Slice[:0])
  442. // TODO(#1567): use the SERVER_BUSY error code
  443. ccf := &wire.ConnectionCloseFrame{ErrorCode: qerr.PeerGoingAway}
  444. replyHdr := &wire.ExtendedHeader{}
  445. replyHdr.IsLongHeader = true
  446. replyHdr.Type = protocol.PacketTypeInitial
  447. replyHdr.Version = hdr.Version
  448. replyHdr.SrcConnectionID = hdr.DestConnectionID
  449. replyHdr.DestConnectionID = hdr.SrcConnectionID
  450. replyHdr.PacketNumberLen = protocol.PacketNumberLen4
  451. replyHdr.Length = 4 /* packet number len */ + ccf.Length(hdr.Version) + protocol.ByteCount(sealer.Overhead())
  452. if err := replyHdr.Write(buf, hdr.Version); err != nil {
  453. return err
  454. }
  455. payloadOffset := buf.Len()
  456. if err := ccf.Write(buf, hdr.Version); err != nil {
  457. return err
  458. }
  459. raw := buf.Bytes()
  460. _ = sealer.Seal(raw[payloadOffset:payloadOffset], raw[payloadOffset:], replyHdr.PacketNumber, raw[:payloadOffset])
  461. raw = raw[0 : buf.Len()+sealer.Overhead()]
  462. pnOffset := payloadOffset - int(replyHdr.PacketNumberLen)
  463. sealer.EncryptHeader(
  464. raw[pnOffset+4:pnOffset+4+16],
  465. &raw[0],
  466. raw[pnOffset:payloadOffset],
  467. )
  468. replyHdr.Log(s.logger)
  469. wire.LogFrame(s.logger, ccf, true)
  470. if _, err := s.conn.WriteTo(raw, remoteAddr); err != nil {
  471. s.logger.Debugf("Error rejecting connection: %s", err)
  472. }
  473. return nil
  474. }
  475. func (s *server) sendVersionNegotiationPacket(p *receivedPacket) {
  476. defer p.buffer.Release()
  477. hdr := p.hdr
  478. s.logger.Debugf("Client offered version %s, sending Version Negotiation", hdr.Version)
  479. data, err := wire.ComposeVersionNegotiation(hdr.SrcConnectionID, hdr.DestConnectionID, s.config.Versions)
  480. if err != nil {
  481. s.logger.Debugf("Error composing Version Negotiation: %s", err)
  482. return
  483. }
  484. if _, err := s.conn.WriteTo(data, p.remoteAddr); err != nil {
  485. s.logger.Debugf("Error sending Version Negotiation: %s", err)
  486. }
  487. }