receiving.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. package kcp
  2. import (
  3. "sync"
  4. "github.com/v2fly/v2ray-core/v4/common/buf"
  5. )
// ReceivingWindow holds data segments in a map keyed by a uint32 id.
// It performs no locking of its own.
type ReceivingWindow struct {
	// cache maps an id to the segment stored under it.
	cache map[uint32]*DataSegment
}
  9. func NewReceivingWindow() *ReceivingWindow {
  10. return &ReceivingWindow{
  11. cache: make(map[uint32]*DataSegment),
  12. }
  13. }
  14. func (w *ReceivingWindow) Set(id uint32, value *DataSegment) bool {
  15. _, f := w.cache[id]
  16. if f {
  17. return false
  18. }
  19. w.cache[id] = value
  20. return true
  21. }
  22. func (w *ReceivingWindow) Has(id uint32) bool {
  23. _, f := w.cache[id]
  24. return f
  25. }
  26. func (w *ReceivingWindow) Remove(id uint32) *DataSegment {
  27. v, f := w.cache[id]
  28. if !f {
  29. return nil
  30. }
  31. delete(w.cache, id)
  32. return v
  33. }
// AckList keeps a list of (number, timestamp) entries and writes them out in
// ack segments through a SegmentWriter. The numbers, timestamps and nextFlush
// slices are parallel: index i in each describes the same entry.
type AckList struct {
	// writer receives the ack segments built by Flush.
	writer SegmentWriter
	// timestamps holds the timestamp passed to Add for each entry.
	timestamps []uint32
	// numbers holds the number passed to Add for each entry.
	numbers []uint32
	// nextFlush holds, per entry, the time value that `current` must reach
	// before Flush sends the entry again; 0 for a freshly added entry.
	nextFlush []uint32
	// flushCandidates is scratch space reused by Flush for numbers that are
	// not yet due.
	flushCandidates []uint32
	// dirty is set when entries are added or removed and reset when Flush
	// writes a segment.
	dirty bool
}
  42. func NewAckList(writer SegmentWriter) *AckList {
  43. return &AckList{
  44. writer: writer,
  45. timestamps: make([]uint32, 0, 128),
  46. numbers: make([]uint32, 0, 128),
  47. nextFlush: make([]uint32, 0, 128),
  48. flushCandidates: make([]uint32, 0, 128),
  49. }
  50. }
  51. func (l *AckList) Add(number uint32, timestamp uint32) {
  52. l.timestamps = append(l.timestamps, timestamp)
  53. l.numbers = append(l.numbers, number)
  54. l.nextFlush = append(l.nextFlush, 0)
  55. l.dirty = true
  56. }
  57. func (l *AckList) Clear(una uint32) {
  58. count := 0
  59. for i := 0; i < len(l.numbers); i++ {
  60. if l.numbers[i] < una {
  61. continue
  62. }
  63. if i != count {
  64. l.numbers[count] = l.numbers[i]
  65. l.timestamps[count] = l.timestamps[i]
  66. l.nextFlush[count] = l.nextFlush[i]
  67. }
  68. count++
  69. }
  70. if count < len(l.numbers) {
  71. l.numbers = l.numbers[:count]
  72. l.timestamps = l.timestamps[:count]
  73. l.nextFlush = l.nextFlush[:count]
  74. l.dirty = true
  75. }
  76. }
  77. func (l *AckList) Flush(current uint32, rto uint32) {
  78. l.flushCandidates = l.flushCandidates[:0]
  79. seg := NewAckSegment()
  80. for i := 0; i < len(l.numbers); i++ {
  81. if l.nextFlush[i] > current {
  82. if len(l.flushCandidates) < cap(l.flushCandidates) {
  83. l.flushCandidates = append(l.flushCandidates, l.numbers[i])
  84. }
  85. continue
  86. }
  87. seg.PutNumber(l.numbers[i])
  88. seg.PutTimestamp(l.timestamps[i])
  89. timeout := rto / 2
  90. if timeout < 20 {
  91. timeout = 20
  92. }
  93. l.nextFlush[i] = current + timeout
  94. if seg.IsFull() {
  95. l.writer.Write(seg)
  96. seg.Release()
  97. seg = NewAckSegment()
  98. l.dirty = false
  99. }
  100. }
  101. if l.dirty || !seg.IsEmpty() {
  102. for _, number := range l.flushCandidates {
  103. if seg.IsFull() {
  104. break
  105. }
  106. seg.PutNumber(number)
  107. }
  108. l.writer.Write(seg)
  109. l.dirty = false
  110. }
  111. seg.Release()
  112. }
