send_stream_test.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599
  1. package quic
  2. import (
  3. "bytes"
  4. "errors"
  5. "io"
  6. "runtime"
  7. "time"
  8. "github.com/golang/mock/gomock"
  9. "github.com/lucas-clemente/quic-go/internal/mocks"
  10. "github.com/lucas-clemente/quic-go/internal/protocol"
  11. "github.com/lucas-clemente/quic-go/internal/wire"
  12. . "github.com/onsi/ginkgo"
  13. . "github.com/onsi/gomega"
  14. "github.com/onsi/gomega/gbytes"
  15. )
  16. var _ = Describe("Send Stream", func() {
  17. const streamID protocol.StreamID = 1337
  18. var (
  19. str *sendStream
  20. strWithTimeout io.Writer // str wrapped with gbytes.TimeoutWriter
  21. mockFC *mocks.MockStreamFlowController
  22. mockSender *MockStreamSender
  23. )
  24. BeforeEach(func() {
  25. mockSender = NewMockStreamSender(mockCtrl)
  26. mockFC = mocks.NewMockStreamFlowController(mockCtrl)
  27. str = newSendStream(streamID, mockSender, mockFC, protocol.VersionWhatever)
  28. timeout := scaleDuration(250 * time.Millisecond)
  29. strWithTimeout = gbytes.TimeoutWriter(str, timeout)
  30. })
  31. waitForWrite := func() {
  32. EventuallyWithOffset(0, func() []byte {
  33. str.mutex.Lock()
  34. data := str.dataForWriting
  35. str.mutex.Unlock()
  36. return data
  37. }).ShouldNot(BeEmpty())
  38. }
  39. It("gets stream id", func() {
  40. Expect(str.StreamID()).To(Equal(protocol.StreamID(1337)))
  41. })
  42. Context("writing", func() {
  43. It("writes and gets all data at once", func() {
  44. mockSender.EXPECT().onHasStreamData(streamID)
  45. mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999))
  46. mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6))
  47. done := make(chan struct{})
  48. go func() {
  49. defer GinkgoRecover()
  50. n, err := strWithTimeout.Write([]byte("foobar"))
  51. Expect(err).ToNot(HaveOccurred())
  52. Expect(n).To(Equal(6))
  53. close(done)
  54. }()
  55. waitForWrite()
  56. f, _ := str.popStreamFrame(1000)
  57. Expect(f.Data).To(Equal([]byte("foobar")))
  58. Expect(f.FinBit).To(BeFalse())
  59. Expect(f.Offset).To(BeZero())
  60. Expect(f.DataLenPresent).To(BeTrue())
  61. Expect(str.writeOffset).To(Equal(protocol.ByteCount(6)))
  62. Expect(str.dataForWriting).To(BeNil())
  63. Eventually(done).Should(BeClosed())
  64. })
  65. It("writes and gets data in two turns", func() {
  66. mockSender.EXPECT().onHasStreamData(streamID)
  67. frameHeaderLen := protocol.ByteCount(4)
  68. mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2)
  69. mockFC.EXPECT().AddBytesSent(gomock.Any() /* protocol.ByteCount(3)*/).Times(2)
  70. done := make(chan struct{})
  71. go func() {
  72. defer GinkgoRecover()
  73. n, err := strWithTimeout.Write([]byte("foobar"))
  74. Expect(err).ToNot(HaveOccurred())
  75. Expect(n).To(Equal(6))
  76. close(done)
  77. }()
  78. waitForWrite()
  79. f, _ := str.popStreamFrame(3 + frameHeaderLen)
  80. Expect(f.Data).To(Equal([]byte("foo")))
  81. Expect(f.FinBit).To(BeFalse())
  82. Expect(f.Offset).To(BeZero())
  83. Expect(f.DataLenPresent).To(BeTrue())
  84. f, _ = str.popStreamFrame(100)
  85. Expect(f.Data).To(Equal([]byte("bar")))
  86. Expect(f.FinBit).To(BeFalse())
  87. Expect(f.Offset).To(Equal(protocol.ByteCount(3)))
  88. Expect(f.DataLenPresent).To(BeTrue())
  89. Expect(str.popStreamFrame(1000)).To(BeNil())
  90. Eventually(done).Should(BeClosed())
  91. })
  92. It("popStreamFrame returns nil if no data is available", func() {
  93. frame, hasMoreData := str.popStreamFrame(1000)
  94. Expect(frame).To(BeNil())
  95. Expect(hasMoreData).To(BeFalse())
  96. })
  97. It("says if it has more data for writing", func() {
  98. mockSender.EXPECT().onHasStreamData(streamID)
  99. mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2)
  100. mockFC.EXPECT().AddBytesSent(gomock.Any()).Times(2)
  101. done := make(chan struct{})
  102. go func() {
  103. defer GinkgoRecover()
  104. n, err := strWithTimeout.Write(bytes.Repeat([]byte{0}, 100))
  105. Expect(err).ToNot(HaveOccurred())
  106. Expect(n).To(Equal(100))
  107. close(done)
  108. }()
  109. waitForWrite()
  110. frame, hasMoreData := str.popStreamFrame(50)
  111. Expect(frame).ToNot(BeNil())
  112. Expect(hasMoreData).To(BeTrue())
  113. frame, hasMoreData = str.popStreamFrame(1000)
  114. Expect(frame).ToNot(BeNil())
  115. Expect(hasMoreData).To(BeFalse())
  116. frame, _ = str.popStreamFrame(1000)
  117. Expect(frame).To(BeNil())
  118. Eventually(done).Should(BeClosed())
  119. })
  120. It("copies the slice while writing", func() {
  121. mockSender.EXPECT().onHasStreamData(streamID)
  122. frameHeaderSize := protocol.ByteCount(4)
  123. mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2)
  124. mockFC.EXPECT().AddBytesSent(protocol.ByteCount(1))
  125. mockFC.EXPECT().AddBytesSent(protocol.ByteCount(2))
  126. s := []byte("foo")
  127. done := make(chan struct{})
  128. go func() {
  129. defer GinkgoRecover()
  130. n, err := strWithTimeout.Write(s)
  131. Expect(err).ToNot(HaveOccurred())
  132. Expect(n).To(Equal(3))
  133. close(done)
  134. }()
  135. waitForWrite()
  136. frame, _ := str.popStreamFrame(frameHeaderSize + 1)
  137. Expect(frame.Data).To(Equal([]byte("f")))
  138. s[1] = 'e'
  139. f, _ := str.popStreamFrame(100)
  140. Expect(f).ToNot(BeNil())
  141. Expect(f.Data).To(Equal([]byte("oo")))
  142. Eventually(done).Should(BeClosed())
  143. })
  144. It("returns when given a nil input", func() {
  145. n, err := strWithTimeout.Write(nil)
  146. Expect(n).To(BeZero())
  147. Expect(err).ToNot(HaveOccurred())
  148. })
  149. It("returns when given an empty slice", func() {
  150. n, err := strWithTimeout.Write([]byte(""))
  151. Expect(n).To(BeZero())
  152. Expect(err).ToNot(HaveOccurred())
  153. })
  154. It("cancels the context when Close is called", func() {
  155. mockSender.EXPECT().onHasStreamData(streamID)
  156. Expect(str.Context().Done()).ToNot(BeClosed())
  157. str.Close()
  158. Expect(str.Context().Done()).To(BeClosed())
  159. })
  160. Context("flow control blocking", func() {
  161. It("queues a BLOCKED frame if the stream is flow control blocked", func() {
  162. mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(0))
  163. mockFC.EXPECT().IsNewlyBlocked().Return(true, protocol.ByteCount(12))
  164. mockSender.EXPECT().queueControlFrame(&wire.StreamBlockedFrame{
  165. StreamID: streamID,
  166. Offset: 12,
  167. })
  168. mockSender.EXPECT().onHasStreamData(streamID)
  169. done := make(chan struct{})
  170. go func() {
  171. defer GinkgoRecover()
  172. _, err := str.Write([]byte("foobar"))
  173. Expect(err).ToNot(HaveOccurred())
  174. close(done)
  175. }()
  176. waitForWrite()
  177. f, hasMoreData := str.popStreamFrame(1000)
  178. Expect(f).To(BeNil())
  179. Expect(hasMoreData).To(BeFalse())
  180. // make the Write go routine return
  181. str.closeForShutdown(nil)
  182. Eventually(done).Should(BeClosed())
  183. })
  184. It("says that it doesn't have any more data, when it is flow control blocked", func() {
  185. frameHeaderSize := protocol.ByteCount(4)
  186. mockSender.EXPECT().onHasStreamData(streamID)
  187. done := make(chan struct{})
  188. go func() {
  189. defer GinkgoRecover()
  190. _, err := str.Write([]byte("foobar"))
  191. Expect(err).ToNot(HaveOccurred())
  192. close(done)
  193. }()
  194. waitForWrite()
  195. // first pop a STREAM frame of the maximum size allowed by flow control
  196. mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(3))
  197. mockFC.EXPECT().AddBytesSent(protocol.ByteCount(3))
  198. f, hasMoreData := str.popStreamFrame(frameHeaderSize + 3)
  199. Expect(f).ToNot(BeNil())
  200. Expect(hasMoreData).To(BeTrue())
  201. // try to pop again, this time noticing that we're blocked
  202. mockFC.EXPECT().SendWindowSize()
  203. // don't use offset 3 here, to make sure the BLOCKED frame contains the number returned by the flow controller
  204. mockFC.EXPECT().IsNewlyBlocked().Return(true, protocol.ByteCount(10))
  205. mockSender.EXPECT().queueControlFrame(&wire.StreamBlockedFrame{
  206. StreamID: streamID,
  207. Offset: 10,
  208. })
  209. f, hasMoreData = str.popStreamFrame(1000)
  210. Expect(f).To(BeNil())
  211. Expect(hasMoreData).To(BeFalse())
  212. // make the Write go routine return
  213. str.closeForShutdown(nil)
  214. Eventually(done).Should(BeClosed())
  215. })
  216. })
  217. Context("deadlines", func() {
  218. It("returns an error when Write is called after the deadline", func() {
  219. str.SetWriteDeadline(time.Now().Add(-time.Second))
  220. n, err := strWithTimeout.Write([]byte("foobar"))
  221. Expect(err).To(MatchError(errDeadline))
  222. Expect(n).To(BeZero())
  223. })
  224. It("unblocks after the deadline", func() {
  225. mockSender.EXPECT().onHasStreamData(streamID)
  226. deadline := time.Now().Add(scaleDuration(50 * time.Millisecond))
  227. str.SetWriteDeadline(deadline)
  228. n, err := strWithTimeout.Write([]byte("foobar"))
  229. Expect(err).To(MatchError(errDeadline))
  230. Expect(n).To(BeZero())
  231. Expect(time.Now()).To(BeTemporally("~", deadline, scaleDuration(20*time.Millisecond)))
  232. })
  233. It("returns the number of bytes written, when the deadline expires", func() {
  234. mockSender.EXPECT().onHasStreamData(streamID)
  235. mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(10000)).AnyTimes()
  236. mockFC.EXPECT().AddBytesSent(gomock.Any())
  237. deadline := time.Now().Add(scaleDuration(50 * time.Millisecond))
  238. str.SetWriteDeadline(deadline)
  239. var n int
  240. writeReturned := make(chan struct{})
  241. go func() {
  242. defer GinkgoRecover()
  243. var err error
  244. n, err = strWithTimeout.Write(bytes.Repeat([]byte{0}, 100))
  245. Expect(err).To(MatchError(errDeadline))
  246. Expect(time.Now()).To(BeTemporally("~", deadline, scaleDuration(20*time.Millisecond)))
  247. close(writeReturned)
  248. }()
  249. waitForWrite()
  250. frame, hasMoreData := str.popStreamFrame(50)
  251. Expect(frame).ToNot(BeNil())
  252. Expect(hasMoreData).To(BeTrue())
  253. Eventually(writeReturned, scaleDuration(80*time.Millisecond)).Should(BeClosed())
  254. Expect(n).To(BeEquivalentTo(frame.DataLen()))
  255. })
  256. It("doesn't pop any data after the deadline expired", func() {
  257. mockSender.EXPECT().onHasStreamData(streamID)
  258. mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(10000)).AnyTimes()
  259. mockFC.EXPECT().AddBytesSent(gomock.Any())
  260. deadline := time.Now().Add(scaleDuration(50 * time.Millisecond))
  261. str.SetWriteDeadline(deadline)
  262. writeReturned := make(chan struct{})
  263. go func() {
  264. defer GinkgoRecover()
  265. _, err := strWithTimeout.Write(bytes.Repeat([]byte{0}, 100))
  266. Expect(err).To(MatchError(errDeadline))
  267. close(writeReturned)
  268. }()
  269. waitForWrite()
  270. frame, hasMoreData := str.popStreamFrame(50)
  271. Expect(frame).ToNot(BeNil())
  272. Expect(hasMoreData).To(BeTrue())
  273. Eventually(writeReturned, scaleDuration(80*time.Millisecond)).Should(BeClosed())
  274. frame, hasMoreData = str.popStreamFrame(50)
  275. Expect(frame).To(BeNil())
  276. Expect(hasMoreData).To(BeFalse())
  277. })
  278. It("doesn't unblock if the deadline is changed before the first one expires", func() {
  279. mockSender.EXPECT().onHasStreamData(streamID)
  280. deadline1 := time.Now().Add(scaleDuration(50 * time.Millisecond))
  281. deadline2 := time.Now().Add(scaleDuration(100 * time.Millisecond))
  282. str.SetWriteDeadline(deadline1)
  283. done := make(chan struct{})
  284. go func() {
  285. defer GinkgoRecover()
  286. time.Sleep(scaleDuration(20 * time.Millisecond))
  287. str.SetWriteDeadline(deadline2)
  288. // make sure that this was actually execute before the deadline expires
  289. Expect(time.Now()).To(BeTemporally("<", deadline1))
  290. close(done)
  291. }()
  292. runtime.Gosched()
  293. n, err := strWithTimeout.Write([]byte("foobar"))
  294. Expect(err).To(MatchError(errDeadline))
  295. Expect(n).To(BeZero())
  296. Expect(time.Now()).To(BeTemporally("~", deadline2, scaleDuration(20*time.Millisecond)))
  297. Eventually(done).Should(BeClosed())
  298. })
  299. It("unblocks earlier, when a new deadline is set", func() {
  300. mockSender.EXPECT().onHasStreamData(streamID)
  301. deadline1 := time.Now().Add(scaleDuration(200 * time.Millisecond))
  302. deadline2 := time.Now().Add(scaleDuration(50 * time.Millisecond))
  303. done := make(chan struct{})
  304. go func() {
  305. defer GinkgoRecover()
  306. time.Sleep(scaleDuration(10 * time.Millisecond))
  307. str.SetWriteDeadline(deadline2)
  308. // make sure that this was actually execute before the deadline expires
  309. Expect(time.Now()).To(BeTemporally("<", deadline2))
  310. close(done)
  311. }()
  312. str.SetWriteDeadline(deadline1)
  313. runtime.Gosched()
  314. _, err := strWithTimeout.Write([]byte("foobar"))
  315. Expect(err).To(MatchError(errDeadline))
  316. Expect(time.Now()).To(BeTemporally("~", deadline2, scaleDuration(20*time.Millisecond)))
  317. Eventually(done).Should(BeClosed())
  318. })
  319. })
  320. Context("closing", func() {
  321. It("doesn't allow writes after it has been closed", func() {
  322. mockSender.EXPECT().onHasStreamData(streamID)
  323. str.Close()
  324. _, err := strWithTimeout.Write([]byte("foobar"))
  325. Expect(err).To(MatchError("write on closed stream 1337"))
  326. })
  327. It("allows FIN", func() {
  328. mockSender.EXPECT().onHasStreamData(streamID)
  329. mockSender.EXPECT().onStreamCompleted(streamID)
  330. str.Close()
  331. f, hasMoreData := str.popStreamFrame(1000)
  332. Expect(f).ToNot(BeNil())
  333. Expect(f.Data).To(BeEmpty())
  334. Expect(f.FinBit).To(BeTrue())
  335. Expect(hasMoreData).To(BeFalse())
  336. })
  337. It("doesn't send a FIN when there's still data", func() {
  338. mockSender.EXPECT().onHasStreamData(streamID)
  339. frameHeaderLen := protocol.ByteCount(4)
  340. mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2)
  341. mockFC.EXPECT().AddBytesSent(gomock.Any()).Times(2)
  342. str.dataForWriting = []byte("foobar")
  343. Expect(str.Close()).To(Succeed())
  344. f, _ := str.popStreamFrame(3 + frameHeaderLen)
  345. Expect(f).ToNot(BeNil())
  346. Expect(f.Data).To(Equal([]byte("foo")))
  347. Expect(f.FinBit).To(BeFalse())
  348. mockSender.EXPECT().onStreamCompleted(streamID)
  349. f, _ = str.popStreamFrame(100)
  350. Expect(f.Data).To(Equal([]byte("bar")))
  351. Expect(f.FinBit).To(BeTrue())
  352. })
  353. It("doesn't allow FIN after it is closed for shutdown", func() {
  354. str.closeForShutdown(errors.New("test"))
  355. f, hasMoreData := str.popStreamFrame(1000)
  356. Expect(f).To(BeNil())
  357. Expect(hasMoreData).To(BeFalse())
  358. })
  359. It("doesn't allow FIN twice", func() {
  360. mockSender.EXPECT().onHasStreamData(streamID)
  361. mockSender.EXPECT().onStreamCompleted(streamID)
  362. str.Close()
  363. f, _ := str.popStreamFrame(1000)
  364. Expect(f).ToNot(BeNil())
  365. Expect(f.Data).To(BeEmpty())
  366. Expect(f.FinBit).To(BeTrue())
  367. f, hasMoreData := str.popStreamFrame(1000)
  368. Expect(f).To(BeNil())
  369. Expect(hasMoreData).To(BeFalse())
  370. })
  371. })
  372. Context("closing for shutdown", func() {
  373. testErr := errors.New("test")
  374. It("returns errors when the stream is cancelled", func() {
  375. str.closeForShutdown(testErr)
  376. n, err := strWithTimeout.Write([]byte("foo"))
  377. Expect(n).To(BeZero())
  378. Expect(err).To(MatchError(testErr))
  379. })
  380. It("doesn't get data for writing if an error occurred", func() {
  381. mockSender.EXPECT().onHasStreamData(streamID)
  382. mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999))
  383. mockFC.EXPECT().AddBytesSent(gomock.Any())
  384. done := make(chan struct{})
  385. go func() {
  386. defer GinkgoRecover()
  387. _, err := strWithTimeout.Write(bytes.Repeat([]byte{0}, 500))
  388. Expect(err).To(MatchError(testErr))
  389. close(done)
  390. }()
  391. waitForWrite()
  392. frame, hasMoreData := str.popStreamFrame(50) // get a STREAM frame containing some data, but not all
  393. Expect(frame).ToNot(BeNil())
  394. Expect(hasMoreData).To(BeTrue())
  395. str.closeForShutdown(testErr)
  396. frame, hasMoreData = str.popStreamFrame(1000)
  397. Expect(frame).To(BeNil())
  398. Expect(hasMoreData).To(BeFalse())
  399. Eventually(done).Should(BeClosed())
  400. })
  401. It("cancels the context", func() {
  402. Expect(str.Context().Done()).ToNot(BeClosed())
  403. str.closeForShutdown(testErr)
  404. Expect(str.Context().Done()).To(BeClosed())
  405. })
  406. })
  407. })
  408. Context("handling MAX_STREAM_DATA frames", func() {
  409. It("informs the flow controller", func() {
  410. mockFC.EXPECT().UpdateSendWindow(protocol.ByteCount(0x1337))
  411. str.handleMaxStreamDataFrame(&wire.MaxStreamDataFrame{
  412. StreamID: streamID,
  413. ByteOffset: 0x1337,
  414. })
  415. })
  416. It("says when it has data for sending", func() {
  417. mockFC.EXPECT().UpdateSendWindow(gomock.Any())
  418. mockSender.EXPECT().onHasStreamData(streamID).Times(2) // once for Write, once for the MAX_STREAM_DATA frame
  419. done := make(chan struct{})
  420. go func() {
  421. defer GinkgoRecover()
  422. _, err := str.Write([]byte("foobar"))
  423. Expect(err).ToNot(HaveOccurred())
  424. close(done)
  425. }()
  426. waitForWrite()
  427. str.handleMaxStreamDataFrame(&wire.MaxStreamDataFrame{
  428. StreamID: streamID,
  429. ByteOffset: 42,
  430. })
  431. // make sure the Write go routine returns
  432. str.closeForShutdown(nil)
  433. Eventually(done).Should(BeClosed())
  434. })
  435. })
  436. Context("stream cancelations", func() {
  437. Context("canceling writing", func() {
  438. It("queues a RST_STREAM frame", func() {
  439. mockSender.EXPECT().queueControlFrame(&wire.RstStreamFrame{
  440. StreamID: streamID,
  441. ByteOffset: 1234,
  442. ErrorCode: 9876,
  443. })
  444. mockSender.EXPECT().onStreamCompleted(streamID)
  445. str.writeOffset = 1234
  446. err := str.CancelWrite(9876)
  447. Expect(err).ToNot(HaveOccurred())
  448. })
  449. It("unblocks Write", func() {
  450. mockSender.EXPECT().onHasStreamData(streamID)
  451. mockSender.EXPECT().onStreamCompleted(streamID)
  452. mockSender.EXPECT().queueControlFrame(gomock.Any())
  453. mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount)
  454. mockFC.EXPECT().AddBytesSent(gomock.Any())
  455. writeReturned := make(chan struct{})
  456. var n int
  457. go func() {
  458. defer GinkgoRecover()
  459. var err error
  460. n, err = strWithTimeout.Write(bytes.Repeat([]byte{0}, 100))
  461. Expect(err).To(MatchError("Write on stream 1337 canceled with error code 1234"))
  462. close(writeReturned)
  463. }()
  464. waitForWrite()
  465. frame, _ := str.popStreamFrame(50)
  466. Expect(frame).ToNot(BeNil())
  467. err := str.CancelWrite(1234)
  468. Expect(err).ToNot(HaveOccurred())
  469. Eventually(writeReturned).Should(BeClosed())
  470. Expect(n).To(BeEquivalentTo(frame.DataLen()))
  471. })
  472. It("cancels the context", func() {
  473. mockSender.EXPECT().queueControlFrame(gomock.Any())
  474. mockSender.EXPECT().onStreamCompleted(streamID)
  475. Expect(str.Context().Done()).ToNot(BeClosed())
  476. str.CancelWrite(1234)
  477. Expect(str.Context().Done()).To(BeClosed())
  478. })
  479. It("doesn't allow further calls to Write", func() {
  480. mockSender.EXPECT().queueControlFrame(gomock.Any())
  481. mockSender.EXPECT().onStreamCompleted(streamID)
  482. err := str.CancelWrite(1234)
  483. Expect(err).ToNot(HaveOccurred())
  484. _, err = strWithTimeout.Write([]byte("foobar"))
  485. Expect(err).To(MatchError("Write on stream 1337 canceled with error code 1234"))
  486. })
  487. It("only cancels once", func() {
  488. mockSender.EXPECT().queueControlFrame(gomock.Any())
  489. mockSender.EXPECT().onStreamCompleted(streamID)
  490. err := str.CancelWrite(1234)
  491. Expect(err).ToNot(HaveOccurred())
  492. err = str.CancelWrite(4321)
  493. Expect(err).ToNot(HaveOccurred())
  494. })
  495. It("doesn't cancel when the stream was already closed", func() {
  496. mockSender.EXPECT().onHasStreamData(streamID)
  497. err := str.Close()
  498. Expect(err).ToNot(HaveOccurred())
  499. err = str.CancelWrite(123)
  500. Expect(err).To(MatchError("CancelWrite for closed stream 1337"))
  501. })
  502. })
  503. Context("receiving STOP_SENDING frames", func() {
  504. It("queues a RST_STREAM frames with error code Stopping", func() {
  505. mockSender.EXPECT().queueControlFrame(&wire.RstStreamFrame{
  506. StreamID: streamID,
  507. ErrorCode: errorCodeStopping,
  508. })
  509. mockSender.EXPECT().onStreamCompleted(streamID)
  510. str.handleStopSendingFrame(&wire.StopSendingFrame{
  511. StreamID: streamID,
  512. ErrorCode: 101,
  513. })
  514. })
  515. It("unblocks Write", func() {
  516. mockSender.EXPECT().onHasStreamData(streamID)
  517. mockSender.EXPECT().queueControlFrame(gomock.Any())
  518. done := make(chan struct{})
  519. go func() {
  520. defer GinkgoRecover()
  521. _, err := str.Write([]byte("foobar"))
  522. Expect(err).To(MatchError("Stream 1337 was reset with error code 123"))
  523. Expect(err).To(BeAssignableToTypeOf(streamCanceledError{}))
  524. Expect(err.(streamCanceledError).Canceled()).To(BeTrue())
  525. Expect(err.(streamCanceledError).ErrorCode()).To(Equal(protocol.ApplicationErrorCode(123)))
  526. close(done)
  527. }()
  528. waitForWrite()
  529. mockSender.EXPECT().onStreamCompleted(streamID)
  530. str.handleStopSendingFrame(&wire.StopSendingFrame{
  531. StreamID: streamID,
  532. ErrorCode: 123,
  533. })
  534. Eventually(done).Should(BeClosed())
  535. })
  536. It("doesn't allow further calls to Write", func() {
  537. mockSender.EXPECT().queueControlFrame(gomock.Any())
  538. mockSender.EXPECT().onStreamCompleted(streamID)
  539. str.handleStopSendingFrame(&wire.StopSendingFrame{
  540. StreamID: streamID,
  541. ErrorCode: 123,
  542. })
  543. _, err := str.Write([]byte("foobar"))
  544. Expect(err).To(MatchError("Stream 1337 was reset with error code 123"))
  545. Expect(err).To(BeAssignableToTypeOf(streamCanceledError{}))
  546. Expect(err.(streamCanceledError).Canceled()).To(BeTrue())
  547. Expect(err.(streamCanceledError).ErrorCode()).To(Equal(protocol.ApplicationErrorCode(123)))
  548. })
  549. })
  550. })
  551. })