session.go 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122
  1. package quic
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "net"
  9. "sync"
  10. "time"
  11. "github.com/lucas-clemente/quic-go/internal/ackhandler"
  12. "github.com/lucas-clemente/quic-go/internal/congestion"
  13. "github.com/lucas-clemente/quic-go/internal/flowcontrol"
  14. "github.com/lucas-clemente/quic-go/internal/handshake"
  15. "github.com/lucas-clemente/quic-go/internal/protocol"
  16. "github.com/lucas-clemente/quic-go/internal/qerr"
  17. "github.com/lucas-clemente/quic-go/internal/utils"
  18. "github.com/lucas-clemente/quic-go/internal/wire"
  19. )
// unpacker turns a received packet into an unpackedPacket.
// The session calls Unpack once for every packet it processes and then uses
// the frames and the encryption level of the result.
type unpacker interface {
	// Unpack receives the raw header bytes, the parsed header and the rest of
	// the packet. A non-nil error means the packet could not be unpacked.
	Unpack(headerBinary []byte, hdr *wire.Header, data []byte) (*unpackedPacket, error)
}
  23. type streamGetter interface {
  24. GetOrOpenReceiveStream(protocol.StreamID) (receiveStreamI, error)
  25. GetOrOpenSendStream(protocol.StreamID) (sendStreamI, error)
  26. }
  27. type streamManager interface {
  28. GetOrOpenSendStream(protocol.StreamID) (sendStreamI, error)
  29. GetOrOpenReceiveStream(protocol.StreamID) (receiveStreamI, error)
  30. OpenStream() (Stream, error)
  31. OpenUniStream() (SendStream, error)
  32. OpenStreamSync() (Stream, error)
  33. OpenUniStreamSync() (SendStream, error)
  34. AcceptStream() (Stream, error)
  35. AcceptUniStream() (ReceiveStream, error)
  36. DeleteStream(protocol.StreamID) error
  37. UpdateLimits(*handshake.TransportParameters)
  38. HandleMaxStreamsFrame(*wire.MaxStreamsFrame) error
  39. CloseWithError(error)
  40. }
  41. type cryptoStreamHandler interface {
  42. RunHandshake() error
  43. io.Closer
  44. ConnectionState() handshake.ConnectionState
  45. }
// receivedPacket is a packet handed to the session for processing.
type receivedPacket struct {
	// remoteAddr is the address associated with the packet
	// (presumably the sender's address; it is not read in this part of the file).
	remoteAddr net.Addr
	// header is the parsed header. Its Raw bytes are passed to the unpacker.
	header *wire.Header
	// data is the part of the packet that is passed to the unpacker alongside the header.
	data []byte
	// rcvTime is set to time.Now() when the session starts handling the packet.
	rcvTime time.Time
}
  52. type closeError struct {
  53. err error
  54. remote bool
  55. sendClose bool
  56. }