// ReceivingWorker is the receiving side of a Connection: it stores incoming
// data segments, hands their payloads to readers in number order, and keeps
// the list of acks to send back. The embedded RWMutex is taken by the
// worker's methods around accesses to its fields.
type ReceivingWorker struct {
	sync.RWMutex
	// conn is the connection this worker belongs to.
	conn *Connection
	// leftOver holds buffers that a previous Read could not fit into the
	// caller's slice; they are returned first by the next read.
	leftOver buf.MultiBuffer
	// window stores received segments keyed by their number.
	window *ReceivingWindow
	// acklist holds the acks pending for received segments.
	acklist *AckList
	// nextNumber is the number of the next segment to hand to readers.
	nextNumber uint32
	// windowSize is how many numbers, starting at nextNumber, ProcessSegment
	// accepts.
	windowSize uint32
}
  122. func NewReceivingWorker(kcp *Connection) *ReceivingWorker {
  123. worker := &ReceivingWorker{
  124. conn: kcp,
  125. window: NewReceivingWindow(),
  126. windowSize: kcp.Config.GetReceivingInFlightSize(),
  127. }
  128. worker.acklist = NewAckList(worker)
  129. return worker
  130. }
  131. func (w *ReceivingWorker) Release() {
  132. w.Lock()
  133. buf.ReleaseMulti(w.leftOver)
  134. w.leftOver = nil
  135. w.Unlock()
  136. }
  137. func (w *ReceivingWorker) ProcessSendingNext(number uint32) {
  138. w.Lock()
  139. defer w.Unlock()
  140. w.acklist.Clear(number)
  141. }
  142. func (w *ReceivingWorker) ProcessSegment(seg *DataSegment) {
  143. w.Lock()
  144. defer w.Unlock()
  145. number := seg.Number
  146. idx := number - w.nextNumber
  147. if idx >= w.windowSize {
  148. return
  149. }
  150. w.acklist.Clear(seg.SendingNext)
  151. w.acklist.Add(number, seg.Timestamp)
  152. if !w.window.Set(seg.Number, seg) {
  153. seg.Release()
  154. }
  155. }
  156. func (w *ReceivingWorker) ReadMultiBuffer() buf.MultiBuffer {
  157. if w.leftOver != nil {
  158. mb := w.leftOver
  159. w.leftOver = nil
  160. return mb
  161. }
  162. mb := make(buf.MultiBuffer, 0, 32)
  163. w.Lock()
  164. defer w.Unlock()
  165. for {
  166. seg := w.window.Remove(w.nextNumber)
  167. if seg == nil {
  168. break
  169. }
  170. w.nextNumber++
  171. mb = append(mb, seg.Detach())
  172. seg.Release()
  173. }
  174. return mb
  175. }
  176. func (w *ReceivingWorker) Read(b []byte) int {
  177. mb := w.ReadMultiBuffer()
  178. if mb.IsEmpty() {
  179. return 0
  180. }
  181. mb, nBytes := buf.SplitBytes(mb, b)
  182. if !mb.IsEmpty() {
  183. w.leftOver = mb
  184. }
  185. return nBytes
  186. }
  187. func (w *ReceivingWorker) IsDataAvailable() bool {
  188. w.RLock()
  189. defer w.RUnlock()
  190. return w.window.Has(w.nextNumber)
  191. }
  192. func (w *ReceivingWorker) NextNumber() uint32 {
  193. w.RLock()
  194. defer w.RUnlock()
  195. return w.nextNumber
  196. }
  197. func (w *ReceivingWorker) Flush(current uint32) {
  198. w.Lock()
  199. defer w.Unlock()
  200. w.acklist.Flush(current, w.conn.roundTrip.Timeout())
  201. }
  202. func (w *ReceivingWorker) Write(seg Segment) error {
  203. ackSeg := seg.(*AckSegment)
  204. ackSeg.Conv = w.conn.meta.Conversation
  205. ackSeg.ReceivingNext = w.nextNumber
  206. ackSeg.ReceivingWindow = w.nextNumber + w.windowSize
  207. ackSeg.Option = 0
  208. if w.conn.State() == StateReadyToClose {
  209. ackSeg.Option = SegmentOptionClose
  210. }
  211. return w.conn.output.Write(ackSeg)
  212. }
  213. func (*ReceivingWorker) CloseRead() {
  214. }
  215. func (w *ReceivingWorker) UpdateNecessary() bool {
  216. w.RLock()
  217. defer w.RUnlock()
  218. return len(w.acklist.numbers) > 0
  219. }