streams_map_legacy.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. package quic
  2. import (
  3. "errors"
  4. "fmt"
  5. "sync"
  6. "github.com/lucas-clemente/quic-go/internal/handshake"
  7. "github.com/lucas-clemente/quic-go/internal/protocol"
  8. "github.com/lucas-clemente/quic-go/internal/utils"
  9. "github.com/lucas-clemente/quic-go/internal/wire"
  10. "github.com/lucas-clemente/quic-go/qerr"
  11. )
  12. type streamsMapLegacy struct {
  13. mutex sync.RWMutex
  14. perspective protocol.Perspective
  15. streams map[protocol.StreamID]streamI
  16. nextStreamToOpen protocol.StreamID // StreamID of the next Stream that will be returned by OpenStream()
  17. highestStreamOpenedByPeer protocol.StreamID
  18. nextStreamOrErrCond sync.Cond
  19. openStreamOrErrCond sync.Cond
  20. closeErr error
  21. nextStreamToAccept protocol.StreamID
  22. newStream func(protocol.StreamID) streamI
  23. numOutgoingStreams uint32
  24. numIncomingStreams uint32
  25. maxIncomingStreams uint32
  26. maxOutgoingStreams uint32
  27. }
  28. var _ streamManager = &streamsMapLegacy{}
  29. var errMapAccess = errors.New("streamsMap: Error accessing the streams map")
  30. func newStreamsMapLegacy(newStream func(protocol.StreamID) streamI, maxStreams int, pers protocol.Perspective) streamManager {
  31. // add some tolerance to the maximum incoming streams value
  32. maxIncomingStreams := utils.MaxUint32(
  33. uint32(maxStreams)+protocol.MaxStreamsMinimumIncrement,
  34. uint32(float64(maxStreams)*float64(protocol.MaxStreamsMultiplier)),
  35. )
  36. sm := streamsMapLegacy{
  37. perspective: pers,
  38. streams: make(map[protocol.StreamID]streamI),
  39. newStream: newStream,
  40. maxIncomingStreams: maxIncomingStreams,
  41. }
  42. sm.nextStreamOrErrCond.L = &sm.mutex
  43. sm.openStreamOrErrCond.L = &sm.mutex
  44. nextServerInitiatedStream := protocol.StreamID(2)
  45. nextClientInitiatedStream := protocol.StreamID(3)
  46. if pers == protocol.PerspectiveServer {
  47. sm.highestStreamOpenedByPeer = 1
  48. }
  49. if pers == protocol.PerspectiveServer {
  50. sm.nextStreamToOpen = nextServerInitiatedStream
  51. sm.nextStreamToAccept = nextClientInitiatedStream
  52. } else {
  53. sm.nextStreamToOpen = nextClientInitiatedStream
  54. sm.nextStreamToAccept = nextServerInitiatedStream
  55. }
  56. return &sm
  57. }
  58. // getStreamPerspective says which side should initiate a stream
  59. func (m *streamsMapLegacy) streamInitiatedBy(id protocol.StreamID) protocol.Perspective {
  60. if id%2 == 0 {
  61. return protocol.PerspectiveServer
  62. }
  63. return protocol.PerspectiveClient
  64. }
  65. func (m *streamsMapLegacy) GetOrOpenReceiveStream(id protocol.StreamID) (receiveStreamI, error) {
  66. // every bidirectional stream is also a receive stream
  67. return m.getOrOpenStream(id)
  68. }
  69. func (m *streamsMapLegacy) GetOrOpenSendStream(id protocol.StreamID) (sendStreamI, error) {
  70. // every bidirectional stream is also a send stream
  71. return m.getOrOpenStream(id)
  72. }
  73. // getOrOpenStream either returns an existing stream, a newly opened stream, or nil if a stream with the provided ID is already closed.
  74. // Newly opened streams should only originate from the client. To open a stream from the server, OpenStream should be used.
  75. func (m *streamsMapLegacy) getOrOpenStream(id protocol.StreamID) (streamI, error) {
  76. m.mutex.RLock()
  77. s, ok := m.streams[id]
  78. m.mutex.RUnlock()
  79. if ok {
  80. return s, nil
  81. }
  82. // ... we don't have an existing stream
  83. m.mutex.Lock()
  84. defer m.mutex.Unlock()
  85. // We need to check whether another invocation has already created a stream (between RUnlock() and Lock()).
  86. s, ok = m.streams[id]
  87. if ok {
  88. return s, nil
  89. }
  90. if m.perspective == m.streamInitiatedBy(id) {
  91. if id <= m.nextStreamToOpen { // this is a stream opened by us. Must have been closed already
  92. return nil, nil
  93. }
  94. return nil, qerr.Error(qerr.InvalidStreamID, fmt.Sprintf("peer attempted to open stream %d", id))
  95. }
  96. if id <= m.highestStreamOpenedByPeer { // this is a peer-initiated stream that doesn't exist anymore. Must have been closed already
  97. return nil, nil
  98. }
  99. for sid := m.highestStreamOpenedByPeer + 2; sid <= id; sid += 2 {
  100. if _, err := m.openRemoteStream(sid); err != nil {
  101. return nil, err
  102. }
  103. }
  104. m.nextStreamOrErrCond.Broadcast()
  105. return m.streams[id], nil
  106. }
  107. func (m *streamsMapLegacy) openRemoteStream(id protocol.StreamID) (streamI, error) {
  108. if m.numIncomingStreams >= m.maxIncomingStreams {
  109. return nil, qerr.TooManyOpenStreams
  110. }
  111. // maxNewStreamIDDelta is the maximum difference between and a newly opened Stream and the highest StreamID that a client has ever opened
  112. // note that the number of streams is half this value, since the client can only open streams with open StreamID
  113. maxStreamIDDelta := protocol.StreamID(4 * m.maxIncomingStreams)
  114. if id+maxStreamIDDelta < m.highestStreamOpenedByPeer {
  115. return nil, qerr.Error(qerr.InvalidStreamID, fmt.Sprintf("attempted to open stream %d, which is a lot smaller than the highest opened stream, %d", id, m.highestStreamOpenedByPeer))
  116. }
  117. m.numIncomingStreams++
  118. if id > m.highestStreamOpenedByPeer {
  119. m.highestStreamOpenedByPeer = id
  120. }
  121. s := m.newStream(id)
  122. return s, m.putStream(s)
  123. }
  124. func (m *streamsMapLegacy) openStreamImpl() (streamI, error) {
  125. if m.numOutgoingStreams >= m.maxOutgoingStreams {
  126. return nil, qerr.TooManyOpenStreams
  127. }
  128. m.numOutgoingStreams++
  129. s := m.newStream(m.nextStreamToOpen)
  130. m.nextStreamToOpen += 2
  131. return s, m.putStream(s)
  132. }
  133. // OpenStream opens the next available stream
  134. func (m *streamsMapLegacy) OpenStream() (Stream, error) {
  135. m.mutex.Lock()
  136. defer m.mutex.Unlock()
  137. if m.closeErr != nil {
  138. return nil, m.closeErr
  139. }
  140. return m.openStreamImpl()
  141. }
  142. func (m *streamsMapLegacy) OpenStreamSync() (Stream, error) {
  143. m.mutex.Lock()
  144. defer m.mutex.Unlock()
  145. for {
  146. if m.closeErr != nil {
  147. return nil, m.closeErr
  148. }
  149. str, err := m.openStreamImpl()
  150. if err == nil {
  151. return str, err
  152. }
  153. if err != nil && err != qerr.TooManyOpenStreams {
  154. return nil, err
  155. }
  156. m.openStreamOrErrCond.Wait()
  157. }
  158. }
  159. func (m *streamsMapLegacy) OpenUniStream() (SendStream, error) {
  160. return nil, errors.New("gQUIC doesn't support unidirectional streams")
  161. }
  162. func (m *streamsMapLegacy) OpenUniStreamSync() (SendStream, error) {
  163. return nil, errors.New("gQUIC doesn't support unidirectional streams")
  164. }
  165. // AcceptStream returns the next stream opened by the peer
  166. // it blocks until a new stream is opened
  167. func (m *streamsMapLegacy) AcceptStream() (Stream, error) {
  168. m.mutex.Lock()
  169. defer m.mutex.Unlock()
  170. var str streamI
  171. for {
  172. var ok bool
  173. if m.closeErr != nil {
  174. return nil, m.closeErr
  175. }
  176. str, ok = m.streams[m.nextStreamToAccept]
  177. if ok {
  178. break
  179. }
  180. m.nextStreamOrErrCond.Wait()
  181. }
  182. m.nextStreamToAccept += 2
  183. return str, nil
  184. }
  185. func (m *streamsMapLegacy) AcceptUniStream() (ReceiveStream, error) {
  186. return nil, errors.New("gQUIC doesn't support unidirectional streams")
  187. }
  188. func (m *streamsMapLegacy) DeleteStream(id protocol.StreamID) error {
  189. m.mutex.Lock()
  190. defer m.mutex.Unlock()
  191. _, ok := m.streams[id]
  192. if !ok {
  193. return errMapAccess
  194. }
  195. delete(m.streams, id)
  196. if m.streamInitiatedBy(id) == m.perspective {
  197. m.numOutgoingStreams--
  198. } else {
  199. m.numIncomingStreams--
  200. }
  201. m.openStreamOrErrCond.Signal()
  202. return nil
  203. }
  204. func (m *streamsMapLegacy) putStream(s streamI) error {
  205. id := s.StreamID()
  206. if _, ok := m.streams[id]; ok {
  207. return fmt.Errorf("a stream with ID %d already exists", id)
  208. }
  209. m.streams[id] = s
  210. return nil
  211. }
  212. func (m *streamsMapLegacy) CloseWithError(err error) {
  213. m.mutex.Lock()
  214. defer m.mutex.Unlock()
  215. m.closeErr = err
  216. m.nextStreamOrErrCond.Broadcast()
  217. m.openStreamOrErrCond.Broadcast()
  218. for _, s := range m.streams {
  219. s.closeForShutdown(err)
  220. }
  221. }
  222. // TODO(#952): this won't be needed when gQUIC supports stateless handshakes
  223. func (m *streamsMapLegacy) UpdateLimits(params *handshake.TransportParameters) {
  224. m.mutex.Lock()
  225. m.maxOutgoingStreams = params.MaxStreams
  226. for id, str := range m.streams {
  227. str.handleMaxStreamDataFrame(&wire.MaxStreamDataFrame{
  228. StreamID: id,
  229. ByteOffset: params.StreamFlowControlWindow,
  230. })
  231. }
  232. m.mutex.Unlock()
  233. m.openStreamOrErrCond.Broadcast()
  234. }
  235. // should never be called, since MAX_STREAM_ID frames can only be unpacked for IETF QUIC
  236. func (m *streamsMapLegacy) HandleMaxStreamIDFrame(f *wire.MaxStreamIDFrame) error {
  237. return errors.New("gQUIC doesn't have MAX_STREAM_ID frames")
  238. }