receive_stream_test.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630
  1. package quic
  2. import (
  3. "errors"
  4. "io"
  5. "runtime"
  6. "time"
  7. "github.com/golang/mock/gomock"
  8. "github.com/lucas-clemente/quic-go/internal/mocks"
  9. "github.com/lucas-clemente/quic-go/internal/protocol"
  10. "github.com/lucas-clemente/quic-go/internal/wire"
  11. . "github.com/onsi/ginkgo"
  12. . "github.com/onsi/gomega"
  13. "github.com/onsi/gomega/gbytes"
  14. )
  15. var _ = Describe("Receive Stream", func() {
  16. const streamID protocol.StreamID = 1337
  17. var (
  18. str *receiveStream
  19. strWithTimeout io.Reader // str wrapped with gbytes.TimeoutReader
  20. mockFC *mocks.MockStreamFlowController
  21. mockSender *MockStreamSender
  22. )
  23. BeforeEach(func() {
  24. mockSender = NewMockStreamSender(mockCtrl)
  25. mockFC = mocks.NewMockStreamFlowController(mockCtrl)
  26. str = newReceiveStream(streamID, mockSender, mockFC, versionIETFFrames)
  27. timeout := scaleDuration(250 * time.Millisecond)
  28. strWithTimeout = gbytes.TimeoutReader(str, timeout)
  29. })
  30. It("gets stream id", func() {
  31. Expect(str.StreamID()).To(Equal(protocol.StreamID(1337)))
  32. })
  33. Context("reading", func() {
  34. It("reads a single STREAM frame", func() {
  35. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
  36. mockFC.EXPECT().AddBytesRead(protocol.ByteCount(4))
  37. mockFC.EXPECT().MaybeQueueWindowUpdate()
  38. frame := wire.StreamFrame{
  39. Offset: 0,
  40. Data: []byte{0xDE, 0xAD, 0xBE, 0xEF},
  41. }
  42. err := str.handleStreamFrame(&frame)
  43. Expect(err).ToNot(HaveOccurred())
  44. b := make([]byte, 4)
  45. n, err := strWithTimeout.Read(b)
  46. Expect(err).ToNot(HaveOccurred())
  47. Expect(n).To(Equal(4))
  48. Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF}))
  49. })
  50. It("reads a single STREAM frame in multiple goes", func() {
  51. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
  52. mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2))
  53. mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2))
  54. mockFC.EXPECT().MaybeQueueWindowUpdate().Times(2)
  55. frame := wire.StreamFrame{
  56. Offset: 0,
  57. Data: []byte{0xDE, 0xAD, 0xBE, 0xEF},
  58. }
  59. err := str.handleStreamFrame(&frame)
  60. Expect(err).ToNot(HaveOccurred())
  61. b := make([]byte, 2)
  62. n, err := strWithTimeout.Read(b)
  63. Expect(err).ToNot(HaveOccurred())
  64. Expect(n).To(Equal(2))
  65. Expect(b).To(Equal([]byte{0xDE, 0xAD}))
  66. n, err = strWithTimeout.Read(b)
  67. Expect(err).ToNot(HaveOccurred())
  68. Expect(n).To(Equal(2))
  69. Expect(b).To(Equal([]byte{0xBE, 0xEF}))
  70. })
  71. It("reads all data available", func() {
  72. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
  73. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
  74. mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2)
  75. mockFC.EXPECT().MaybeQueueWindowUpdate().Times(2)
  76. frame1 := wire.StreamFrame{
  77. Offset: 0,
  78. Data: []byte{0xDE, 0xAD},
  79. }
  80. frame2 := wire.StreamFrame{
  81. Offset: 2,
  82. Data: []byte{0xBE, 0xEF},
  83. }
  84. err := str.handleStreamFrame(&frame1)
  85. Expect(err).ToNot(HaveOccurred())
  86. err = str.handleStreamFrame(&frame2)
  87. Expect(err).ToNot(HaveOccurred())
  88. b := make([]byte, 6)
  89. n, err := strWithTimeout.Read(b)
  90. Expect(err).ToNot(HaveOccurred())
  91. Expect(n).To(Equal(4))
  92. Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF, 0x00, 0x00}))
  93. })
  94. It("assembles multiple STREAM frames", func() {
  95. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
  96. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
  97. mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2)
  98. mockFC.EXPECT().MaybeQueueWindowUpdate().Times(2)
  99. frame1 := wire.StreamFrame{
  100. Offset: 0,
  101. Data: []byte{0xDE, 0xAD},
  102. }
  103. frame2 := wire.StreamFrame{
  104. Offset: 2,
  105. Data: []byte{0xBE, 0xEF},
  106. }
  107. err := str.handleStreamFrame(&frame1)
  108. Expect(err).ToNot(HaveOccurred())
  109. err = str.handleStreamFrame(&frame2)
  110. Expect(err).ToNot(HaveOccurred())
  111. b := make([]byte, 4)
  112. n, err := strWithTimeout.Read(b)
  113. Expect(err).ToNot(HaveOccurred())
  114. Expect(n).To(Equal(4))
  115. Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF}))
  116. })
  117. It("waits until data is available", func() {
  118. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
  119. mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2))
  120. mockFC.EXPECT().MaybeQueueWindowUpdate()
  121. go func() {
  122. defer GinkgoRecover()
  123. frame := wire.StreamFrame{Data: []byte{0xDE, 0xAD}}
  124. time.Sleep(10 * time.Millisecond)
  125. err := str.handleStreamFrame(&frame)
  126. Expect(err).ToNot(HaveOccurred())
  127. }()
  128. b := make([]byte, 2)
  129. n, err := strWithTimeout.Read(b)
  130. Expect(err).ToNot(HaveOccurred())
  131. Expect(n).To(Equal(2))
  132. })
  133. It("handles STREAM frames in wrong order", func() {
  134. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
  135. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
  136. mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2)
  137. mockFC.EXPECT().MaybeQueueWindowUpdate().Times(2)
  138. frame1 := wire.StreamFrame{
  139. Offset: 2,
  140. Data: []byte{0xBE, 0xEF},
  141. }
  142. frame2 := wire.StreamFrame{
  143. Offset: 0,
  144. Data: []byte{0xDE, 0xAD},
  145. }
  146. err := str.handleStreamFrame(&frame1)
  147. Expect(err).ToNot(HaveOccurred())
  148. err = str.handleStreamFrame(&frame2)
  149. Expect(err).ToNot(HaveOccurred())
  150. b := make([]byte, 4)
  151. n, err := strWithTimeout.Read(b)
  152. Expect(err).ToNot(HaveOccurred())
  153. Expect(n).To(Equal(4))
  154. Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF}))
  155. })
  156. It("ignores duplicate STREAM frames", func() {
  157. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
  158. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
  159. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
  160. mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2)
  161. mockFC.EXPECT().MaybeQueueWindowUpdate().Times(2)
  162. frame1 := wire.StreamFrame{
  163. Offset: 0,
  164. Data: []byte{0xDE, 0xAD},
  165. }
  166. frame2 := wire.StreamFrame{
  167. Offset: 0,
  168. Data: []byte{0x13, 0x37},
  169. }
  170. frame3 := wire.StreamFrame{
  171. Offset: 2,
  172. Data: []byte{0xBE, 0xEF},
  173. }
  174. err := str.handleStreamFrame(&frame1)
  175. Expect(err).ToNot(HaveOccurred())
  176. err = str.handleStreamFrame(&frame2)
  177. Expect(err).ToNot(HaveOccurred())
  178. err = str.handleStreamFrame(&frame3)
  179. Expect(err).ToNot(HaveOccurred())
  180. b := make([]byte, 4)
  181. n, err := strWithTimeout.Read(b)
  182. Expect(err).ToNot(HaveOccurred())
  183. Expect(n).To(Equal(4))
  184. Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF}))
  185. })
  186. It("doesn't rejects a STREAM frames with an overlapping data range", func() {
  187. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
  188. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(6), false)
  189. mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2))
  190. mockFC.EXPECT().AddBytesRead(protocol.ByteCount(4))
  191. mockFC.EXPECT().MaybeQueueWindowUpdate().Times(2)
  192. frame1 := wire.StreamFrame{
  193. Offset: 0,
  194. Data: []byte("foob"),
  195. }
  196. frame2 := wire.StreamFrame{
  197. Offset: 2,
  198. Data: []byte("obar"),
  199. }
  200. err := str.handleStreamFrame(&frame1)
  201. Expect(err).ToNot(HaveOccurred())
  202. err = str.handleStreamFrame(&frame2)
  203. Expect(err).ToNot(HaveOccurred())
  204. b := make([]byte, 6)
  205. n, err := strWithTimeout.Read(b)
  206. Expect(err).ToNot(HaveOccurred())
  207. Expect(n).To(Equal(6))
  208. Expect(b).To(Equal([]byte("foobar")))
  209. })
  210. Context("deadlines", func() {
  211. It("the deadline error has the right net.Error properties", func() {
  212. Expect(errDeadline.Temporary()).To(BeTrue())
  213. Expect(errDeadline.Timeout()).To(BeTrue())
  214. Expect(errDeadline).To(MatchError("deadline exceeded"))
  215. })
  216. It("returns an error when Read is called after the deadline", func() {
  217. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(6), false).AnyTimes()
  218. f := &wire.StreamFrame{Data: []byte("foobar")}
  219. err := str.handleStreamFrame(f)
  220. Expect(err).ToNot(HaveOccurred())
  221. str.SetReadDeadline(time.Now().Add(-time.Second))
  222. b := make([]byte, 6)
  223. n, err := strWithTimeout.Read(b)
  224. Expect(err).To(MatchError(errDeadline))
  225. Expect(n).To(BeZero())
  226. })
  227. It("unblocks after the deadline", func() {
  228. deadline := time.Now().Add(scaleDuration(50 * time.Millisecond))
  229. str.SetReadDeadline(deadline)
  230. b := make([]byte, 6)
  231. n, err := strWithTimeout.Read(b)
  232. Expect(err).To(MatchError(errDeadline))
  233. Expect(n).To(BeZero())
  234. Expect(time.Now()).To(BeTemporally("~", deadline, scaleDuration(10*time.Millisecond)))
  235. })
  236. It("doesn't unblock if the deadline is changed before the first one expires", func() {
  237. deadline1 := time.Now().Add(scaleDuration(50 * time.Millisecond))
  238. deadline2 := time.Now().Add(scaleDuration(100 * time.Millisecond))
  239. str.SetReadDeadline(deadline1)
  240. go func() {
  241. defer GinkgoRecover()
  242. time.Sleep(scaleDuration(20 * time.Millisecond))
  243. str.SetReadDeadline(deadline2)
  244. // make sure that this was actually execute before the deadline expires
  245. Expect(time.Now()).To(BeTemporally("<", deadline1))
  246. }()
  247. runtime.Gosched()
  248. b := make([]byte, 10)
  249. n, err := strWithTimeout.Read(b)
  250. Expect(err).To(MatchError(errDeadline))
  251. Expect(n).To(BeZero())
  252. Expect(time.Now()).To(BeTemporally("~", deadline2, scaleDuration(20*time.Millisecond)))
  253. })
  254. It("unblocks earlier, when a new deadline is set", func() {
  255. deadline1 := time.Now().Add(scaleDuration(200 * time.Millisecond))
  256. deadline2 := time.Now().Add(scaleDuration(50 * time.Millisecond))
  257. go func() {
  258. defer GinkgoRecover()
  259. time.Sleep(scaleDuration(10 * time.Millisecond))
  260. str.SetReadDeadline(deadline2)
  261. // make sure that this was actually execute before the deadline expires
  262. Expect(time.Now()).To(BeTemporally("<", deadline2))
  263. }()
  264. str.SetReadDeadline(deadline1)
  265. runtime.Gosched()
  266. b := make([]byte, 10)
  267. _, err := strWithTimeout.Read(b)
  268. Expect(err).To(MatchError(errDeadline))
  269. Expect(time.Now()).To(BeTemporally("~", deadline2, scaleDuration(25*time.Millisecond)))
  270. })
  271. })
  272. Context("closing", func() {
  273. Context("with FIN bit", func() {
  274. It("returns EOFs", func() {
  275. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), true)
  276. mockFC.EXPECT().AddBytesRead(protocol.ByteCount(4))
  277. mockFC.EXPECT().MaybeQueueWindowUpdate()
  278. str.handleStreamFrame(&wire.StreamFrame{
  279. Offset: 0,
  280. Data: []byte{0xDE, 0xAD, 0xBE, 0xEF},
  281. FinBit: true,
  282. })
  283. mockSender.EXPECT().onStreamCompleted(streamID)
  284. b := make([]byte, 4)
  285. n, err := strWithTimeout.Read(b)
  286. Expect(err).To(MatchError(io.EOF))
  287. Expect(n).To(Equal(4))
  288. Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF}))
  289. n, err = strWithTimeout.Read(b)
  290. Expect(n).To(BeZero())
  291. Expect(err).To(MatchError(io.EOF))
  292. })
  293. It("handles out-of-order frames", func() {
  294. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
  295. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), true)
  296. mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2)
  297. mockFC.EXPECT().MaybeQueueWindowUpdate().Times(2)
  298. frame1 := wire.StreamFrame{
  299. Offset: 2,
  300. Data: []byte{0xBE, 0xEF},
  301. FinBit: true,
  302. }
  303. frame2 := wire.StreamFrame{
  304. Offset: 0,
  305. Data: []byte{0xDE, 0xAD},
  306. }
  307. err := str.handleStreamFrame(&frame1)
  308. Expect(err).ToNot(HaveOccurred())
  309. err = str.handleStreamFrame(&frame2)
  310. Expect(err).ToNot(HaveOccurred())
  311. mockSender.EXPECT().onStreamCompleted(streamID)
  312. b := make([]byte, 4)
  313. n, err := strWithTimeout.Read(b)
  314. Expect(err).To(MatchError(io.EOF))
  315. Expect(n).To(Equal(4))
  316. Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF}))
  317. n, err = strWithTimeout.Read(b)
  318. Expect(n).To(BeZero())
  319. Expect(err).To(MatchError(io.EOF))
  320. })
  321. It("returns EOFs with partial read", func() {
  322. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), true)
  323. mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2))
  324. mockFC.EXPECT().MaybeQueueWindowUpdate()
  325. err := str.handleStreamFrame(&wire.StreamFrame{
  326. Offset: 0,
  327. Data: []byte{0xde, 0xad},
  328. FinBit: true,
  329. })
  330. Expect(err).ToNot(HaveOccurred())
  331. mockSender.EXPECT().onStreamCompleted(streamID)
  332. b := make([]byte, 4)
  333. n, err := strWithTimeout.Read(b)
  334. Expect(err).To(MatchError(io.EOF))
  335. Expect(n).To(Equal(2))
  336. Expect(b[:n]).To(Equal([]byte{0xde, 0xad}))
  337. })
  338. It("handles immediate FINs", func() {
  339. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(0), true)
  340. mockFC.EXPECT().AddBytesRead(protocol.ByteCount(0))
  341. mockFC.EXPECT().MaybeQueueWindowUpdate()
  342. err := str.handleStreamFrame(&wire.StreamFrame{
  343. Offset: 0,
  344. FinBit: true,
  345. })
  346. Expect(err).ToNot(HaveOccurred())
  347. mockSender.EXPECT().onStreamCompleted(streamID)
  348. b := make([]byte, 4)
  349. n, err := strWithTimeout.Read(b)
  350. Expect(n).To(BeZero())
  351. Expect(err).To(MatchError(io.EOF))
  352. })
  353. })
  354. It("closes when CloseRemote is called", func() {
  355. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(0), true)
  356. mockFC.EXPECT().AddBytesRead(protocol.ByteCount(0))
  357. mockFC.EXPECT().MaybeQueueWindowUpdate()
  358. str.CloseRemote(0)
  359. mockSender.EXPECT().onStreamCompleted(streamID)
  360. b := make([]byte, 8)
  361. n, err := strWithTimeout.Read(b)
  362. Expect(n).To(BeZero())
  363. Expect(err).To(MatchError(io.EOF))
  364. })
  365. })
  366. Context("closing for shutdown", func() {
  367. testErr := errors.New("test error")
  368. It("immediately returns all reads", func() {
  369. done := make(chan struct{})
  370. b := make([]byte, 4)
  371. go func() {
  372. defer GinkgoRecover()
  373. n, err := strWithTimeout.Read(b)
  374. Expect(n).To(BeZero())
  375. Expect(err).To(MatchError(testErr))
  376. close(done)
  377. }()
  378. Consistently(done).ShouldNot(BeClosed())
  379. str.closeForShutdown(testErr)
  380. Eventually(done).Should(BeClosed())
  381. })
  382. It("errors for all following reads", func() {
  383. str.closeForShutdown(testErr)
  384. b := make([]byte, 1)
  385. n, err := strWithTimeout.Read(b)
  386. Expect(n).To(BeZero())
  387. Expect(err).To(MatchError(testErr))
  388. })
  389. })
  390. })
  391. Context("stream cancelations", func() {
  392. Context("canceling read", func() {
  393. It("unblocks Read", func() {
  394. mockSender.EXPECT().queueControlFrame(gomock.Any())
  395. done := make(chan struct{})
  396. go func() {
  397. defer GinkgoRecover()
  398. _, err := strWithTimeout.Read([]byte{0})
  399. Expect(err).To(MatchError("Read on stream 1337 canceled with error code 1234"))
  400. close(done)
  401. }()
  402. Consistently(done).ShouldNot(BeClosed())
  403. err := str.CancelRead(1234)
  404. Expect(err).ToNot(HaveOccurred())
  405. Eventually(done).Should(BeClosed())
  406. })
  407. It("doesn't allow further calls to Read", func() {
  408. mockSender.EXPECT().queueControlFrame(gomock.Any())
  409. err := str.CancelRead(1234)
  410. Expect(err).ToNot(HaveOccurred())
  411. _, err = strWithTimeout.Read([]byte{0})
  412. Expect(err).To(MatchError("Read on stream 1337 canceled with error code 1234"))
  413. })
  414. It("does nothing when CancelRead is called twice", func() {
  415. mockSender.EXPECT().queueControlFrame(gomock.Any())
  416. err := str.CancelRead(1234)
  417. Expect(err).ToNot(HaveOccurred())
  418. err = str.CancelRead(2345)
  419. Expect(err).ToNot(HaveOccurred())
  420. _, err = strWithTimeout.Read([]byte{0})
  421. Expect(err).To(MatchError("Read on stream 1337 canceled with error code 1234"))
  422. })
  423. It("doesn't send a RST_STREAM frame, if the FIN was already read", func() {
  424. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(6), true)
  425. mockFC.EXPECT().AddBytesRead(protocol.ByteCount(6))
  426. mockFC.EXPECT().MaybeQueueWindowUpdate()
  427. // no calls to mockSender.queueControlFrame
  428. err := str.handleStreamFrame(&wire.StreamFrame{
  429. StreamID: streamID,
  430. Data: []byte("foobar"),
  431. FinBit: true,
  432. })
  433. Expect(err).ToNot(HaveOccurred())
  434. mockSender.EXPECT().onStreamCompleted(streamID)
  435. _, err = strWithTimeout.Read(make([]byte, 100))
  436. Expect(err).To(MatchError(io.EOF))
  437. err = str.CancelRead(1234)
  438. Expect(err).ToNot(HaveOccurred())
  439. })
  440. It("queues a STOP_SENDING frame, for IETF QUIC", func() {
  441. str.version = versionIETFFrames
  442. mockSender.EXPECT().queueControlFrame(&wire.StopSendingFrame{
  443. StreamID: streamID,
  444. ErrorCode: 1234,
  445. })
  446. err := str.CancelRead(1234)
  447. Expect(err).ToNot(HaveOccurred())
  448. })
  449. It("doesn't queue a STOP_SENDING frame, for gQUIC", func() {
  450. str.version = versionGQUICFrames
  451. // no calls to mockSender.queueControlFrame
  452. err := str.CancelRead(1234)
  453. Expect(err).ToNot(HaveOccurred())
  454. })
  455. })
  456. Context("receiving RST_STREAM frames", func() {
  457. rst := &wire.RstStreamFrame{
  458. StreamID: streamID,
  459. ByteOffset: 42,
  460. ErrorCode: 1234,
  461. }
  462. It("unblocks Read", func() {
  463. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true)
  464. done := make(chan struct{})
  465. go func() {
  466. defer GinkgoRecover()
  467. _, err := strWithTimeout.Read([]byte{0})
  468. Expect(err).To(MatchError("Stream 1337 was reset with error code 1234"))
  469. Expect(err).To(BeAssignableToTypeOf(streamCanceledError{}))
  470. Expect(err.(streamCanceledError).Canceled()).To(BeTrue())
  471. Expect(err.(streamCanceledError).ErrorCode()).To(Equal(protocol.ApplicationErrorCode(1234)))
  472. close(done)
  473. }()
  474. Consistently(done).ShouldNot(BeClosed())
  475. mockSender.EXPECT().onStreamCompleted(streamID)
  476. str.handleRstStreamFrame(rst)
  477. Eventually(done).Should(BeClosed())
  478. })
  479. It("doesn't allow further calls to Read", func() {
  480. mockSender.EXPECT().onStreamCompleted(streamID)
  481. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true)
  482. err := str.handleRstStreamFrame(rst)
  483. Expect(err).ToNot(HaveOccurred())
  484. _, err = strWithTimeout.Read([]byte{0})
  485. Expect(err).To(MatchError("Stream 1337 was reset with error code 1234"))
  486. Expect(err).To(BeAssignableToTypeOf(streamCanceledError{}))
  487. Expect(err.(streamCanceledError).Canceled()).To(BeTrue())
  488. Expect(err.(streamCanceledError).ErrorCode()).To(Equal(protocol.ApplicationErrorCode(1234)))
  489. })
  490. It("errors when receiving a RST_STREAM with an inconsistent offset", func() {
  491. testErr := errors.New("already received a different final offset before")
  492. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true).Return(testErr)
  493. err := str.handleRstStreamFrame(rst)
  494. Expect(err).To(MatchError(testErr))
  495. })
  496. It("ignores duplicate RST_STREAM frames", func() {
  497. mockSender.EXPECT().onStreamCompleted(streamID)
  498. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true).Times(2)
  499. err := str.handleRstStreamFrame(rst)
  500. Expect(err).ToNot(HaveOccurred())
  501. err = str.handleRstStreamFrame(rst)
  502. Expect(err).ToNot(HaveOccurred())
  503. })
  504. It("doesn't do anyting when it was closed for shutdown", func() {
  505. str.closeForShutdown(nil)
  506. err := str.handleRstStreamFrame(rst)
  507. Expect(err).ToNot(HaveOccurred())
  508. })
  509. Context("for gQUIC", func() {
  510. BeforeEach(func() {
  511. str.version = versionGQUICFrames
  512. })
  513. It("unblocks Read when receiving a RST_STREAM frame with non-zero error code", func() {
  514. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true)
  515. readReturned := make(chan struct{})
  516. go func() {
  517. defer GinkgoRecover()
  518. _, err := strWithTimeout.Read([]byte{0})
  519. Expect(err).To(MatchError("Stream 1337 was reset with error code 1234"))
  520. Expect(err).To(BeAssignableToTypeOf(streamCanceledError{}))
  521. Expect(err.(streamCanceledError).Canceled()).To(BeTrue())
  522. Expect(err.(streamCanceledError).ErrorCode()).To(Equal(protocol.ApplicationErrorCode(1234)))
  523. close(readReturned)
  524. }()
  525. Consistently(readReturned).ShouldNot(BeClosed())
  526. mockSender.EXPECT().onStreamCompleted(streamID)
  527. err := str.handleRstStreamFrame(rst)
  528. Expect(err).ToNot(HaveOccurred())
  529. Eventually(readReturned).Should(BeClosed())
  530. })
  531. It("continues reading until the end when receiving a RST_STREAM frame with error code 0", func() {
  532. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(6), true).Times(2)
  533. gomock.InOrder(
  534. mockFC.EXPECT().AddBytesRead(protocol.ByteCount(4)),
  535. mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)),
  536. mockSender.EXPECT().onStreamCompleted(streamID),
  537. )
  538. mockFC.EXPECT().MaybeQueueWindowUpdate().Times(2)
  539. readReturned := make(chan struct{})
  540. go func() {
  541. defer GinkgoRecover()
  542. n, err := strWithTimeout.Read(make([]byte, 4))
  543. Expect(err).ToNot(HaveOccurred())
  544. Expect(n).To(Equal(4))
  545. n, err = strWithTimeout.Read(make([]byte, 4))
  546. Expect(err).To(MatchError(io.EOF))
  547. Expect(n).To(Equal(2))
  548. close(readReturned)
  549. }()
  550. Consistently(readReturned).ShouldNot(BeClosed())
  551. err := str.handleStreamFrame(&wire.StreamFrame{
  552. StreamID: streamID,
  553. Data: []byte("foobar"),
  554. FinBit: true,
  555. })
  556. Expect(err).ToNot(HaveOccurred())
  557. err = str.handleRstStreamFrame(&wire.RstStreamFrame{
  558. StreamID: streamID,
  559. ByteOffset: 6,
  560. ErrorCode: 0,
  561. })
  562. Expect(err).ToNot(HaveOccurred())
  563. Eventually(readReturned).Should(BeClosed())
  564. })
  565. })
  566. })
  567. })
  568. Context("flow control", func() {
  569. It("errors when a STREAM frame causes a flow control violation", func() {
  570. testErr := errors.New("flow control violation")
  571. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(8), false).Return(testErr)
  572. frame := wire.StreamFrame{
  573. Offset: 2,
  574. Data: []byte("foobar"),
  575. }
  576. err := str.handleStreamFrame(&frame)
  577. Expect(err).To(MatchError(testErr))
  578. })
  579. It("gets a window update", func() {
  580. mockFC.EXPECT().GetWindowUpdate().Return(protocol.ByteCount(0x100))
  581. Expect(str.getWindowUpdate()).To(Equal(protocol.ByteCount(0x100)))
  582. })
  583. })
  584. })