receiving.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. package kcp
  2. import (
  3. "sync"
  4. "v2ray.com/core/common/buf"
  5. )
  6. type ReceivingWindow struct {
  7. cache map[uint32]*DataSegment
  8. }
  9. func NewReceivingWindow() *ReceivingWindow {
  10. return &ReceivingWindow{
  11. cache: make(map[uint32]*DataSegment),
  12. }
  13. }
  14. func (w *ReceivingWindow) Set(id uint32, value *DataSegment) bool {
  15. _, f := w.cache[id]
  16. if f {
  17. return false
  18. }
  19. w.cache[id] = value
  20. return true
  21. }
  22. func (w *ReceivingWindow) Has(id uint32) bool {
  23. _, f := w.cache[id]
  24. return f
  25. }
  26. func (w *ReceivingWindow) Remove(id uint32) *DataSegment {
  27. v, f := w.cache[id]
  28. if !f {
  29. return nil
  30. }
  31. delete(w.cache, id)
  32. return v
  33. }
  34. type AckList struct {
  35. writer SegmentWriter
  36. timestamps []uint32
  37. numbers []uint32
  38. nextFlush []uint32
  39. flushCandidates []uint32
  40. dirty bool
  41. }
  42. func NewAckList(writer SegmentWriter) *AckList {
  43. return &AckList{
  44. writer: writer,
  45. timestamps: make([]uint32, 0, 128),
  46. numbers: make([]uint32, 0, 128),
  47. nextFlush: make([]uint32, 0, 128),
  48. flushCandidates: make([]uint32, 0, 128),
  49. }
  50. }
  51. func (l *AckList) Add(number uint32, timestamp uint32) {
  52. l.timestamps = append(l.timestamps, timestamp)
  53. l.numbers = append(l.numbers, number)
  54. l.nextFlush = append(l.nextFlush, 0)
  55. l.dirty = true
  56. }
  57. func (l *AckList) Clear(una uint32) {
  58. count := 0
  59. for i := 0; i < len(l.numbers); i++ {
  60. if l.numbers[i] < una {
  61. continue
  62. }
  63. if i != count {
  64. l.numbers[count] = l.numbers[i]
  65. l.timestamps[count] = l.timestamps[i]
  66. l.nextFlush[count] = l.nextFlush[i]
  67. }
  68. count++
  69. }
  70. if count < len(l.numbers) {
  71. l.numbers = l.numbers[:count]
  72. l.timestamps = l.timestamps[:count]
  73. l.nextFlush = l.nextFlush[:count]
  74. l.dirty = true
  75. }
  76. }
  77. func (l *AckList) Flush(current uint32, rto uint32) {
  78. l.flushCandidates = l.flushCandidates[:0]
  79. seg := NewAckSegment()
  80. for i := 0; i < len(l.numbers); i++ {
  81. if l.nextFlush[i] > current {
  82. if len(l.flushCandidates) < cap(l.flushCandidates) {
  83. l.flushCandidates = append(l.flushCandidates, l.numbers[i])
  84. }
  85. continue
  86. }
  87. seg.PutNumber(l.numbers[i])
  88. seg.PutTimestamp(l.timestamps[i])
  89. timeout := rto / 2
  90. if timeout < 20 {
  91. timeout = 20
  92. }
  93. l.nextFlush[i] = current + timeout
  94. if seg.IsFull() {
  95. l.writer.Write(seg)
  96. seg.Release()
  97. seg = NewAckSegment()
  98. l.dirty = false
  99. }
  100. }
  101. if l.dirty || !seg.IsEmpty() {
  102. for _, number := range l.flushCandidates {
  103. if seg.IsFull() {
  104. break
  105. }
  106. seg.PutNumber(number)
  107. }
  108. l.writer.Write(seg)
  109. l.dirty = false
  110. }
  111. seg.Release()
  112. }
  113. type ReceivingWorker struct {
  114. sync.RWMutex
  115. conn *Connection
  116. leftOver buf.MultiBuffer
  117. window *ReceivingWindow
  118. acklist *AckList
  119. nextNumber uint32
  120. windowSize uint32
  121. }
  122. func NewReceivingWorker(kcp *Connection) *ReceivingWorker {
  123. worker := &ReceivingWorker{
  124. conn: kcp,
  125. window: NewReceivingWindow(),
  126. windowSize: kcp.Config.GetReceivingInFlightSize(),
  127. }
  128. worker.acklist = NewAckList(worker)
  129. return worker
  130. }
  131. func (w *ReceivingWorker) Release() {
  132. w.Lock()
  133. w.leftOver.Release()
  134. w.Unlock()
  135. }
  136. func (w *ReceivingWorker) ProcessSendingNext(number uint32) {
  137. w.Lock()
  138. defer w.Unlock()
  139. w.acklist.Clear(number)
  140. }
  141. func (w *ReceivingWorker) ProcessSegment(seg *DataSegment) {
  142. w.Lock()
  143. defer w.Unlock()
  144. number := seg.Number
  145. idx := number - w.nextNumber
  146. if idx >= w.windowSize {
  147. return
  148. }
  149. w.acklist.Clear(seg.SendingNext)
  150. w.acklist.Add(number, seg.Timestamp)
  151. if !w.window.Set(seg.Number, seg) {
  152. seg.Release()
  153. }
  154. }
  155. func (w *ReceivingWorker) ReadMultiBuffer() buf.MultiBuffer {
  156. if w.leftOver != nil {
  157. mb := w.leftOver
  158. w.leftOver = nil
  159. return mb
  160. }
  161. mb := buf.NewMultiBufferCap(32)
  162. w.Lock()
  163. defer w.Unlock()
  164. for {
  165. seg := w.window.Remove(w.nextNumber)
  166. if seg == nil {
  167. break
  168. }
  169. w.nextNumber++
  170. mb.Append(seg.Detach())
  171. seg.Release()
  172. }
  173. return mb
  174. }
  175. func (w *ReceivingWorker) Read(b []byte) int {
  176. mb := w.ReadMultiBuffer()
  177. nBytes, _ := mb.Read(b)
  178. if !mb.IsEmpty() {
  179. w.leftOver = mb
  180. }
  181. return nBytes
  182. }
  183. func (w *ReceivingWorker) IsDataAvailable() bool {
  184. w.RLock()
  185. defer w.RUnlock()
  186. return w.window.Has(w.nextNumber)
  187. }
  188. func (w *ReceivingWorker) NextNumber() uint32 {
  189. w.RLock()
  190. defer w.RUnlock()
  191. return w.nextNumber
  192. }
  193. func (w *ReceivingWorker) Flush(current uint32) {
  194. w.Lock()
  195. defer w.Unlock()
  196. w.acklist.Flush(current, w.conn.roundTrip.Timeout())
  197. }
  198. func (w *ReceivingWorker) Write(seg Segment) error {
  199. ackSeg := seg.(*AckSegment)
  200. ackSeg.Conv = w.conn.meta.Conversation
  201. ackSeg.ReceivingNext = w.nextNumber
  202. ackSeg.ReceivingWindow = w.nextNumber + w.windowSize
  203. ackSeg.Option = 0
  204. if w.conn.State() == StateReadyToClose {
  205. ackSeg.Option = SegmentOptionClose
  206. }
  207. return w.conn.output.Write(ackSeg)
  208. }
  209. func (*ReceivingWorker) CloseRead() {
  210. }
  211. func (w *ReceivingWorker) UpdateNecessary() bool {
  212. w.RLock()
  213. defer w.RUnlock()
  214. return len(w.acklist.numbers) > 0
  215. }