receiving.go 4.9 KB


  1. //go:build !confonly
  2. // +build !confonly
  3. package kcp
  4. import (
  5. "sync"
  6. "github.com/v2fly/v2ray-core/v4/common/buf"
  7. )
  8. type ReceivingWindow struct {
  9. cache map[uint32]*DataSegment
  10. }
  11. func NewReceivingWindow() *ReceivingWindow {
  12. return &ReceivingWindow{
  13. cache: make(map[uint32]*DataSegment),
  14. }
  15. }
  16. func (w *ReceivingWindow) Set(id uint32, value *DataSegment) bool {
  17. _, f := w.cache[id]
  18. if f {
  19. return false
  20. }
  21. w.cache[id] = value
  22. return true
  23. }
  24. func (w *ReceivingWindow) Has(id uint32) bool {
  25. _, f := w.cache[id]
  26. return f
  27. }
  28. func (w *ReceivingWindow) Remove(id uint32) *DataSegment {
  29. v, f := w.cache[id]
  30. if !f {
  31. return nil
  32. }
  33. delete(w.cache, id)
  34. return v
  35. }
  36. type AckList struct {
  37. writer SegmentWriter
  38. timestamps []uint32
  39. numbers []uint32
  40. nextFlush []uint32
  41. flushCandidates []uint32
  42. dirty bool
  43. }
  44. func NewAckList(writer SegmentWriter) *AckList {
  45. return &AckList{
  46. writer: writer,
  47. timestamps: make([]uint32, 0, 128),
  48. numbers: make([]uint32, 0, 128),
  49. nextFlush: make([]uint32, 0, 128),
  50. flushCandidates: make([]uint32, 0, 128),
  51. }
  52. }
  53. func (l *AckList) Add(number uint32, timestamp uint32) {
  54. l.timestamps = append(l.timestamps, timestamp)
  55. l.numbers = append(l.numbers, number)
  56. l.nextFlush = append(l.nextFlush, 0)
  57. l.dirty = true
  58. }
  59. func (l *AckList) Clear(una uint32) {
  60. count := 0
  61. for i := 0; i < len(l.numbers); i++ {
  62. if l.numbers[i] < una {
  63. continue
  64. }
  65. if i != count {
  66. l.numbers[count] = l.numbers[i]
  67. l.timestamps[count] = l.timestamps[i]
  68. l.nextFlush[count] = l.nextFlush[i]
  69. }
  70. count++
  71. }
  72. if count < len(l.numbers) {
  73. l.numbers = l.numbers[:count]
  74. l.timestamps = l.timestamps[:count]
  75. l.nextFlush = l.nextFlush[:count]
  76. l.dirty = true
  77. }
  78. }
  79. func (l *AckList) Flush(current uint32, rto uint32) {
  80. l.flushCandidates = l.flushCandidates[:0]
  81. seg := NewAckSegment()
  82. for i := 0; i < len(l.numbers); i++ {
  83. if l.nextFlush[i] > current {
  84. if len(l.flushCandidates) < cap(l.flushCandidates) {
  85. l.flushCandidates = append(l.flushCandidates, l.numbers[i])
  86. }
  87. continue
  88. }
  89. seg.PutNumber(l.numbers[i])
  90. seg.PutTimestamp(l.timestamps[i])
  91. timeout := rto / 2
  92. if timeout < 20 {
  93. timeout = 20
  94. }
  95. l.nextFlush[i] = current + timeout
  96. if seg.IsFull() {
  97. l.writer.Write(seg)
  98. seg.Release()
  99. seg = NewAckSegment()
  100. l.dirty = false
  101. }
  102. }
  103. if l.dirty || !seg.IsEmpty() {
  104. for _, number := range l.flushCandidates {
  105. if seg.IsFull() {
  106. break
  107. }
  108. seg.PutNumber(number)
  109. }
  110. l.writer.Write(seg)
  111. l.dirty = false
  112. }
  113. seg.Release()
  114. }
  115. type ReceivingWorker struct {
  116. sync.RWMutex
  117. conn *Connection
  118. leftOver buf.MultiBuffer
  119. window *ReceivingWindow
  120. acklist *AckList
  121. nextNumber uint32
  122. windowSize uint32
  123. }
  124. func NewReceivingWorker(kcp *Connection) *ReceivingWorker {
  125. worker := &ReceivingWorker{
  126. conn: kcp,
  127. window: NewReceivingWindow(),
  128. windowSize: kcp.Config.GetReceivingInFlightSize(),
  129. }
  130. worker.acklist = NewAckList(worker)
  131. return worker
  132. }
  133. func (w *ReceivingWorker) Release() {
  134. w.Lock()
  135. buf.ReleaseMulti(w.leftOver)
  136. w.leftOver = nil
  137. w.Unlock()
  138. }
  139. func (w *ReceivingWorker) ProcessSendingNext(number uint32) {
  140. w.Lock()
  141. defer w.Unlock()
  142. w.acklist.Clear(number)
  143. }
  144. func (w *ReceivingWorker) ProcessSegment(seg *DataSegment) {
  145. w.Lock()
  146. defer w.Unlock()
  147. number := seg.Number
  148. idx := number - w.nextNumber
  149. if idx >= w.windowSize {
  150. return
  151. }
  152. w.acklist.Clear(seg.SendingNext)
  153. w.acklist.Add(number, seg.Timestamp)
  154. if !w.window.Set(seg.Number, seg) {
  155. seg.Release()
  156. }
  157. }
  158. func (w *ReceivingWorker) ReadMultiBuffer() buf.MultiBuffer {
  159. if w.leftOver != nil {
  160. mb := w.leftOver
  161. w.leftOver = nil
  162. return mb
  163. }
  164. mb := make(buf.MultiBuffer, 0, 32)
  165. w.Lock()
  166. defer w.Unlock()
  167. for {
  168. seg := w.window.Remove(w.nextNumber)
  169. if seg == nil {
  170. break
  171. }
  172. w.nextNumber++
  173. mb = append(mb, seg.Detach())
  174. seg.Release()
  175. }
  176. return mb
  177. }
  178. func (w *ReceivingWorker) Read(b []byte) int {
  179. mb := w.ReadMultiBuffer()
  180. if mb.IsEmpty() {
  181. return 0
  182. }
  183. mb, nBytes := buf.SplitBytes(mb, b)
  184. if !mb.IsEmpty() {
  185. w.leftOver = mb
  186. }
  187. return nBytes
  188. }
  189. func (w *ReceivingWorker) IsDataAvailable() bool {
  190. w.RLock()
  191. defer w.RUnlock()
  192. return w.window.Has(w.nextNumber)
  193. }
  194. func (w *ReceivingWorker) NextNumber() uint32 {
  195. w.RLock()
  196. defer w.RUnlock()
  197. return w.nextNumber
  198. }
  199. func (w *ReceivingWorker) Flush(current uint32) {
  200. w.Lock()
  201. defer w.Unlock()
  202. w.acklist.Flush(current, w.conn.roundTrip.Timeout())
  203. }
  204. func (w *ReceivingWorker) Write(seg Segment) error {
  205. ackSeg := seg.(*AckSegment)
  206. ackSeg.Conv = w.conn.meta.Conversation
  207. ackSeg.ReceivingNext = w.nextNumber
  208. ackSeg.ReceivingWindow = w.nextNumber + w.windowSize
  209. ackSeg.Option = 0
  210. if w.conn.State() == StateReadyToClose {
  211. ackSeg.Option = SegmentOptionClose
  212. }
  213. return w.conn.output.Write(ackSeg)
  214. }
  215. func (*ReceivingWorker) CloseRead() {
  216. }
  217. func (w *ReceivingWorker) UpdateNecessary() bool {
  218. w.RLock()
  219. defer w.RUnlock()
  220. return len(w.acklist.numbers) > 0
  221. }