receiving.go 4.8 KB


  1. package kcp
  2. import (
  3. "sync"
  4. "v2ray.com/core/common"
  5. "v2ray.com/core/common/buf"
  6. )
  7. type ReceivingWindow struct {
  8. cache map[uint32]*DataSegment
  9. }
  10. func NewReceivingWindow() *ReceivingWindow {
  11. return &ReceivingWindow{
  12. cache: make(map[uint32]*DataSegment),
  13. }
  14. }
  15. func (w *ReceivingWindow) Set(id uint32, value *DataSegment) bool {
  16. _, f := w.cache[id]
  17. if f {
  18. return false
  19. }
  20. w.cache[id] = value
  21. return true
  22. }
  23. func (w *ReceivingWindow) Has(id uint32) bool {
  24. _, f := w.cache[id]
  25. return f
  26. }
  27. func (w *ReceivingWindow) Remove(id uint32) *DataSegment {
  28. v, f := w.cache[id]
  29. if !f {
  30. return nil
  31. }
  32. delete(w.cache, id)
  33. return v
  34. }
  35. type AckList struct {
  36. writer SegmentWriter
  37. timestamps []uint32
  38. numbers []uint32
  39. nextFlush []uint32
  40. flushCandidates []uint32
  41. dirty bool
  42. }
  43. func NewAckList(writer SegmentWriter) *AckList {
  44. return &AckList{
  45. writer: writer,
  46. timestamps: make([]uint32, 0, 128),
  47. numbers: make([]uint32, 0, 128),
  48. nextFlush: make([]uint32, 0, 128),
  49. flushCandidates: make([]uint32, 0, 128),
  50. }
  51. }
  52. func (l *AckList) Add(number uint32, timestamp uint32) {
  53. l.timestamps = append(l.timestamps, timestamp)
  54. l.numbers = append(l.numbers, number)
  55. l.nextFlush = append(l.nextFlush, 0)
  56. l.dirty = true
  57. }
  58. func (l *AckList) Clear(una uint32) {
  59. count := 0
  60. for i := 0; i < len(l.numbers); i++ {
  61. if l.numbers[i] < una {
  62. continue
  63. }
  64. if i != count {
  65. l.numbers[count] = l.numbers[i]
  66. l.timestamps[count] = l.timestamps[i]
  67. l.nextFlush[count] = l.nextFlush[i]
  68. }
  69. count++
  70. }
  71. if count < len(l.numbers) {
  72. l.numbers = l.numbers[:count]
  73. l.timestamps = l.timestamps[:count]
  74. l.nextFlush = l.nextFlush[:count]
  75. l.dirty = true
  76. }
  77. }
  78. func (l *AckList) Flush(current uint32, rto uint32) {
  79. l.flushCandidates = l.flushCandidates[:0]
  80. seg := NewAckSegment()
  81. for i := 0; i < len(l.numbers); i++ {
  82. if l.nextFlush[i] > current {
  83. if len(l.flushCandidates) < cap(l.flushCandidates) {
  84. l.flushCandidates = append(l.flushCandidates, l.numbers[i])
  85. }
  86. continue
  87. }
  88. seg.PutNumber(l.numbers[i])
  89. seg.PutTimestamp(l.timestamps[i])
  90. timeout := rto / 2
  91. if timeout < 20 {
  92. timeout = 20
  93. }
  94. l.nextFlush[i] = current + timeout
  95. if seg.IsFull() {
  96. l.writer.Write(seg)
  97. seg.Release()
  98. seg = NewAckSegment()
  99. l.dirty = false
  100. }
  101. }
  102. if l.dirty || !seg.IsEmpty() {
  103. for _, number := range l.flushCandidates {
  104. if seg.IsFull() {
  105. break
  106. }
  107. seg.PutNumber(number)
  108. }
  109. l.writer.Write(seg)
  110. l.dirty = false
  111. }
  112. seg.Release()
  113. }
  114. type ReceivingWorker struct {
  115. sync.RWMutex
  116. conn *Connection
  117. leftOver buf.MultiBuffer
  118. window *ReceivingWindow
  119. acklist *AckList
  120. nextNumber uint32
  121. windowSize uint32
  122. }
  123. func NewReceivingWorker(kcp *Connection) *ReceivingWorker {
  124. worker := &ReceivingWorker{
  125. conn: kcp,
  126. window: NewReceivingWindow(),
  127. windowSize: kcp.Config.GetReceivingInFlightSize(),
  128. }
  129. worker.acklist = NewAckList(worker)
  130. return worker
  131. }
  132. func (w *ReceivingWorker) Release() {
  133. w.Lock()
  134. w.leftOver.Release()
  135. w.Unlock()
  136. }
  137. func (w *ReceivingWorker) ProcessSendingNext(number uint32) {
  138. w.Lock()
  139. defer w.Unlock()
  140. w.acklist.Clear(number)
  141. }
  142. func (w *ReceivingWorker) ProcessSegment(seg *DataSegment) {
  143. w.Lock()
  144. defer w.Unlock()
  145. number := seg.Number
  146. idx := number - w.nextNumber
  147. if idx >= w.windowSize {
  148. return
  149. }
  150. w.acklist.Clear(seg.SendingNext)
  151. w.acklist.Add(number, seg.Timestamp)
  152. if !w.window.Set(seg.Number, seg) {
  153. seg.Release()
  154. }
  155. }
  156. func (w *ReceivingWorker) ReadMultiBuffer() buf.MultiBuffer {
  157. if w.leftOver != nil {
  158. mb := w.leftOver
  159. w.leftOver = nil
  160. return mb
  161. }
  162. mb := buf.NewMultiBufferCap(32)
  163. w.Lock()
  164. defer w.Unlock()
  165. for {
  166. seg := w.window.Remove(w.nextNumber)
  167. if seg == nil {
  168. break
  169. }
  170. w.nextNumber++
  171. mb.Append(seg.Detach())
  172. seg.Release()
  173. }
  174. return mb
  175. }
  176. func (w *ReceivingWorker) Read(b []byte) int {
  177. mb := w.ReadMultiBuffer()
  178. nBytes, err := mb.Read(b)
  179. common.Must(err)
  180. if !mb.IsEmpty() {
  181. w.leftOver = mb
  182. }
  183. return nBytes
  184. }
  185. func (w *ReceivingWorker) IsDataAvailable() bool {
  186. w.RLock()
  187. defer w.RUnlock()
  188. return w.window.Has(w.nextNumber)
  189. }
  190. func (w *ReceivingWorker) NextNumber() uint32 {
  191. w.RLock()
  192. defer w.RUnlock()
  193. return w.nextNumber
  194. }
  195. func (w *ReceivingWorker) Flush(current uint32) {
  196. w.Lock()
  197. defer w.Unlock()
  198. w.acklist.Flush(current, w.conn.roundTrip.Timeout())
  199. }
  200. func (w *ReceivingWorker) Write(seg Segment) error {
  201. ackSeg := seg.(*AckSegment)
  202. ackSeg.Conv = w.conn.meta.Conversation
  203. ackSeg.ReceivingNext = w.nextNumber
  204. ackSeg.ReceivingWindow = w.nextNumber + w.windowSize
  205. ackSeg.Option = 0
  206. if w.conn.State() == StateReadyToClose {
  207. ackSeg.Option = SegmentOptionClose
  208. }
  209. return w.conn.output.Write(ackSeg)
  210. }
  211. func (*ReceivingWorker) CloseRead() {
  212. }
  213. func (w *ReceivingWorker) UpdateNecessary() bool {
  214. w.RLock()
  215. defer w.RUnlock()
  216. return len(w.acklist.numbers) > 0
  217. }