// session is a QUIC session.
// Most of its state is accessed from the run loop (see run).
type session struct {
	// sessionRunner is notified about handshake completion and about
	// connection IDs that have to be retired or removed when the session closes.
	sessionRunner sessionRunner
	// destConnID is the connection ID used for outgoing packets. A client
	// switches it to the source connection ID of the first packet it receives.
	destConnID protocol.ConnectionID
	// srcConnID is this endpoint's own connection ID.
	srcConnID protocol.ConnectionID
	// perspective is either PerspectiveServer or PerspectiveClient.
	perspective protocol.Perspective
	version protocol.VersionNumber
	config *Config
	// conn is the underlying connection; it is used directly when
	// retransmitting the CONNECTION_CLOSE packet after the session was closed.
	conn connection
	streamsMap streamManager
	rttStats *congestion.RTTStats
	cryptoStreamManager *cryptoStreamManager
	sentPacketHandler ackhandler.SentPacketHandler
	receivedPacketHandler ackhandler.ReceivedPacketHandler
	framer framer
	windowUpdateQueue *windowUpdateQueue
	connFlowController flowcontrol.ConnectionFlowController
	unpacker unpacker
	packer packer
	cryptoStreamHandler cryptoStreamHandler
	// receivedPackets queues incoming packets for the run loop.
	// Packets are dropped when the channel is full.
	receivedPackets chan *receivedPacket
	// sendingScheduled wakes up the run loop so that it tries to send packets.
	sendingScheduled chan struct{}
	// closeOnce makes sure that only the first close request takes effect.
	closeOnce sync.Once
	// closed is set once the run loop has handled the close error.
	closed utils.AtomicBool
	// closeChan is used to notify the run loop that it should terminate
	closeChan chan closeError
	// connectionClosePacket, if set, is retransmitted (with exponential
	// backoff) for packets that arrive after the session was closed.
	connectionClosePacket *packedPacket
	// packetsReceivedAfterClose counts the packets that arrived after closing.
	packetsReceivedAfterClose int
	// ctx is cancelled when the run loop returns.
	ctx context.Context
	ctxCancel context.CancelFunc
	// undecryptablePackets holds packets that could not be decrypted yet.
	undecryptablePackets []*receivedPacket
	// clientHelloWritten is only set for client sessions. The run loop waits
	// for it (or for a close request) before it schedules the first send.
	clientHelloWritten <-chan struct{}
	handshakeCompleteChan chan struct{} // is closed when the handshake completes
	handshakeComplete bool
	receivedFirstPacket bool // since packet numbers start at 0, we can't use largestRcvdPacketNumber != 0 for this
	// receivedFirstForwardSecurePacket is set by the client when it receives
	// the first 1-RTT packet.
	receivedFirstForwardSecurePacket bool
	// lastRcvdPacketNumber is the packet number of the most recently handled packet.
	lastRcvdPacketNumber protocol.PacketNumber
	// Used to calculate the next packet number from the truncated wire
	// representation, and sent back in public reset packets
	largestRcvdPacketNumber protocol.PacketNumber
	sessionCreationTime time.Time
	// lastNetworkActivityTime is the receive time of the last successfully
	// unpacked packet (initially the session creation time).
	lastNetworkActivityTime time.Time
	// pacingDeadline is the time when the next packet should be sent
	pacingDeadline time.Time
	// peerParams are the transport parameters received from the peer.
	peerParams *handshake.TransportParameters
	// timer wakes up the run loop at the earliest pending deadline.
	timer *utils.Timer
	// keepAlivePingSent stores whether a Ping frame was sent to the peer or not
	// it is reset as soon as we receive a packet from the peer
	keepAlivePingSent bool
	logger utils.Logger
}
  108. var _ Session = &session{}
  109. var _ streamSender = &session{}
  110. var newSession = func(
  111. conn connection,
  112. runner sessionRunner,
  113. clientDestConnID protocol.ConnectionID,
  114. destConnID protocol.ConnectionID,
  115. srcConnID protocol.ConnectionID,
  116. conf *Config,
  117. tlsConf *tls.Config,
  118. params *handshake.TransportParameters,
  119. logger utils.Logger,
  120. v protocol.VersionNumber,
  121. ) (quicSession, error) {
  122. s := &session{
  123. conn: conn,
  124. sessionRunner: runner,
  125. config: conf,
  126. srcConnID: srcConnID,
  127. destConnID: destConnID,
  128. perspective: protocol.PerspectiveServer,
  129. handshakeCompleteChan: make(chan struct{}),
  130. logger: logger,
  131. version: v,
  132. }
  133. s.preSetup()
  134. initialStream := newCryptoStream()
  135. handshakeStream := newCryptoStream()
  136. s.streamsMap = newStreamsMap(
  137. s,
  138. s.newFlowController,
  139. uint64(s.config.MaxIncomingStreams),
  140. uint64(s.config.MaxIncomingUniStreams),
  141. s.perspective,
  142. s.version,
  143. )
  144. s.framer = newFramer(s.streamsMap, s.version)
  145. cs, err := handshake.NewCryptoSetupServer(
  146. initialStream,
  147. handshakeStream,
  148. clientDestConnID,
  149. params,
  150. s.processTransportParameters,
  151. tlsConf,
  152. conf.Versions,
  153. v,
  154. logger,
  155. protocol.PerspectiveServer,
  156. )
  157. if err != nil {
  158. return nil, err
  159. }
  160. s.cryptoStreamHandler = cs
  161. s.framer = newFramer(s.streamsMap, s.version)
  162. s.packer = newPacketPacker(
  163. s.destConnID,
  164. s.srcConnID,
  165. initialStream,
  166. handshakeStream,
  167. s.sentPacketHandler,
  168. s.RemoteAddr(),
  169. nil, // no token
  170. cs,
  171. s.framer,
  172. s.receivedPacketHandler,
  173. s.perspective,
  174. s.version,
  175. )
  176. s.cryptoStreamManager = newCryptoStreamManager(cs, initialStream, handshakeStream)
  177. if err := s.postSetup(); err != nil {
  178. return nil, err
  179. }
  180. s.unpacker = newPacketUnpacker(cs, s.version)
  181. return s, nil
  182. }
  183. // declare this as a variable, such that we can it mock it in the tests
  184. var newClientSession = func(
  185. conn connection,
  186. runner sessionRunner,
  187. token []byte,
  188. origDestConnID protocol.ConnectionID,
  189. destConnID protocol.ConnectionID,
  190. srcConnID protocol.ConnectionID,
  191. conf *Config,
  192. tlsConf *tls.Config,
  193. params *handshake.TransportParameters,
  194. initialVersion protocol.VersionNumber,
  195. logger utils.Logger,
  196. v protocol.VersionNumber,
  197. ) (quicSession, error) {
  198. s := &session{
  199. conn: conn,
  200. sessionRunner: runner,
  201. config: conf,
  202. srcConnID: srcConnID,
  203. destConnID: destConnID,
  204. perspective: protocol.PerspectiveClient,
  205. handshakeCompleteChan: make(chan struct{}),
  206. logger: logger,
  207. version: v,
  208. }
  209. s.preSetup()
  210. initialStream := newCryptoStream()
  211. handshakeStream := newCryptoStream()
  212. cs, clientHelloWritten, err := handshake.NewCryptoSetupClient(
  213. initialStream,
  214. handshakeStream,
  215. origDestConnID,
  216. s.destConnID,
  217. params,
  218. s.processTransportParameters,
  219. tlsConf,
  220. initialVersion,
  221. conf.Versions,
  222. v,
  223. logger,
  224. protocol.PerspectiveClient,
  225. )
  226. if err != nil {
  227. return nil, err
  228. }
  229. s.clientHelloWritten = clientHelloWritten
  230. s.cryptoStreamHandler = cs
  231. s.cryptoStreamManager = newCryptoStreamManager(cs, initialStream, handshakeStream)
  232. s.unpacker = newPacketUnpacker(cs, s.version)
  233. s.streamsMap = newStreamsMap(
  234. s,
  235. s.newFlowController,
  236. uint64(s.config.MaxIncomingStreams),
  237. uint64(s.config.MaxIncomingUniStreams),
  238. s.perspective,
  239. s.version,
  240. )
  241. s.framer = newFramer(s.streamsMap, s.version)
  242. s.packer = newPacketPacker(
  243. s.destConnID,
  244. s.srcConnID,
  245. initialStream,
  246. handshakeStream,
  247. s.sentPacketHandler,
  248. s.RemoteAddr(),
  249. token,
  250. cs,
  251. s.framer,
  252. s.receivedPacketHandler,
  253. s.perspective,
  254. s.version,
  255. )
  256. return s, s.postSetup()
  257. }
  258. func (s *session) preSetup() {
  259. s.rttStats = &congestion.RTTStats{}
  260. s.sentPacketHandler = ackhandler.NewSentPacketHandler(s.rttStats, s.logger, s.version)
  261. s.receivedPacketHandler = ackhandler.NewReceivedPacketHandler(s.rttStats, s.logger, s.version)
  262. s.connFlowController = flowcontrol.NewConnectionFlowController(
  263. protocol.InitialMaxData,
  264. protocol.ByteCount(s.config.MaxReceiveConnectionFlowControlWindow),
  265. s.onHasConnectionWindowUpdate,
  266. s.rttStats,
  267. s.logger,
  268. )
  269. }
  270. func (s *session) postSetup() error {
  271. s.receivedPackets = make(chan *receivedPacket, protocol.MaxSessionUnprocessedPackets)
  272. s.closeChan = make(chan closeError, 1)
  273. s.sendingScheduled = make(chan struct{}, 1)
  274. s.undecryptablePackets = make([]*receivedPacket, 0, protocol.MaxUndecryptablePackets)
  275. s.ctx, s.ctxCancel = context.WithCancel(context.Background())
  276. s.timer = utils.NewTimer()
  277. now := time.Now()
  278. s.lastNetworkActivityTime = now
  279. s.sessionCreationTime = now
  280. s.windowUpdateQueue = newWindowUpdateQueue(s.streamsMap, s.connFlowController, s.framer.QueueControlFrame)
  281. return nil
  282. }
  283. // run the session main loop
  284. func (s *session) run() error {
  285. defer s.ctxCancel()
  286. go func() {
  287. if err := s.cryptoStreamHandler.RunHandshake(); err != nil {
  288. s.closeLocal(err)
  289. return
  290. }
  291. close(s.handshakeCompleteChan)
  292. }()
  293. if s.perspective == protocol.PerspectiveClient {
  294. select {
  295. case <-s.clientHelloWritten:
  296. s.scheduleSending()
  297. case closeErr := <-s.closeChan:
  298. // put the close error back into the channel, so that the run loop can receive it
  299. s.closeChan <- closeErr
  300. }
  301. }
  302. var closeErr closeError
  303. runLoop:
  304. for {
  305. // Close immediately if requested
  306. select {
  307. case closeErr = <-s.closeChan:
  308. break runLoop
  309. case <-s.handshakeCompleteChan:
  310. s.handleHandshakeComplete()
  311. default:
  312. }
  313. s.maybeResetTimer()
  314. select {
  315. case closeErr = <-s.closeChan:
  316. break runLoop
  317. case <-s.timer.Chan():
  318. s.timer.SetRead()
  319. // We do all the interesting stuff after the switch statement, so
  320. // nothing to see here.
  321. case <-s.sendingScheduled:
  322. // We do all the interesting stuff after the switch statement, so
  323. // nothing to see here.
  324. case p := <-s.receivedPackets:
  325. err := s.handlePacketImpl(p)
  326. if err != nil {
  327. if qErr, ok := err.(*qerr.QuicError); ok && qErr.ErrorCode == qerr.DecryptionFailure {
  328. s.tryQueueingUndecryptablePacket(p)
  329. continue
  330. }
  331. s.closeLocal(err)
  332. continue
  333. }
  334. // This is a bit unclean, but works properly, since the packet always
  335. // begins with the public header and we never copy it.
  336. putPacketBuffer(&p.header.Raw)
  337. case <-s.handshakeCompleteChan:
  338. s.handleHandshakeComplete()
  339. }
  340. now := time.Now()
  341. if timeout := s.sentPacketHandler.GetAlarmTimeout(); !timeout.IsZero() && timeout.Before(now) {
  342. // This could cause packets to be retransmitted.
  343. // Check it before trying to send packets.
  344. if err := s.sentPacketHandler.OnAlarm(); err != nil {
  345. s.closeLocal(err)
  346. }
  347. }
  348. var pacingDeadline time.Time
  349. if s.pacingDeadline.IsZero() { // the timer didn't have a pacing deadline set
  350. pacingDeadline = s.sentPacketHandler.TimeUntilSend()
  351. }
  352. if s.config.KeepAlive && !s.keepAlivePingSent && s.handshakeComplete && time.Since(s.lastNetworkActivityTime) >= s.peerParams.IdleTimeout/2 {
  353. // send a PING frame since there is no activity in the session
  354. s.logger.Debugf("Sending a keep-alive ping to keep the connection alive.")
  355. s.framer.QueueControlFrame(&wire.PingFrame{})
  356. s.keepAlivePingSent = true
  357. } else if !pacingDeadline.IsZero() && now.Before(pacingDeadline) {
  358. // If we get to this point before the pacing deadline, we should wait until that deadline.
  359. // This can happen when scheduleSending is called, or a packet is received.
  360. // Set the timer and restart the run loop.
  361. s.pacingDeadline = pacingDeadline
  362. continue
  363. }
  364. if !s.handshakeComplete && now.Sub(s.sessionCreationTime) >= s.config.HandshakeTimeout {
  365. s.closeLocal(qerr.Error(qerr.HandshakeTimeout, "Crypto handshake did not complete in time."))
  366. continue
  367. }
  368. if s.handshakeComplete && now.Sub(s.lastNetworkActivityTime) >= s.config.IdleTimeout {
  369. s.closeLocal(qerr.Error(qerr.NetworkIdleTimeout, "No recent network activity."))
  370. continue
  371. }
  372. if err := s.sendPackets(); err != nil {
  373. s.closeLocal(err)
  374. }
  375. }
  376. if err := s.handleCloseError(closeErr); err != nil {
  377. s.logger.Infof("Handling close error failed: %s", err)
  378. }
  379. s.closed.Set(true)
  380. s.logger.Infof("Connection %s closed.", s.srcConnID)
  381. s.cryptoStreamHandler.Close()
  382. return closeErr.err
  383. }
  384. func (s *session) Context() context.Context {
  385. return s.ctx
  386. }
  387. func (s *session) ConnectionState() ConnectionState {
  388. return s.cryptoStreamHandler.ConnectionState()
  389. }
  390. func (s *session) maybeResetTimer() {
  391. var deadline time.Time
  392. if s.config.KeepAlive && s.handshakeComplete && !s.keepAlivePingSent {
  393. deadline = s.lastNetworkActivityTime.Add(s.peerParams.IdleTimeout / 2)
  394. } else {
  395. deadline = s.lastNetworkActivityTime.Add(s.config.IdleTimeout)
  396. }
  397. if ackAlarm := s.receivedPacketHandler.GetAlarmTimeout(); !ackAlarm.IsZero() {
  398. deadline = utils.MinTime(deadline, ackAlarm)
  399. }
  400. if lossTime := s.sentPacketHandler.GetAlarmTimeout(); !lossTime.IsZero() {
  401. deadline = utils.MinTime(deadline, lossTime)
  402. }
  403. if !s.handshakeComplete {
  404. handshakeDeadline := s.sessionCreationTime.Add(s.config.HandshakeTimeout)
  405. deadline = utils.MinTime(deadline, handshakeDeadline)
  406. }
  407. if !s.pacingDeadline.IsZero() {
  408. deadline = utils.MinTime(deadline, s.pacingDeadline)
  409. }
  410. s.timer.Reset(deadline)
  411. }
  412. func (s *session) handleHandshakeComplete() {
  413. s.handshakeComplete = true
  414. s.handshakeCompleteChan = nil // prevent this case from ever being selected again
  415. s.sessionRunner.onHandshakeComplete(s)
  416. // The client completes the handshake first (after sending the CFIN).
  417. // We need to make sure they learn about the peer completing the handshake,
  418. // in order to stop retransmitting handshake packets.
  419. // They will stop retransmitting handshake packets when receiving the first forward-secure packet.
  420. // We need to make sure that a retransmittable forward-secure packet is sent,
  421. // independent from the application protocol.
  422. if s.perspective == protocol.PerspectiveServer {
  423. s.queueControlFrame(&wire.PingFrame{})
  424. s.sentPacketHandler.SetHandshakeComplete()
  425. }
  426. }
  427. func (s *session) handlePacketImpl(p *receivedPacket) error {
  428. hdr := p.header
  429. // The server can change the source connection ID with the first Handshake packet.
  430. // After this, all packets with a different source connection have to be ignored.
  431. if s.receivedFirstPacket && hdr.IsLongHeader && !hdr.SrcConnectionID.Equal(s.destConnID) {
  432. s.logger.Debugf("Dropping packet with unexpected source connection ID: %s (expected %s)", p.header.SrcConnectionID, s.destConnID)
  433. return nil
  434. }
  435. p.rcvTime = time.Now()
  436. // Calculate packet number
  437. hdr.PacketNumber = protocol.InferPacketNumber(
  438. hdr.PacketNumberLen,
  439. s.largestRcvdPacketNumber,
  440. hdr.PacketNumber,
  441. s.version,
  442. )
  443. packet, err := s.unpacker.Unpack(hdr.Raw, hdr, p.data)
  444. if s.logger.Debug() {
  445. if err != nil {
  446. s.logger.Debugf("<- Reading packet 0x%x (%d bytes) for connection %s", hdr.PacketNumber, len(p.data)+len(hdr.Raw), hdr.DestConnectionID)
  447. } else {
  448. s.logger.Debugf("<- Reading packet 0x%x (%d bytes) for connection %s, %s", hdr.PacketNumber, len(p.data)+len(hdr.Raw), hdr.DestConnectionID, packet.encryptionLevel)
  449. }
  450. hdr.Log(s.logger)
  451. }
  452. // if the decryption failed, this might be a packet sent by an attacker
  453. if err != nil {
  454. return err
  455. }
  456. // The server can change the source connection ID with the first Handshake packet.
  457. if s.perspective == protocol.PerspectiveClient && !s.receivedFirstPacket && hdr.IsLongHeader && !hdr.SrcConnectionID.Equal(s.destConnID) {
  458. s.logger.Debugf("Received first packet. Switching destination connection ID to: %s", hdr.SrcConnectionID)
  459. s.destConnID = hdr.SrcConnectionID
  460. s.packer.ChangeDestConnectionID(s.destConnID)
  461. }
  462. s.receivedFirstPacket = true
  463. s.lastNetworkActivityTime = p.rcvTime
  464. s.keepAlivePingSent = false
  465. // The client completes the handshake first (after sending the CFIN).
  466. // We know that the server completed the handshake as soon as we receive a forward-secure packet.
  467. if s.perspective == protocol.PerspectiveClient {
  468. if !s.receivedFirstForwardSecurePacket && packet.encryptionLevel == protocol.Encryption1RTT {
  469. s.receivedFirstForwardSecurePacket = true
  470. s.sentPacketHandler.SetHandshakeComplete()
  471. }
  472. }
  473. s.lastRcvdPacketNumber = hdr.PacketNumber
  474. // Only do this after decrypting, so we are sure the packet is not attacker-controlled
  475. s.largestRcvdPacketNumber = utils.MaxPacketNumber(s.largestRcvdPacketNumber, hdr.PacketNumber)
  476. // If this is a Retry packet, there's no need to send an ACK.
  477. // The session will be closed and recreated as soon as the crypto setup processed the HRR.
  478. if hdr.Type != protocol.PacketTypeRetry {
  479. isRetransmittable := ackhandler.HasRetransmittableFrames(packet.frames)
  480. if err := s.receivedPacketHandler.ReceivedPacket(hdr.PacketNumber, p.rcvTime, isRetransmittable); err != nil {
  481. return err
  482. }
  483. }
  484. return s.handleFrames(packet.frames, packet.encryptionLevel)
  485. }
  486. func (s *session) handleFrames(fs []wire.Frame, encLevel protocol.EncryptionLevel) error {
  487. for _, ff := range fs {
  488. var err error
  489. wire.LogFrame(s.logger, ff, false)
  490. switch frame := ff.(type) {
  491. case *wire.CryptoFrame:
  492. err = s.handleCryptoFrame(frame, encLevel)
  493. case *wire.StreamFrame:
  494. err = s.handleStreamFrame(frame, encLevel)
  495. case *wire.AckFrame:
  496. err = s.handleAckFrame(frame, encLevel)
  497. case *wire.ConnectionCloseFrame:
  498. s.closeRemote(qerr.Error(frame.ErrorCode, frame.ReasonPhrase))
  499. case *wire.ResetStreamFrame:
  500. err = s.handleResetStreamFrame(frame)
  501. case *wire.MaxDataFrame:
  502. s.handleMaxDataFrame(frame)
  503. case *wire.MaxStreamDataFrame:
  504. err = s.handleMaxStreamDataFrame(frame)
  505. case *wire.MaxStreamsFrame:
  506. err = s.handleMaxStreamsFrame(frame)
  507. case *wire.DataBlockedFrame:
  508. case *wire.StreamDataBlockedFrame:
  509. case *wire.StreamsBlockedFrame:
  510. case *wire.StopSendingFrame:
  511. err = s.handleStopSendingFrame(frame)
  512. case *wire.PingFrame:
  513. case *wire.PathChallengeFrame:
  514. s.handlePathChallengeFrame(frame)
  515. case *wire.PathResponseFrame:
  516. // since we don't send PATH_CHALLENGEs, we don't expect PATH_RESPONSEs
  517. err = errors.New("unexpected PATH_RESPONSE frame")
  518. case *wire.NewTokenFrame:
  519. case *wire.NewConnectionIDFrame:
  520. case *wire.RetireConnectionIDFrame:
  521. // since we don't send new connection IDs, we don't expect retirements
  522. err = errors.New("unexpected RETIRE_CONNECTION_ID frame")
  523. default:
  524. return errors.New("Session BUG: unexpected frame type")
  525. }
  526. if err != nil {
  527. return err
  528. }
  529. }
  530. return nil
  531. }
  532. // handlePacket is called by the server with a new packet
  533. func (s *session) handlePacket(p *receivedPacket) {
  534. if s.closed.Get() {
  535. s.handlePacketAfterClosed(p)
  536. }
  537. // Discard packets once the amount of queued packets is larger than
  538. // the channel size, protocol.MaxSessionUnprocessedPackets
  539. select {
  540. case s.receivedPackets <- p:
  541. default:
  542. }
  543. }
  544. func (s *session) handlePacketAfterClosed(p *receivedPacket) {
  545. s.packetsReceivedAfterClose++
  546. if s.connectionClosePacket == nil {
  547. return
  548. }
  549. // exponential backoff
  550. // only send a CONNECTION_CLOSE for the 1st, 2nd, 4th, 8th, 16th, ... packet arriving
  551. for n := s.packetsReceivedAfterClose; n > 1; n = n / 2 {
  552. if n%2 != 0 {
  553. return
  554. }
  555. }
  556. s.logger.Debugf("Received %d packets after sending CONNECTION_CLOSE. Retransmitting.", s.packetsReceivedAfterClose)
  557. if err := s.conn.Write(s.connectionClosePacket.raw); err != nil {
  558. s.logger.Debugf("Error retransmitting CONNECTION_CLOSE: %s", err)
  559. }
  560. }
  561. func (s *session) handleCryptoFrame(frame *wire.CryptoFrame, encLevel protocol.EncryptionLevel) error {
  562. encLevelChanged, err := s.cryptoStreamManager.HandleCryptoFrame(frame, encLevel)
  563. if err != nil {
  564. return err
  565. }
  566. if encLevelChanged {
  567. s.tryDecryptingQueuedPackets()
  568. }
  569. return nil
  570. }
  571. func (s *session) handleStreamFrame(frame *wire.StreamFrame, encLevel protocol.EncryptionLevel) error {
  572. if encLevel < protocol.Encryption1RTT {
  573. return qerr.Error(qerr.UnencryptedStreamData, fmt.Sprintf("received unencrypted stream data on stream %d", frame.StreamID))
  574. }
  575. str, err := s.streamsMap.GetOrOpenReceiveStream(frame.StreamID)
  576. if err != nil {
  577. return err
  578. }
  579. if str == nil {
  580. // Stream is closed and already garbage collected
  581. // ignore this StreamFrame
  582. return nil
  583. }
  584. return str.handleStreamFrame(frame)
  585. }
  586. func (s *session) handleMaxDataFrame(frame *wire.MaxDataFrame) {
  587. s.connFlowController.UpdateSendWindow(frame.ByteOffset)
  588. }
  589. func (s *session) handleMaxStreamDataFrame(frame *wire.MaxStreamDataFrame) error {
  590. str, err := s.streamsMap.GetOrOpenSendStream(frame.StreamID)
  591. if err != nil {
  592. return err
  593. }
  594. if str == nil {
  595. // stream is closed and already garbage collected
  596. return nil
  597. }
  598. str.handleMaxStreamDataFrame(frame)
  599. return nil
  600. }
  601. func (s *session) handleMaxStreamsFrame(frame *wire.MaxStreamsFrame) error {
  602. return s.streamsMap.HandleMaxStreamsFrame(frame)
  603. }
  604. func (s *session) handleResetStreamFrame(frame *wire.ResetStreamFrame) error {
  605. str, err := s.streamsMap.GetOrOpenReceiveStream(frame.StreamID)
  606. if err != nil {
  607. return err
  608. }
  609. if str == nil {
  610. // stream is closed and already garbage collected
  611. return nil
  612. }
  613. return str.handleResetStreamFrame(frame)
  614. }
  615. func (s *session) handleStopSendingFrame(frame *wire.StopSendingFrame) error {
  616. str, err := s.streamsMap.GetOrOpenSendStream(frame.StreamID)
  617. if err != nil {
  618. return err
  619. }
  620. if str == nil {
  621. // stream is closed and already garbage collected
  622. return nil
  623. }
  624. str.handleStopSendingFrame(frame)
  625. return nil
  626. }
  627. func (s *session) handlePathChallengeFrame(frame *wire.PathChallengeFrame) {
  628. s.queueControlFrame(&wire.PathResponseFrame{Data: frame.Data})
  629. }
  630. func (s *session) handleAckFrame(frame *wire.AckFrame, encLevel protocol.EncryptionLevel) error {
  631. if err := s.sentPacketHandler.ReceivedAck(frame, s.lastRcvdPacketNumber, encLevel, s.lastNetworkActivityTime); err != nil {
  632. return err
  633. }
  634. s.receivedPacketHandler.IgnoreBelow(s.sentPacketHandler.GetLowestPacketNotConfirmedAcked())
  635. return nil
  636. }
  637. // closeLocal closes the session and send a CONNECTION_CLOSE containing the error
  638. func (s *session) closeLocal(e error) {
  639. s.closeOnce.Do(func() {
  640. s.sessionRunner.retireConnectionID(s.srcConnID)
  641. s.closeChan <- closeError{err: e, sendClose: true, remote: false}
  642. })
  643. }
  644. // destroy closes the session without sending the error on the wire
  645. func (s *session) destroy(e error) {
  646. s.closeOnce.Do(func() {
  647. s.sessionRunner.removeConnectionID(s.srcConnID)
  648. s.closeChan <- closeError{err: e, sendClose: false, remote: false}
  649. })
  650. }
  651. func (s *session) closeRemote(e error) {
  652. s.closeOnce.Do(func() {
  653. s.sessionRunner.removeConnectionID(s.srcConnID)
  654. s.closeChan <- closeError{err: e, remote: true}
  655. })
  656. }
  657. // Close the connection. It sends a qerr.PeerGoingAway.
  658. // It waits until the run loop has stopped before returning
  659. func (s *session) Close() error {
  660. s.closeLocal(nil)
  661. <-s.ctx.Done()
  662. return nil
  663. }
  664. func (s *session) CloseWithError(code protocol.ApplicationErrorCode, e error) error {
  665. s.closeLocal(qerr.Error(qerr.ErrorCode(code), e.Error()))
  666. <-s.ctx.Done()
  667. return nil
  668. }
  669. func (s *session) handleCloseError(closeErr closeError) error {
  670. if closeErr.err == nil {
  671. closeErr.err = qerr.PeerGoingAway
  672. }
  673. var quicErr *qerr.QuicError
  674. var ok bool
  675. if quicErr, ok = closeErr.err.(*qerr.QuicError); !ok {
  676. quicErr = qerr.ToQuicError(closeErr.err)
  677. }
  678. // Don't log 'normal' reasons
  679. if quicErr.ErrorCode == qerr.PeerGoingAway || quicErr.ErrorCode == qerr.NetworkIdleTimeout {
  680. s.logger.Infof("Closing connection %s.", s.srcConnID)
  681. } else {
  682. s.logger.Errorf("Closing session with error: %s", closeErr.err.Error())
  683. }
  684. s.streamsMap.CloseWithError(quicErr)
  685. if !closeErr.sendClose {
  686. return nil
  687. }
  688. // If this is a remote close we're done here
  689. if closeErr.remote {
  690. return nil
  691. }
  692. if quicErr.ErrorCode == qerr.DecryptionFailure {
  693. // TODO(#943): send a stateless reset
  694. return nil
  695. }
  696. return s.sendConnectionClose(quicErr)
  697. }
  698. func (s *session) processTransportParameters(params *handshake.TransportParameters) {
  699. s.peerParams = params
  700. s.streamsMap.UpdateLimits(params)
  701. s.packer.HandleTransportParameters(params)
  702. s.connFlowController.UpdateSendWindow(params.InitialMaxData)
  703. // the crypto stream is the only open stream at this moment
  704. // so we don't need to update stream flow control windows
  705. }
  706. func (s *session) sendPackets() error {
  707. s.pacingDeadline = time.Time{}
  708. sendMode := s.sentPacketHandler.SendMode()
  709. if sendMode == ackhandler.SendNone { // shortcut: return immediately if there's nothing to send
  710. return nil
  711. }
  712. numPackets := s.sentPacketHandler.ShouldSendNumPackets()
  713. var numPacketsSent int
  714. sendLoop:
  715. for {
  716. switch sendMode {
  717. case ackhandler.SendNone:
  718. break sendLoop
  719. case ackhandler.SendAck:
  720. // If we already sent packets, and the send mode switches to SendAck,
  721. // we've just become congestion limited.
  722. // There's no need to try to send an ACK at this moment.
  723. if numPacketsSent > 0 {
  724. return nil
  725. }
  726. // We can at most send a single ACK only packet.
  727. // There will only be a new ACK after receiving new packets.
  728. // SendAck is only returned when we're congestion limited, so we don't need to set the pacingt timer.
  729. return s.maybeSendAckOnlyPacket()
  730. case ackhandler.SendTLP, ackhandler.SendRTO:
  731. if err := s.sendProbePacket(); err != nil {
  732. return err
  733. }
  734. numPacketsSent++
  735. case ackhandler.SendRetransmission:
  736. sentPacket, err := s.maybeSendRetransmission()
  737. if err != nil {
  738. return err
  739. }
  740. if sentPacket {
  741. numPacketsSent++
  742. // This can happen if a retransmission queued, but it wasn't necessary to send it.
  743. // e.g. when an Initial is queued, but we already received a packet from the server.
  744. }
  745. case ackhandler.SendAny:
  746. sentPacket, err := s.sendPacket()
  747. if err != nil {
  748. return err
  749. }
  750. if !sentPacket {
  751. break sendLoop
  752. }
  753. numPacketsSent++
  754. default:
  755. return fmt.Errorf("BUG: invalid send mode %d", sendMode)
  756. }
  757. if numPacketsSent >= numPackets {
  758. break
  759. }
  760. sendMode = s.sentPacketHandler.SendMode()
  761. }
  762. // Only start the pacing timer if we sent as many packets as we were allowed.
  763. // There will probably be more to send when calling sendPacket again.
  764. if numPacketsSent == numPackets {
  765. s.pacingDeadline = s.sentPacketHandler.TimeUntilSend()
  766. }
  767. return nil
  768. }
  769. func (s *session) maybeSendAckOnlyPacket() error {
  770. packet, err := s.packer.MaybePackAckPacket()
  771. if err != nil {
  772. return err
  773. }
  774. if packet == nil {
  775. return nil
  776. }
  777. s.sentPacketHandler.SentPacket(packet.ToAckHandlerPacket())
  778. return s.sendPackedPacket(packet)
  779. }
  780. // maybeSendRetransmission sends retransmissions for at most one packet.
  781. // It takes care that Initials aren't retransmitted, if a packet from the server was already received.
  782. func (s *session) maybeSendRetransmission() (bool, error) {
  783. var retransmitPacket *ackhandler.Packet
  784. for {
  785. retransmitPacket = s.sentPacketHandler.DequeuePacketForRetransmission()
  786. if retransmitPacket == nil {
  787. return false, nil
  788. }
  789. // Don't retransmit Initial packets if we already received a response.
  790. // An Initial might have been retransmitted multiple times before we receive a response.
  791. // As soon as we receive one response, we don't need to send any more Initials.
  792. if s.perspective == protocol.PerspectiveClient && s.receivedFirstPacket && retransmitPacket.PacketType == protocol.PacketTypeInitial {
  793. s.logger.Debugf("Skipping retransmission of packet %d. Already received a response to an Initial.", retransmitPacket.PacketNumber)
  794. continue
  795. }
  796. break
  797. }
  798. if retransmitPacket.EncryptionLevel != protocol.Encryption1RTT {
  799. s.logger.Debugf("Dequeueing handshake retransmission for packet 0x%x", retransmitPacket.PacketNumber)
  800. } else {
  801. s.logger.Debugf("Dequeueing retransmission for packet 0x%x", retransmitPacket.PacketNumber)
  802. }
  803. packets, err := s.packer.PackRetransmission(retransmitPacket)
  804. if err != nil {
  805. return false, err
  806. }
  807. ackhandlerPackets := make([]*ackhandler.Packet, len(packets))
  808. for i, packet := range packets {
  809. ackhandlerPackets[i] = packet.ToAckHandlerPacket()
  810. }
  811. s.sentPacketHandler.SentPacketsAsRetransmission(ackhandlerPackets, retransmitPacket.PacketNumber)
  812. for _, packet := range packets {
  813. if err := s.sendPackedPacket(packet); err != nil {
  814. return false, err
  815. }
  816. }
  817. return true, nil
  818. }
  819. func (s *session) sendProbePacket() error {
  820. p, err := s.sentPacketHandler.DequeueProbePacket()
  821. if err != nil {
  822. return err
  823. }
  824. s.logger.Debugf("Sending a retransmission for %#x as a probe packet.", p.PacketNumber)
  825. packets, err := s.packer.PackRetransmission(p)
  826. if err != nil {
  827. return err
  828. }
  829. ackhandlerPackets := make([]*ackhandler.Packet, len(packets))
  830. for i, packet := range packets {
  831. ackhandlerPackets[i] = packet.ToAckHandlerPacket()
  832. }
  833. s.sentPacketHandler.SentPacketsAsRetransmission(ackhandlerPackets, p.PacketNumber)
  834. for _, packet := range packets {
  835. if err := s.sendPackedPacket(packet); err != nil {
  836. return err
  837. }
  838. }
  839. return nil
  840. }
  841. func (s *session) sendPacket() (bool, error) {
  842. if isBlocked, offset := s.connFlowController.IsNewlyBlocked(); isBlocked {
  843. s.framer.QueueControlFrame(&wire.DataBlockedFrame{DataLimit: offset})
  844. }
  845. s.windowUpdateQueue.QueueAll()
  846. packet, err := s.packer.PackPacket()
  847. if err != nil || packet == nil {
  848. return false, err
  849. }
  850. s.sentPacketHandler.SentPacket(packet.ToAckHandlerPacket())
  851. if err := s.sendPackedPacket(packet); err != nil {
  852. return false, err
  853. }
  854. return true, nil
  855. }
  856. func (s *session) sendPackedPacket(packet *packedPacket) error {
  857. defer putPacketBuffer(&packet.raw)
  858. s.logPacket(packet)
  859. return s.conn.Write(packet.raw)
  860. }
  861. func (s *session) sendConnectionClose(quicErr *qerr.QuicError) error {
  862. packet, err := s.packer.PackConnectionClose(&wire.ConnectionCloseFrame{
  863. ErrorCode: quicErr.ErrorCode,
  864. ReasonPhrase: quicErr.ErrorMessage,
  865. })
  866. if err != nil {
  867. return err
  868. }
  869. s.connectionClosePacket = packet
  870. s.logPacket(packet)
  871. return s.conn.Write(packet.raw)
  872. }
  873. func (s *session) logPacket(packet *packedPacket) {
  874. if !s.logger.Debug() {
  875. // We don't need to allocate the slices for calling the format functions
  876. return
  877. }
  878. s.logger.Debugf("-> Sending packet 0x%x (%d bytes) for connection %s, %s", packet.header.PacketNumber, len(packet.raw), s.srcConnID, packet.encryptionLevel)
  879. packet.header.Log(s.logger)
  880. for _, frame := range packet.frames {
  881. wire.LogFrame(s.logger, frame, true)
  882. }
  883. }
  884. // GetOrOpenStream either returns an existing stream, a newly opened stream, or nil if a stream with the provided ID is already closed.
  885. // It is *only* needed for gQUIC's H2.
  886. // It will be removed as soon as gQUIC moves towards the IETF H2/QUIC stream mapping.
  887. func (s *session) GetOrOpenStream(id protocol.StreamID) (Stream, error) {
  888. str, err := s.streamsMap.GetOrOpenSendStream(id)
  889. if str != nil {
  890. if bstr, ok := str.(Stream); ok {
  891. return bstr, err
  892. }
  893. return nil, fmt.Errorf("Stream %d is not a bidirectional stream", id)
  894. }
  895. // make sure to return an actual nil value here, not an Stream with value nil
  896. return nil, err
  897. }
  898. // AcceptStream returns the next stream openend by the peer
  899. func (s *session) AcceptStream() (Stream, error) {
  900. return s.streamsMap.AcceptStream()
  901. }
  902. func (s *session) AcceptUniStream() (ReceiveStream, error) {
  903. return s.streamsMap.AcceptUniStream()
  904. }
  905. // OpenStream opens a stream
  906. func (s *session) OpenStream() (Stream, error) {
  907. return s.streamsMap.OpenStream()
  908. }
  909. func (s *session) OpenStreamSync() (Stream, error) {
  910. return s.streamsMap.OpenStreamSync()
  911. }
  912. func (s *session) OpenUniStream() (SendStream, error) {
  913. return s.streamsMap.OpenUniStream()
  914. }
  915. func (s *session) OpenUniStreamSync() (SendStream, error) {
  916. return s.streamsMap.OpenUniStreamSync()
  917. }
  918. func (s *session) newStream(id protocol.StreamID) streamI {
  919. flowController := s.newFlowController(id)
  920. return newStream(id, s, flowController, s.version)
  921. }
  922. func (s *session) newFlowController(id protocol.StreamID) flowcontrol.StreamFlowController {
  923. var initialSendWindow protocol.ByteCount
  924. if s.peerParams != nil {
  925. if id.Type() == protocol.StreamTypeUni {
  926. initialSendWindow = s.peerParams.InitialMaxStreamDataUni
  927. } else {
  928. if id.InitiatedBy() == s.perspective {
  929. initialSendWindow = s.peerParams.InitialMaxStreamDataBidiLocal
  930. } else {
  931. initialSendWindow = s.peerParams.InitialMaxStreamDataBidiRemote
  932. }
  933. }
  934. }
  935. return flowcontrol.NewStreamFlowController(
  936. id,
  937. s.connFlowController,
  938. protocol.InitialMaxStreamData,
  939. protocol.ByteCount(s.config.MaxReceiveStreamFlowControlWindow),
  940. initialSendWindow,
  941. s.onHasStreamWindowUpdate,
  942. s.rttStats,
  943. s.logger,
  944. )
  945. }
  946. // scheduleSending signals that we have data for sending
  947. func (s *session) scheduleSending() {
  948. select {
  949. case s.sendingScheduled <- struct{}{}:
  950. default:
  951. }
  952. }
  953. func (s *session) tryQueueingUndecryptablePacket(p *receivedPacket) {
  954. if s.handshakeComplete {
  955. s.logger.Debugf("Received undecryptable packet from %s after the handshake: %#v, %d bytes data", p.remoteAddr.String(), p.header, len(p.data))
  956. return
  957. }
  958. if len(s.undecryptablePackets)+1 > protocol.MaxUndecryptablePackets {
  959. s.logger.Infof("Dropping undecrytable packet 0x%x (undecryptable packet queue full)", p.header.PacketNumber)
  960. return
  961. }
  962. s.logger.Infof("Queueing packet 0x%x for later decryption", p.header.PacketNumber)
  963. s.undecryptablePackets = append(s.undecryptablePackets, p)
  964. }
  965. func (s *session) tryDecryptingQueuedPackets() {
  966. for _, p := range s.undecryptablePackets {
  967. s.handlePacket(p)
  968. }
  969. s.undecryptablePackets = s.undecryptablePackets[:0]
  970. }
  971. func (s *session) queueControlFrame(f wire.Frame) {
  972. s.framer.QueueControlFrame(f)
  973. s.scheduleSending()
  974. }
  975. func (s *session) onHasStreamWindowUpdate(id protocol.StreamID) {
  976. s.windowUpdateQueue.AddStream(id)
  977. s.scheduleSending()
  978. }
  979. func (s *session) onHasConnectionWindowUpdate() {
  980. s.windowUpdateQueue.AddConnection()
  981. s.scheduleSending()
  982. }
  983. func (s *session) onHasStreamData(id protocol.StreamID) {
  984. s.framer.AddActiveStream(id)
  985. s.scheduleSending()
  986. }
  987. func (s *session) onStreamCompleted(id protocol.StreamID) {
  988. if err := s.streamsMap.DeleteStream(id); err != nil {
  989. s.closeLocal(err)
  990. }
  991. }
  992. func (s *session) LocalAddr() net.Addr {
  993. return s.conn.LocalAddr()
  994. }
  995. func (s *session) RemoteAddr() net.Addr {
  996. return s.conn.RemoteAddr()
  997. }
  998. func (s *session) GetVersion() protocol.VersionNumber {
  999. return s.version
  1000. }