streams_map.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. package quic
  2. import (
  3. "fmt"
  4. "github.com/lucas-clemente/quic-go/internal/flowcontrol"
  5. "github.com/lucas-clemente/quic-go/internal/handshake"
  6. "github.com/lucas-clemente/quic-go/internal/protocol"
  7. "github.com/lucas-clemente/quic-go/internal/wire"
  8. )
  9. type streamsMap struct {
  10. perspective protocol.Perspective
  11. sender streamSender
  12. newFlowController func(protocol.StreamID) flowcontrol.StreamFlowController
  13. outgoingBidiStreams *outgoingBidiStreamsMap
  14. outgoingUniStreams *outgoingUniStreamsMap
  15. incomingBidiStreams *incomingBidiStreamsMap
  16. incomingUniStreams *incomingUniStreamsMap
  17. }
  18. var _ streamManager = &streamsMap{}
  19. func newStreamsMap(
  20. sender streamSender,
  21. newFlowController func(protocol.StreamID) flowcontrol.StreamFlowController,
  22. maxIncomingStreams uint64,
  23. maxIncomingUniStreams uint64,
  24. perspective protocol.Perspective,
  25. version protocol.VersionNumber,
  26. ) streamManager {
  27. m := &streamsMap{
  28. perspective: perspective,
  29. newFlowController: newFlowController,
  30. sender: sender,
  31. }
  32. newBidiStream := func(id protocol.StreamID) streamI {
  33. return newStream(id, m.sender, m.newFlowController(id), version)
  34. }
  35. newUniSendStream := func(id protocol.StreamID) sendStreamI {
  36. return newSendStream(id, m.sender, m.newFlowController(id), version)
  37. }
  38. newUniReceiveStream := func(id protocol.StreamID) receiveStreamI {
  39. return newReceiveStream(id, m.sender, m.newFlowController(id), version)
  40. }
  41. m.outgoingBidiStreams = newOutgoingBidiStreamsMap(
  42. protocol.FirstStream(protocol.StreamTypeBidi, perspective),
  43. newBidiStream,
  44. sender.queueControlFrame,
  45. )
  46. m.incomingBidiStreams = newIncomingBidiStreamsMap(
  47. protocol.FirstStream(protocol.StreamTypeBidi, perspective.Opposite()),
  48. protocol.MaxStreamID(protocol.StreamTypeBidi, maxIncomingStreams, perspective.Opposite()),
  49. maxIncomingStreams,
  50. sender.queueControlFrame,
  51. newBidiStream,
  52. )
  53. m.outgoingUniStreams = newOutgoingUniStreamsMap(
  54. protocol.FirstStream(protocol.StreamTypeUni, perspective),
  55. newUniSendStream,
  56. sender.queueControlFrame,
  57. )
  58. m.incomingUniStreams = newIncomingUniStreamsMap(
  59. protocol.FirstStream(protocol.StreamTypeUni, perspective.Opposite()),
  60. protocol.MaxStreamID(protocol.StreamTypeUni, maxIncomingUniStreams, perspective.Opposite()),
  61. maxIncomingUniStreams,
  62. sender.queueControlFrame,
  63. newUniReceiveStream,
  64. )
  65. return m
  66. }
  67. func (m *streamsMap) OpenStream() (Stream, error) {
  68. return m.outgoingBidiStreams.OpenStream()
  69. }
  70. func (m *streamsMap) OpenStreamSync() (Stream, error) {
  71. return m.outgoingBidiStreams.OpenStreamSync()
  72. }
  73. func (m *streamsMap) OpenUniStream() (SendStream, error) {
  74. return m.outgoingUniStreams.OpenStream()
  75. }
  76. func (m *streamsMap) OpenUniStreamSync() (SendStream, error) {
  77. return m.outgoingUniStreams.OpenStreamSync()
  78. }
  79. func (m *streamsMap) AcceptStream() (Stream, error) {
  80. return m.incomingBidiStreams.AcceptStream()
  81. }
  82. func (m *streamsMap) AcceptUniStream() (ReceiveStream, error) {
  83. return m.incomingUniStreams.AcceptStream()
  84. }
  85. func (m *streamsMap) DeleteStream(id protocol.StreamID) error {
  86. switch id.Type() {
  87. case protocol.StreamTypeUni:
  88. if id.InitiatedBy() == m.perspective {
  89. return m.outgoingUniStreams.DeleteStream(id)
  90. }
  91. return m.incomingUniStreams.DeleteStream(id)
  92. case protocol.StreamTypeBidi:
  93. if id.InitiatedBy() == m.perspective {
  94. return m.outgoingBidiStreams.DeleteStream(id)
  95. }
  96. return m.incomingBidiStreams.DeleteStream(id)
  97. }
  98. panic("")
  99. }
  100. func (m *streamsMap) GetOrOpenReceiveStream(id protocol.StreamID) (receiveStreamI, error) {
  101. switch id.Type() {
  102. case protocol.StreamTypeUni:
  103. if id.InitiatedBy() == m.perspective {
  104. // an outgoing unidirectional stream is a send stream, not a receive stream
  105. return nil, fmt.Errorf("peer attempted to open receive stream %d", id)
  106. }
  107. return m.incomingUniStreams.GetOrOpenStream(id)
  108. case protocol.StreamTypeBidi:
  109. if id.InitiatedBy() == m.perspective {
  110. return m.outgoingBidiStreams.GetStream(id)
  111. }
  112. return m.incomingBidiStreams.GetOrOpenStream(id)
  113. }
  114. panic("")
  115. }
  116. func (m *streamsMap) GetOrOpenSendStream(id protocol.StreamID) (sendStreamI, error) {
  117. switch id.Type() {
  118. case protocol.StreamTypeUni:
  119. if id.InitiatedBy() == m.perspective {
  120. return m.outgoingUniStreams.GetStream(id)
  121. }
  122. // an incoming unidirectional stream is a receive stream, not a send stream
  123. return nil, fmt.Errorf("peer attempted to open send stream %d", id)
  124. case protocol.StreamTypeBidi:
  125. if id.InitiatedBy() == m.perspective {
  126. return m.outgoingBidiStreams.GetStream(id)
  127. }
  128. return m.incomingBidiStreams.GetOrOpenStream(id)
  129. }
  130. panic("")
  131. }
  132. func (m *streamsMap) HandleMaxStreamsFrame(f *wire.MaxStreamsFrame) error {
  133. id := protocol.MaxStreamID(f.Type, f.MaxStreams, m.perspective)
  134. switch id.Type() {
  135. case protocol.StreamTypeUni:
  136. m.outgoingUniStreams.SetMaxStream(id)
  137. case protocol.StreamTypeBidi:
  138. fmt.Printf("")
  139. m.outgoingBidiStreams.SetMaxStream(id)
  140. }
  141. return nil
  142. }
  143. func (m *streamsMap) UpdateLimits(p *handshake.TransportParameters) {
  144. // Max{Uni,Bidi}StreamID returns the highest stream ID that the peer is allowed to open.
  145. m.outgoingBidiStreams.SetMaxStream(protocol.MaxStreamID(protocol.StreamTypeBidi, p.MaxBidiStreams, m.perspective))
  146. m.outgoingUniStreams.SetMaxStream(protocol.MaxStreamID(protocol.StreamTypeUni, p.MaxUniStreams, m.perspective))
  147. }
  148. func (m *streamsMap) CloseWithError(err error) {
  149. m.outgoingBidiStreams.CloseWithError(err)
  150. m.outgoingUniStreams.CloseWithError(err)
  151. m.incomingBidiStreams.CloseWithError(err)
  152. m.incomingUniStreams.CloseWithError(err)
  153. }