session.go 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130
  1. package quic
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "net"
  9. "sync"
  10. "time"
  11. "github.com/lucas-clemente/quic-go/internal/ackhandler"
  12. "github.com/lucas-clemente/quic-go/internal/congestion"
  13. "github.com/lucas-clemente/quic-go/internal/flowcontrol"
  14. "github.com/lucas-clemente/quic-go/internal/handshake"
  15. "github.com/lucas-clemente/quic-go/internal/protocol"
  16. "github.com/lucas-clemente/quic-go/internal/qerr"
  17. "github.com/lucas-clemente/quic-go/internal/utils"
  18. "github.com/lucas-clemente/quic-go/internal/wire"
  19. )
  20. type unpacker interface {
  21. Unpack(hdr *wire.Header, data []byte) (*unpackedPacket, error)
  22. }
  23. type streamGetter interface {
  24. GetOrOpenReceiveStream(protocol.StreamID) (receiveStreamI, error)
  25. GetOrOpenSendStream(protocol.StreamID) (sendStreamI, error)
  26. }
  27. type streamManager interface {
  28. GetOrOpenSendStream(protocol.StreamID) (sendStreamI, error)
  29. GetOrOpenReceiveStream(protocol.StreamID) (receiveStreamI, error)
  30. OpenStream() (Stream, error)
  31. OpenUniStream() (SendStream, error)
  32. OpenStreamSync() (Stream, error)
  33. OpenUniStreamSync() (SendStream, error)
  34. AcceptStream() (Stream, error)
  35. AcceptUniStream() (ReceiveStream, error)
  36. DeleteStream(protocol.StreamID) error
  37. UpdateLimits(*handshake.TransportParameters)
  38. HandleMaxStreamsFrame(*wire.MaxStreamsFrame) error
  39. CloseWithError(error)
  40. }
  41. type cryptoStreamHandler interface {
  42. RunHandshake() error
  43. io.Closer
  44. ConnectionState() handshake.ConnectionState
  45. }
  46. type receivedPacket struct {
  47. remoteAddr net.Addr
  48. hdr *wire.Header
  49. rcvTime time.Time
  50. data []byte
  51. buffer *packetBuffer
  52. }
  53. type closeError struct {
  54. err error
  55. remote bool
  56. sendClose bool
  57. }
  58. var errCloseForRecreating = errors.New("closing session in order to recreate it")
  59. // A Session is a QUIC session
  60. type session struct {
  61. sessionRunner sessionRunner
  62. destConnID protocol.ConnectionID
  63. srcConnID protocol.ConnectionID
  64. perspective protocol.Perspective
  65. version protocol.VersionNumber
  66. config *Config
  67. conn connection
  68. streamsMap streamManager
  69. rttStats *congestion.RTTStats
  70. cryptoStreamManager *cryptoStreamManager
  71. sentPacketHandler ackhandler.SentPacketHandler
  72. receivedPacketHandler ackhandler.ReceivedPacketHandler
  73. framer framer
  74. windowUpdateQueue *windowUpdateQueue
  75. connFlowController flowcontrol.ConnectionFlowController
  76. unpacker unpacker
  77. packer packer
  78. cryptoStreamHandler cryptoStreamHandler
  79. receivedPackets chan *receivedPacket
  80. sendingScheduled chan struct{}
  81. closeOnce sync.Once
  82. closed utils.AtomicBool
  83. // closeChan is used to notify the run loop that it should terminate
  84. closeChan chan closeError
  85. connectionClosePacket *packedPacket
  86. packetsReceivedAfterClose int
  87. ctx context.Context
  88. ctxCancel context.CancelFunc
  89. undecryptablePackets []*receivedPacket
  90. clientHelloWritten <-chan struct{}
  91. handshakeCompleteChan chan struct{} // is closed when the handshake completes
  92. handshakeComplete bool
  93. receivedFirstPacket bool
  94. receivedFirstForwardSecurePacket bool
  95. sessionCreationTime time.Time
  96. lastNetworkActivityTime time.Time
  97. // pacingDeadline is the time when the next packet should be sent
  98. pacingDeadline time.Time
  99. peerParams *handshake.TransportParameters
  100. timer *utils.Timer
  101. // keepAlivePingSent stores whether a Ping frame was sent to the peer or not
  102. // it is reset as soon as we receive a packet from the peer
  103. keepAlivePingSent bool
  104. logger utils.Logger
  105. }
  106. var _ Session = &session{}
  107. var _ streamSender = &session{}
  108. var newSession = func(
  109. conn connection,
  110. runner sessionRunner,
  111. clientDestConnID protocol.ConnectionID,
  112. destConnID protocol.ConnectionID,
  113. srcConnID protocol.ConnectionID,
  114. conf *Config,
  115. tlsConf *tls.Config,
  116. params *handshake.TransportParameters,
  117. logger utils.Logger,
  118. v protocol.VersionNumber,
  119. ) (quicSession, error) {
  120. s := &session{
  121. conn: conn,
  122. sessionRunner: runner,
  123. config: conf,
  124. srcConnID: srcConnID,
  125. destConnID: destConnID,
  126. perspective: protocol.PerspectiveServer,
  127. handshakeCompleteChan: make(chan struct{}),
  128. logger: logger,
  129. version: v,
  130. }
  131. s.preSetup()
  132. s.sentPacketHandler = ackhandler.NewSentPacketHandler(0, s.rttStats, s.logger)
  133. initialStream := newCryptoStream()
  134. handshakeStream := newCryptoStream()
  135. s.streamsMap = newStreamsMap(
  136. s,
  137. s.newFlowController,
  138. uint64(s.config.MaxIncomingStreams),
  139. uint64(s.config.MaxIncomingUniStreams),
  140. s.perspective,
  141. s.version,
  142. )
  143. s.framer = newFramer(s.streamsMap, s.version)
  144. cs, err := handshake.NewCryptoSetupServer(
  145. initialStream,
  146. handshakeStream,
  147. clientDestConnID,
  148. params,
  149. s.processTransportParameters,
  150. tlsConf,
  151. conf.Versions,
  152. v,
  153. logger,
  154. protocol.PerspectiveServer,
  155. )
  156. if err != nil {
  157. return nil, err
  158. }
  159. s.cryptoStreamHandler = cs
  160. s.packer = newPacketPacker(
  161. s.destConnID,
  162. s.srcConnID,
  163. initialStream,
  164. handshakeStream,
  165. s.sentPacketHandler,
  166. s.RemoteAddr(),
  167. nil, // no token
  168. cs,
  169. s.framer,
  170. s.receivedPacketHandler,
  171. s.perspective,
  172. s.version,
  173. )
  174. s.cryptoStreamManager = newCryptoStreamManager(cs, initialStream, handshakeStream)
  175. if err := s.postSetup(); err != nil {
  176. return nil, err
  177. }
  178. s.unpacker = newPacketUnpacker(cs, s.version)
  179. return s, nil
  180. }
  181. // declare this as a variable, such that we can it mock it in the tests
  182. var newClientSession = func(
  183. conn connection,
  184. runner sessionRunner,
  185. token []byte,
  186. origDestConnID protocol.ConnectionID,
  187. destConnID protocol.ConnectionID,
  188. srcConnID protocol.ConnectionID,
  189. conf *Config,
  190. tlsConf *tls.Config,
  191. initialPacketNumber protocol.PacketNumber,
  192. params *handshake.TransportParameters,
  193. initialVersion protocol.VersionNumber,
  194. logger utils.Logger,
  195. v protocol.VersionNumber,
  196. ) (quicSession, error) {
  197. s := &session{
  198. conn: conn,
  199. sessionRunner: runner,
  200. config: conf,
  201. srcConnID: srcConnID,
  202. destConnID: destConnID,
  203. perspective: protocol.PerspectiveClient,
  204. handshakeCompleteChan: make(chan struct{}),
  205. logger: logger,
  206. version: v,
  207. }
  208. s.preSetup()
  209. s.sentPacketHandler = ackhandler.NewSentPacketHandler(initialPacketNumber, s.rttStats, s.logger)
  210. initialStream := newCryptoStream()
  211. handshakeStream := newCryptoStream()
  212. cs, clientHelloWritten, err := handshake.NewCryptoSetupClient(
  213. initialStream,
  214. handshakeStream,
  215. origDestConnID,
  216. s.destConnID,
  217. params,
  218. s.processTransportParameters,
  219. tlsConf,
  220. initialVersion,
  221. conf.Versions,
  222. v,
  223. logger,
  224. protocol.PerspectiveClient,
  225. )
  226. if err != nil {
  227. return nil, err
  228. }
  229. s.clientHelloWritten = clientHelloWritten
  230. s.cryptoStreamHandler = cs
  231. s.cryptoStreamManager = newCryptoStreamManager(cs, initialStream, handshakeStream)
  232. s.unpacker = newPacketUnpacker(cs, s.version)
  233. s.streamsMap = newStreamsMap(
  234. s,
  235. s.newFlowController,
  236. uint64(s.config.MaxIncomingStreams),
  237. uint64(s.config.MaxIncomingUniStreams),
  238. s.perspective,
  239. s.version,
  240. )
  241. s.framer = newFramer(s.streamsMap, s.version)
  242. s.packer = newPacketPacker(
  243. s.destConnID,
  244. s.srcConnID,
  245. initialStream,
  246. handshakeStream,
  247. s.sentPacketHandler,
  248. s.RemoteAddr(),
  249. token,
  250. cs,
  251. s.framer,
  252. s.receivedPacketHandler,
  253. s.perspective,
  254. s.version,
  255. )
  256. return s, s.postSetup()
  257. }
  258. func (s *session) preSetup() {
  259. s.rttStats = &congestion.RTTStats{}
  260. s.receivedPacketHandler = ackhandler.NewReceivedPacketHandler(s.rttStats, s.logger, s.version)
  261. s.connFlowController = flowcontrol.NewConnectionFlowController(
  262. protocol.InitialMaxData,
  263. protocol.ByteCount(s.config.MaxReceiveConnectionFlowControlWindow),
  264. s.onHasConnectionWindowUpdate,
  265. s.rttStats,
  266. s.logger,
  267. )
  268. }
  269. func (s *session) postSetup() error {
  270. s.receivedPackets = make(chan *receivedPacket, protocol.MaxSessionUnprocessedPackets)
  271. s.closeChan = make(chan closeError, 1)
  272. s.sendingScheduled = make(chan struct{}, 1)
  273. s.undecryptablePackets = make([]*receivedPacket, 0, protocol.MaxUndecryptablePackets)
  274. s.ctx, s.ctxCancel = context.WithCancel(context.Background())
  275. s.timer = utils.NewTimer()
  276. now := time.Now()
  277. s.lastNetworkActivityTime = now
  278. s.sessionCreationTime = now
  279. s.windowUpdateQueue = newWindowUpdateQueue(s.streamsMap, s.connFlowController, s.framer.QueueControlFrame)
  280. return nil
  281. }
  282. // run the session main loop
  283. func (s *session) run() error {
  284. defer s.ctxCancel()
  285. go func() {
  286. if err := s.cryptoStreamHandler.RunHandshake(); err != nil {
  287. s.closeLocal(err)
  288. return
  289. }
  290. close(s.handshakeCompleteChan)
  291. }()
  292. if s.perspective == protocol.PerspectiveClient {
  293. select {
  294. case <-s.clientHelloWritten:
  295. s.scheduleSending()
  296. case closeErr := <-s.closeChan:
  297. // put the close error back into the channel, so that the run loop can receive it
  298. s.closeChan <- closeErr
  299. }
  300. }
  301. var closeErr closeError
  302. runLoop:
  303. for {
  304. // Close immediately if requested
  305. select {
  306. case closeErr = <-s.closeChan:
  307. break runLoop
  308. case <-s.handshakeCompleteChan:
  309. s.handleHandshakeComplete()
  310. default:
  311. }
  312. s.maybeResetTimer()
  313. select {
  314. case closeErr = <-s.closeChan:
  315. break runLoop
  316. case <-s.timer.Chan():
  317. s.timer.SetRead()
  318. // We do all the interesting stuff after the switch statement, so
  319. // nothing to see here.
  320. case <-s.sendingScheduled:
  321. // We do all the interesting stuff after the switch statement, so
  322. // nothing to see here.
  323. case p := <-s.receivedPackets:
  324. // Only reset the timers if this packet was actually processed.
  325. // This avoids modifying any state when handling undecryptable packets,
  326. // which could be injected by an attacker.
  327. if wasProcessed := s.handlePacketImpl(p); !wasProcessed {
  328. continue
  329. }
  330. case <-s.handshakeCompleteChan:
  331. s.handleHandshakeComplete()
  332. }
  333. now := time.Now()
  334. if timeout := s.sentPacketHandler.GetAlarmTimeout(); !timeout.IsZero() && timeout.Before(now) {
  335. // This could cause packets to be retransmitted.
  336. // Check it before trying to send packets.
  337. if err := s.sentPacketHandler.OnAlarm(); err != nil {
  338. s.closeLocal(err)
  339. }
  340. }
  341. var pacingDeadline time.Time
  342. if s.pacingDeadline.IsZero() { // the timer didn't have a pacing deadline set
  343. pacingDeadline = s.sentPacketHandler.TimeUntilSend()
  344. }
  345. if s.config.KeepAlive && !s.keepAlivePingSent && s.handshakeComplete && time.Since(s.lastNetworkActivityTime) >= s.peerParams.IdleTimeout/2 {
  346. // send a PING frame since there is no activity in the session
  347. s.logger.Debugf("Sending a keep-alive ping to keep the connection alive.")
  348. s.framer.QueueControlFrame(&wire.PingFrame{})
  349. s.keepAlivePingSent = true
  350. } else if !pacingDeadline.IsZero() && now.Before(pacingDeadline) {
  351. // If we get to this point before the pacing deadline, we should wait until that deadline.
  352. // This can happen when scheduleSending is called, or a packet is received.
  353. // Set the timer and restart the run loop.
  354. s.pacingDeadline = pacingDeadline
  355. continue
  356. }
  357. if !s.handshakeComplete && now.Sub(s.sessionCreationTime) >= s.config.HandshakeTimeout {
  358. s.closeLocal(qerr.Error(qerr.HandshakeTimeout, "Crypto handshake did not complete in time."))
  359. continue
  360. }
  361. if s.handshakeComplete && now.Sub(s.lastNetworkActivityTime) >= s.config.IdleTimeout {
  362. s.closeLocal(qerr.Error(qerr.NetworkIdleTimeout, "No recent network activity."))
  363. continue
  364. }
  365. if err := s.sendPackets(); err != nil {
  366. s.closeLocal(err)
  367. }
  368. }
  369. if err := s.handleCloseError(closeErr); err != nil {
  370. s.logger.Infof("Handling close error failed: %s", err)
  371. }
  372. s.closed.Set(true)
  373. s.logger.Infof("Connection %s closed.", s.srcConnID)
  374. s.cryptoStreamHandler.Close()
  375. return closeErr.err
  376. }
  377. func (s *session) Context() context.Context {
  378. return s.ctx
  379. }
  380. func (s *session) ConnectionState() ConnectionState {
  381. return s.cryptoStreamHandler.ConnectionState()
  382. }
  383. func (s *session) maybeResetTimer() {
  384. var deadline time.Time
  385. if s.config.KeepAlive && s.handshakeComplete && !s.keepAlivePingSent {
  386. deadline = s.lastNetworkActivityTime.Add(s.peerParams.IdleTimeout / 2)
  387. } else {
  388. deadline = s.lastNetworkActivityTime.Add(s.config.IdleTimeout)
  389. }
  390. if ackAlarm := s.receivedPacketHandler.GetAlarmTimeout(); !ackAlarm.IsZero() {
  391. deadline = utils.MinTime(deadline, ackAlarm)
  392. }
  393. if lossTime := s.sentPacketHandler.GetAlarmTimeout(); !lossTime.IsZero() {
  394. deadline = utils.MinTime(deadline, lossTime)
  395. }
  396. if !s.handshakeComplete {
  397. handshakeDeadline := s.sessionCreationTime.Add(s.config.HandshakeTimeout)
  398. deadline = utils.MinTime(deadline, handshakeDeadline)
  399. }
  400. if !s.pacingDeadline.IsZero() {
  401. deadline = utils.MinTime(deadline, s.pacingDeadline)
  402. }
  403. s.timer.Reset(deadline)
  404. }
  405. func (s *session) handleHandshakeComplete() {
  406. s.handshakeComplete = true
  407. s.handshakeCompleteChan = nil // prevent this case from ever being selected again
  408. s.sessionRunner.onHandshakeComplete(s)
  409. // The client completes the handshake first (after sending the CFIN).
  410. // We need to make sure they learn about the peer completing the handshake,
  411. // in order to stop retransmitting handshake packets.
  412. // They will stop retransmitting handshake packets when receiving the first forward-secure packet.
  413. // We need to make sure that a retransmittable forward-secure packet is sent,
  414. // independent from the application protocol.
  415. if s.perspective == protocol.PerspectiveServer {
  416. s.queueControlFrame(&wire.PingFrame{})
  417. s.sentPacketHandler.SetHandshakeComplete()
  418. }
  419. }
  420. func (s *session) handlePacketImpl(p *receivedPacket) bool /* was the packet successfully processed */ {
  421. var wasQueued bool
  422. defer func() {
  423. // Put back the packet buffer if the packet wasn't queued for later decryption.
  424. if !wasQueued {
  425. p.buffer.Release()
  426. }
  427. }()
  428. // The server can change the source connection ID with the first Handshake packet.
  429. // After this, all packets with a different source connection have to be ignored.
  430. if s.receivedFirstPacket && p.hdr.IsLongHeader && !p.hdr.SrcConnectionID.Equal(s.destConnID) {
  431. s.logger.Debugf("Dropping packet with unexpected source connection ID: %s (expected %s)", p.hdr.SrcConnectionID, s.destConnID)
  432. return false
  433. }
  434. // drop 0-RTT packets
  435. if p.hdr.Type == protocol.PacketType0RTT {
  436. return false
  437. }
  438. packet, err := s.unpacker.Unpack(p.hdr, p.data)
  439. // if the decryption failed, this might be a packet sent by an attacker
  440. if err != nil {
  441. if err == handshake.ErrOpenerNotYetAvailable {
  442. wasQueued = true
  443. s.tryQueueingUndecryptablePacket(p)
  444. return false
  445. }
  446. s.closeLocal(err)
  447. return false
  448. }
  449. if s.logger.Debug() {
  450. s.logger.Debugf("<- Reading packet %#x (%d bytes) for connection %s, %s", packet.packetNumber, len(p.data), p.hdr.DestConnectionID, packet.encryptionLevel)
  451. packet.hdr.Log(s.logger)
  452. }
  453. if err := s.handleUnpackedPacket(packet, p.rcvTime); err != nil {
  454. s.closeLocal(err)
  455. return false
  456. }
  457. return true
  458. }
  459. func (s *session) handleUnpackedPacket(packet *unpackedPacket, rcvTime time.Time) error {
  460. // The server can change the source connection ID with the first Handshake packet.
  461. if s.perspective == protocol.PerspectiveClient && !s.receivedFirstPacket && packet.hdr.IsLongHeader && !packet.hdr.SrcConnectionID.Equal(s.destConnID) {
  462. s.logger.Debugf("Received first packet. Switching destination connection ID to: %s", packet.hdr.SrcConnectionID)
  463. s.destConnID = packet.hdr.SrcConnectionID
  464. s.packer.ChangeDestConnectionID(s.destConnID)
  465. }
  466. s.receivedFirstPacket = true
  467. s.lastNetworkActivityTime = rcvTime
  468. s.keepAlivePingSent = false
  469. // The client completes the handshake first (after sending the CFIN).
  470. // We know that the server completed the handshake as soon as we receive a forward-secure packet.
  471. if s.perspective == protocol.PerspectiveClient {
  472. if !s.receivedFirstForwardSecurePacket && packet.encryptionLevel == protocol.Encryption1RTT {
  473. s.receivedFirstForwardSecurePacket = true
  474. s.sentPacketHandler.SetHandshakeComplete()
  475. }
  476. }
  477. // If this is a Retry packet, there's no need to send an ACK.
  478. // The session will be closed and recreated as soon as the crypto setup processed the HRR.
  479. if packet.hdr.Type != protocol.PacketTypeRetry {
  480. isRetransmittable := ackhandler.HasRetransmittableFrames(packet.frames)
  481. if err := s.receivedPacketHandler.ReceivedPacket(packet.packetNumber, rcvTime, isRetransmittable); err != nil {
  482. return err
  483. }
  484. }
  485. return s.handleFrames(packet.frames, packet.packetNumber, packet.encryptionLevel)
  486. }
  487. func (s *session) handleFrames(fs []wire.Frame, pn protocol.PacketNumber, encLevel protocol.EncryptionLevel) error {
  488. for _, ff := range fs {
  489. var err error
  490. wire.LogFrame(s.logger, ff, false)
  491. switch frame := ff.(type) {
  492. case *wire.CryptoFrame:
  493. err = s.handleCryptoFrame(frame, encLevel)
  494. case *wire.StreamFrame:
  495. err = s.handleStreamFrame(frame, encLevel)
  496. case *wire.AckFrame:
  497. err = s.handleAckFrame(frame, pn, encLevel)
  498. case *wire.ConnectionCloseFrame:
  499. s.closeRemote(qerr.Error(frame.ErrorCode, frame.ReasonPhrase))
  500. case *wire.ResetStreamFrame:
  501. err = s.handleResetStreamFrame(frame)
  502. case *wire.MaxDataFrame:
  503. s.handleMaxDataFrame(frame)
  504. case *wire.MaxStreamDataFrame:
  505. err = s.handleMaxStreamDataFrame(frame)
  506. case *wire.MaxStreamsFrame:
  507. err = s.handleMaxStreamsFrame(frame)
  508. case *wire.DataBlockedFrame:
  509. case *wire.StreamDataBlockedFrame:
  510. case *wire.StreamsBlockedFrame:
  511. case *wire.StopSendingFrame:
  512. err = s.handleStopSendingFrame(frame)
  513. case *wire.PingFrame:
  514. case *wire.PathChallengeFrame:
  515. s.handlePathChallengeFrame(frame)
  516. case *wire.PathResponseFrame:
  517. // since we don't send PATH_CHALLENGEs, we don't expect PATH_RESPONSEs
  518. err = errors.New("unexpected PATH_RESPONSE frame")
  519. case *wire.NewTokenFrame:
  520. case *wire.NewConnectionIDFrame:
  521. case *wire.RetireConnectionIDFrame:
  522. // since we don't send new connection IDs, we don't expect retirements
  523. err = errors.New("unexpected RETIRE_CONNECTION_ID frame")
  524. default:
  525. return errors.New("Session BUG: unexpected frame type")
  526. }
  527. if err != nil {
  528. return err
  529. }
  530. }
  531. return nil
  532. }
  533. // handlePacket is called by the server with a new packet
  534. func (s *session) handlePacket(p *receivedPacket) {
  535. if s.closed.Get() {
  536. s.handlePacketAfterClosed(p)
  537. }
  538. // Discard packets once the amount of queued packets is larger than
  539. // the channel size, protocol.MaxSessionUnprocessedPackets
  540. select {
  541. case s.receivedPackets <- p:
  542. default:
  543. }
  544. }
  545. func (s *session) handlePacketAfterClosed(p *receivedPacket) {
  546. s.packetsReceivedAfterClose++
  547. if s.connectionClosePacket == nil {
  548. return
  549. }
  550. // exponential backoff
  551. // only send a CONNECTION_CLOSE for the 1st, 2nd, 4th, 8th, 16th, ... packet arriving
  552. for n := s.packetsReceivedAfterClose; n > 1; n = n / 2 {
  553. if n%2 != 0 {
  554. return
  555. }
  556. }
  557. s.logger.Debugf("Received %d packets after sending CONNECTION_CLOSE. Retransmitting.", s.packetsReceivedAfterClose)
  558. if err := s.conn.Write(s.connectionClosePacket.raw); err != nil {
  559. s.logger.Debugf("Error retransmitting CONNECTION_CLOSE: %s", err)
  560. }
  561. }
  562. func (s *session) handleCryptoFrame(frame *wire.CryptoFrame, encLevel protocol.EncryptionLevel) error {
  563. encLevelChanged, err := s.cryptoStreamManager.HandleCryptoFrame(frame, encLevel)
  564. if err != nil {
  565. return err
  566. }
  567. if encLevelChanged {
  568. s.tryDecryptingQueuedPackets()
  569. }
  570. return nil
  571. }
  572. func (s *session) handleStreamFrame(frame *wire.StreamFrame, encLevel protocol.EncryptionLevel) error {
  573. if encLevel < protocol.Encryption1RTT {
  574. return qerr.Error(qerr.UnencryptedStreamData, fmt.Sprintf("received unencrypted stream data on stream %d", frame.StreamID))
  575. }
  576. str, err := s.streamsMap.GetOrOpenReceiveStream(frame.StreamID)
  577. if err != nil {
  578. return err
  579. }
  580. if str == nil {
  581. // Stream is closed and already garbage collected
  582. // ignore this StreamFrame
  583. return nil
  584. }
  585. return str.handleStreamFrame(frame)
  586. }
  587. func (s *session) handleMaxDataFrame(frame *wire.MaxDataFrame) {
  588. s.connFlowController.UpdateSendWindow(frame.ByteOffset)
  589. }
  590. func (s *session) handleMaxStreamDataFrame(frame *wire.MaxStreamDataFrame) error {
  591. str, err := s.streamsMap.GetOrOpenSendStream(frame.StreamID)
  592. if err != nil {
  593. return err
  594. }
  595. if str == nil {
  596. // stream is closed and already garbage collected
  597. return nil
  598. }
  599. str.handleMaxStreamDataFrame(frame)
  600. return nil
  601. }
  602. func (s *session) handleMaxStreamsFrame(frame *wire.MaxStreamsFrame) error {
  603. return s.streamsMap.HandleMaxStreamsFrame(frame)
  604. }
  605. func (s *session) handleResetStreamFrame(frame *wire.ResetStreamFrame) error {
  606. str, err := s.streamsMap.GetOrOpenReceiveStream(frame.StreamID)
  607. if err != nil {
  608. return err
  609. }
  610. if str == nil {
  611. // stream is closed and already garbage collected
  612. return nil
  613. }
  614. return str.handleResetStreamFrame(frame)
  615. }
  616. func (s *session) handleStopSendingFrame(frame *wire.StopSendingFrame) error {
  617. str, err := s.streamsMap.GetOrOpenSendStream(frame.StreamID)
  618. if err != nil {
  619. return err
  620. }
  621. if str == nil {
  622. // stream is closed and already garbage collected
  623. return nil
  624. }
  625. str.handleStopSendingFrame(frame)
  626. return nil
  627. }
  628. func (s *session) handlePathChallengeFrame(frame *wire.PathChallengeFrame) {
  629. s.queueControlFrame(&wire.PathResponseFrame{Data: frame.Data})
  630. }
  631. func (s *session) handleAckFrame(frame *wire.AckFrame, pn protocol.PacketNumber, encLevel protocol.EncryptionLevel) error {
  632. if err := s.sentPacketHandler.ReceivedAck(frame, pn, encLevel, s.lastNetworkActivityTime); err != nil {
  633. return err
  634. }
  635. s.receivedPacketHandler.IgnoreBelow(s.sentPacketHandler.GetLowestPacketNotConfirmedAcked())
  636. return nil
  637. }
  638. // closeLocal closes the session and send a CONNECTION_CLOSE containing the error
  639. func (s *session) closeLocal(e error) {
  640. s.closeOnce.Do(func() {
  641. s.sessionRunner.retireConnectionID(s.srcConnID)
  642. s.closeChan <- closeError{err: e, sendClose: true, remote: false}
  643. })
  644. }
  645. // destroy closes the session without sending the error on the wire
  646. func (s *session) destroy(e error) {
  647. s.closeOnce.Do(func() {
  648. s.sessionRunner.removeConnectionID(s.srcConnID)
  649. s.closeChan <- closeError{err: e, sendClose: false, remote: false}
  650. })
  651. }
  652. // closeForRecreating closes the session in order to recreate it immediately afterwards
  653. // It returns the first packet number that should be used in the new session.
  654. func (s *session) closeForRecreating() protocol.PacketNumber {
  655. s.destroy(errCloseForRecreating)
  656. nextPN, _ := s.sentPacketHandler.PeekPacketNumber()
  657. return nextPN
  658. }
  659. func (s *session) closeRemote(e error) {
  660. s.closeOnce.Do(func() {
  661. s.sessionRunner.removeConnectionID(s.srcConnID)
  662. s.closeChan <- closeError{err: e, remote: true}
  663. })
  664. }
  665. // Close the connection. It sends a qerr.PeerGoingAway.
  666. // It waits until the run loop has stopped before returning
  667. func (s *session) Close() error {
  668. s.closeLocal(nil)
  669. <-s.ctx.Done()
  670. return nil
  671. }
  672. func (s *session) CloseWithError(code protocol.ApplicationErrorCode, e error) error {
  673. s.closeLocal(qerr.Error(qerr.ErrorCode(code), e.Error()))
  674. <-s.ctx.Done()
  675. return nil
  676. }
  677. func (s *session) handleCloseError(closeErr closeError) error {
  678. if closeErr.err == nil {
  679. closeErr.err = qerr.PeerGoingAway
  680. }
  681. var quicErr *qerr.QuicError
  682. var ok bool
  683. if quicErr, ok = closeErr.err.(*qerr.QuicError); !ok {
  684. quicErr = qerr.ToQuicError(closeErr.err)
  685. }
  686. // Don't log 'normal' reasons
  687. if quicErr.ErrorCode == qerr.PeerGoingAway || quicErr.ErrorCode == qerr.NetworkIdleTimeout {
  688. s.logger.Infof("Closing connection %s.", s.srcConnID)
  689. } else {
  690. s.logger.Errorf("Closing session with error: %s", closeErr.err.Error())
  691. }
  692. s.streamsMap.CloseWithError(quicErr)
  693. if !closeErr.sendClose {
  694. return nil
  695. }
  696. // If this is a remote close we're done here
  697. if closeErr.remote {
  698. return nil
  699. }
  700. if quicErr.ErrorCode == qerr.DecryptionFailure {
  701. // TODO(#943): send a stateless reset
  702. return nil
  703. }
  704. return s.sendConnectionClose(quicErr)
  705. }
  706. func (s *session) processTransportParameters(params *handshake.TransportParameters) {
  707. s.peerParams = params
  708. s.streamsMap.UpdateLimits(params)
  709. s.packer.HandleTransportParameters(params)
  710. s.connFlowController.UpdateSendWindow(params.InitialMaxData)
  711. // the crypto stream is the only open stream at this moment
  712. // so we don't need to update stream flow control windows
  713. }
  714. func (s *session) sendPackets() error {
  715. s.pacingDeadline = time.Time{}
  716. sendMode := s.sentPacketHandler.SendMode()
  717. if sendMode == ackhandler.SendNone { // shortcut: return immediately if there's nothing to send
  718. return nil
  719. }
  720. numPackets := s.sentPacketHandler.ShouldSendNumPackets()
  721. var numPacketsSent int
  722. sendLoop:
  723. for {
  724. switch sendMode {
  725. case ackhandler.SendNone:
  726. break sendLoop
  727. case ackhandler.SendAck:
  728. // If we already sent packets, and the send mode switches to SendAck,
  729. // we've just become congestion limited.
  730. // There's no need to try to send an ACK at this moment.
  731. if numPacketsSent > 0 {
  732. return nil
  733. }
  734. // We can at most send a single ACK only packet.
  735. // There will only be a new ACK after receiving new packets.
  736. // SendAck is only returned when we're congestion limited, so we don't need to set the pacingt timer.
  737. return s.maybeSendAckOnlyPacket()
  738. case ackhandler.SendTLP, ackhandler.SendRTO:
  739. if err := s.sendProbePacket(); err != nil {
  740. return err
  741. }
  742. numPacketsSent++
  743. case ackhandler.SendRetransmission:
  744. sentPacket, err := s.maybeSendRetransmission()
  745. if err != nil {
  746. return err
  747. }
  748. if sentPacket {
  749. numPacketsSent++
  750. // This can happen if a retransmission queued, but it wasn't necessary to send it.
  751. // e.g. when an Initial is queued, but we already received a packet from the server.
  752. }
  753. case ackhandler.SendAny:
  754. sentPacket, err := s.sendPacket()
  755. if err != nil {
  756. return err
  757. }
  758. if !sentPacket {
  759. break sendLoop
  760. }
  761. numPacketsSent++
  762. default:
  763. return fmt.Errorf("BUG: invalid send mode %d", sendMode)
  764. }
  765. if numPacketsSent >= numPackets {
  766. break
  767. }
  768. sendMode = s.sentPacketHandler.SendMode()
  769. }
  770. // Only start the pacing timer if we sent as many packets as we were allowed.
  771. // There will probably be more to send when calling sendPacket again.
  772. if numPacketsSent == numPackets {
  773. s.pacingDeadline = s.sentPacketHandler.TimeUntilSend()
  774. }
  775. return nil
  776. }
  777. func (s *session) maybeSendAckOnlyPacket() error {
  778. packet, err := s.packer.MaybePackAckPacket()
  779. if err != nil {
  780. return err
  781. }
  782. if packet == nil {
  783. return nil
  784. }
  785. s.sentPacketHandler.SentPacket(packet.ToAckHandlerPacket())
  786. return s.sendPackedPacket(packet)
  787. }
  788. // maybeSendRetransmission sends retransmissions for at most one packet.
  789. // It takes care that Initials aren't retransmitted, if a packet from the server was already received.
  790. func (s *session) maybeSendRetransmission() (bool, error) {
  791. var retransmitPacket *ackhandler.Packet
  792. for {
  793. retransmitPacket = s.sentPacketHandler.DequeuePacketForRetransmission()
  794. if retransmitPacket == nil {
  795. return false, nil
  796. }
  797. // Don't retransmit Initial packets if we already received a response.
  798. // An Initial might have been retransmitted multiple times before we receive a response.
  799. // As soon as we receive one response, we don't need to send any more Initials.
  800. if s.perspective == protocol.PerspectiveClient && s.receivedFirstPacket && retransmitPacket.PacketType == protocol.PacketTypeInitial {
  801. s.logger.Debugf("Skipping retransmission of packet %d. Already received a response to an Initial.", retransmitPacket.PacketNumber)
  802. continue
  803. }
  804. break
  805. }
  806. s.logger.Debugf("Dequeueing retransmission for packet 0x%x", retransmitPacket.PacketNumber)
  807. packets, err := s.packer.PackRetransmission(retransmitPacket)
  808. if err != nil {
  809. return false, err
  810. }
  811. ackhandlerPackets := make([]*ackhandler.Packet, len(packets))
  812. for i, packet := range packets {
  813. ackhandlerPackets[i] = packet.ToAckHandlerPacket()
  814. }
  815. s.sentPacketHandler.SentPacketsAsRetransmission(ackhandlerPackets, retransmitPacket.PacketNumber)
  816. for _, packet := range packets {
  817. if err := s.sendPackedPacket(packet); err != nil {
  818. return false, err
  819. }
  820. }
  821. return true, nil
  822. }
  823. func (s *session) sendProbePacket() error {
  824. p, err := s.sentPacketHandler.DequeueProbePacket()
  825. if err != nil {
  826. return err
  827. }
  828. s.logger.Debugf("Sending a retransmission for %#x as a probe packet.", p.PacketNumber)
  829. packets, err := s.packer.PackRetransmission(p)
  830. if err != nil {
  831. return err
  832. }
  833. ackhandlerPackets := make([]*ackhandler.Packet, len(packets))
  834. for i, packet := range packets {
  835. ackhandlerPackets[i] = packet.ToAckHandlerPacket()
  836. }
  837. s.sentPacketHandler.SentPacketsAsRetransmission(ackhandlerPackets, p.PacketNumber)
  838. for _, packet := range packets {
  839. if err := s.sendPackedPacket(packet); err != nil {
  840. return err
  841. }
  842. }
  843. return nil
  844. }
  845. func (s *session) sendPacket() (bool, error) {
  846. if isBlocked, offset := s.connFlowController.IsNewlyBlocked(); isBlocked {
  847. s.framer.QueueControlFrame(&wire.DataBlockedFrame{DataLimit: offset})
  848. }
  849. s.windowUpdateQueue.QueueAll()
  850. packet, err := s.packer.PackPacket()
  851. if err != nil || packet == nil {
  852. return false, err
  853. }
  854. s.sentPacketHandler.SentPacket(packet.ToAckHandlerPacket())
  855. if err := s.sendPackedPacket(packet); err != nil {
  856. return false, err
  857. }
  858. return true, nil
  859. }
  860. func (s *session) sendPackedPacket(packet *packedPacket) error {
  861. defer packet.buffer.Release()
  862. s.logPacket(packet)
  863. return s.conn.Write(packet.raw)
  864. }
  865. func (s *session) sendConnectionClose(quicErr *qerr.QuicError) error {
  866. packet, err := s.packer.PackConnectionClose(&wire.ConnectionCloseFrame{
  867. ErrorCode: quicErr.ErrorCode,
  868. ReasonPhrase: quicErr.ErrorMessage,
  869. })
  870. if err != nil {
  871. return err
  872. }
  873. s.connectionClosePacket = packet
  874. s.logPacket(packet)
  875. return s.conn.Write(packet.raw)
  876. }
  877. func (s *session) logPacket(packet *packedPacket) {
  878. if !s.logger.Debug() {
  879. // We don't need to allocate the slices for calling the format functions
  880. return
  881. }
  882. s.logger.Debugf("-> Sending packet 0x%x (%d bytes) for connection %s, %s", packet.header.PacketNumber, len(packet.raw), s.srcConnID, packet.EncryptionLevel())
  883. packet.header.Log(s.logger)
  884. for _, frame := range packet.frames {
  885. wire.LogFrame(s.logger, frame, true)
  886. }
  887. }
  888. // GetOrOpenStream either returns an existing stream, a newly opened stream, or nil if a stream with the provided ID is already closed.
  889. // It is *only* needed for gQUIC's H2.
  890. // It will be removed as soon as gQUIC moves towards the IETF H2/QUIC stream mapping.
  891. func (s *session) GetOrOpenStream(id protocol.StreamID) (Stream, error) {
  892. str, err := s.streamsMap.GetOrOpenSendStream(id)
  893. if str != nil {
  894. if bstr, ok := str.(Stream); ok {
  895. return bstr, err
  896. }
  897. return nil, fmt.Errorf("Stream %d is not a bidirectional stream", id)
  898. }
  899. // make sure to return an actual nil value here, not an Stream with value nil
  900. return nil, err
  901. }
  902. // AcceptStream returns the next stream openend by the peer
  903. func (s *session) AcceptStream() (Stream, error) {
  904. return s.streamsMap.AcceptStream()
  905. }
  906. func (s *session) AcceptUniStream() (ReceiveStream, error) {
  907. return s.streamsMap.AcceptUniStream()
  908. }
  909. // OpenStream opens a stream
  910. func (s *session) OpenStream() (Stream, error) {
  911. return s.streamsMap.OpenStream()
  912. }
  913. func (s *session) OpenStreamSync() (Stream, error) {
  914. return s.streamsMap.OpenStreamSync()
  915. }
  916. func (s *session) OpenUniStream() (SendStream, error) {
  917. return s.streamsMap.OpenUniStream()
  918. }
  919. func (s *session) OpenUniStreamSync() (SendStream, error) {
  920. return s.streamsMap.OpenUniStreamSync()
  921. }
  922. func (s *session) newStream(id protocol.StreamID) streamI {
  923. flowController := s.newFlowController(id)
  924. return newStream(id, s, flowController, s.version)
  925. }
  926. func (s *session) newFlowController(id protocol.StreamID) flowcontrol.StreamFlowController {
  927. var initialSendWindow protocol.ByteCount
  928. if s.peerParams != nil {
  929. if id.Type() == protocol.StreamTypeUni {
  930. initialSendWindow = s.peerParams.InitialMaxStreamDataUni
  931. } else {
  932. if id.InitiatedBy() == s.perspective {
  933. initialSendWindow = s.peerParams.InitialMaxStreamDataBidiLocal
  934. } else {
  935. initialSendWindow = s.peerParams.InitialMaxStreamDataBidiRemote
  936. }
  937. }
  938. }
  939. return flowcontrol.NewStreamFlowController(
  940. id,
  941. s.connFlowController,
  942. protocol.InitialMaxStreamData,
  943. protocol.ByteCount(s.config.MaxReceiveStreamFlowControlWindow),
  944. initialSendWindow,
  945. s.onHasStreamWindowUpdate,
  946. s.rttStats,
  947. s.logger,
  948. )
  949. }
  950. // scheduleSending signals that we have data for sending
  951. func (s *session) scheduleSending() {
  952. select {
  953. case s.sendingScheduled <- struct{}{}:
  954. default:
  955. }
  956. }
  957. func (s *session) tryQueueingUndecryptablePacket(p *receivedPacket) {
  958. if s.handshakeComplete {
  959. s.logger.Debugf("Received undecryptable packet from %s after the handshake (%d bytes)", p.remoteAddr.String(), len(p.data))
  960. return
  961. }
  962. if len(s.undecryptablePackets)+1 > protocol.MaxUndecryptablePackets {
  963. s.logger.Infof("Dropping undecrytable packet (%d bytes). Undecryptable packet queue full.", len(p.data))
  964. return
  965. }
  966. s.logger.Infof("Queueing packet (%d bytes) for later decryption", len(p.data))
  967. s.undecryptablePackets = append(s.undecryptablePackets, p)
  968. }
  969. func (s *session) tryDecryptingQueuedPackets() {
  970. for _, p := range s.undecryptablePackets {
  971. s.handlePacket(p)
  972. }
  973. s.undecryptablePackets = s.undecryptablePackets[:0]
  974. }
  975. func (s *session) queueControlFrame(f wire.Frame) {
  976. s.framer.QueueControlFrame(f)
  977. s.scheduleSending()
  978. }
  979. func (s *session) onHasStreamWindowUpdate(id protocol.StreamID) {
  980. s.windowUpdateQueue.AddStream(id)
  981. s.scheduleSending()
  982. }
  983. func (s *session) onHasConnectionWindowUpdate() {
  984. s.windowUpdateQueue.AddConnection()
  985. s.scheduleSending()
  986. }
  987. func (s *session) onHasStreamData(id protocol.StreamID) {
  988. s.framer.AddActiveStream(id)
  989. s.scheduleSending()
  990. }
  991. func (s *session) onStreamCompleted(id protocol.StreamID) {
  992. if err := s.streamsMap.DeleteStream(id); err != nil {
  993. s.closeLocal(err)
  994. }
  995. }
  996. func (s *session) LocalAddr() net.Addr {
  997. return s.conn.LocalAddr()
  998. }
  999. func (s *session) RemoteAddr() net.Addr {
  1000. return s.conn.RemoteAddr()
  1001. }
  1002. func (s *session) GetVersion() protocol.VersionNumber {
  1003. return s.version
  1004. }