receiving.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279
  1. package kcp
  2. import (
  3. "sync"
  4. "github.com/v2ray/v2ray-core/common/alloc"
  5. )
  6. type ReceivingWindow struct {
  7. start uint32
  8. size uint32
  9. list []*DataSegment
  10. }
  11. func NewReceivingWindow(size uint32) *ReceivingWindow {
  12. return &ReceivingWindow{
  13. start: 0,
  14. size: size,
  15. list: make([]*DataSegment, size),
  16. }
  17. }
  18. func (this *ReceivingWindow) Size() uint32 {
  19. return this.size
  20. }
  21. func (this *ReceivingWindow) Position(idx uint32) uint32 {
  22. return (idx + this.start) % this.size
  23. }
  24. func (this *ReceivingWindow) Set(idx uint32, value *DataSegment) bool {
  25. pos := this.Position(idx)
  26. if this.list[pos] != nil {
  27. return false
  28. }
  29. this.list[pos] = value
  30. return true
  31. }
  32. func (this *ReceivingWindow) Remove(idx uint32) *DataSegment {
  33. pos := this.Position(idx)
  34. e := this.list[pos]
  35. this.list[pos] = nil
  36. return e
  37. }
  38. func (this *ReceivingWindow) RemoveFirst() *DataSegment {
  39. return this.Remove(0)
  40. }
  41. func (this *ReceivingWindow) Advance() {
  42. this.start++
  43. if this.start == this.size {
  44. this.start = 0
  45. }
  46. }
  47. type ReceivingQueue struct {
  48. start uint32
  49. cap uint32
  50. len uint32
  51. data []*alloc.Buffer
  52. }
  53. func NewReceivingQueue(size uint32) *ReceivingQueue {
  54. return &ReceivingQueue{
  55. cap: size,
  56. data: make([]*alloc.Buffer, size),
  57. }
  58. }
  59. func (this *ReceivingQueue) IsEmpty() bool {
  60. return this.len == 0
  61. }
  62. func (this *ReceivingQueue) IsFull() bool {
  63. return this.len == this.cap
  64. }
  65. func (this *ReceivingQueue) Read(buf []byte) int {
  66. if this.IsEmpty() {
  67. return 0
  68. }
  69. totalBytes := 0
  70. lenBuf := len(buf)
  71. for !this.IsEmpty() && totalBytes < lenBuf {
  72. payload := this.data[this.start]
  73. nBytes, _ := payload.Read(buf)
  74. buf = buf[nBytes:]
  75. totalBytes += nBytes
  76. if payload.IsEmpty() {
  77. payload.Release()
  78. this.data[this.start] = nil
  79. this.start++
  80. if this.start == this.cap {
  81. this.start = 0
  82. }
  83. this.len--
  84. if this.len == 0 {
  85. this.start = 0
  86. }
  87. }
  88. }
  89. return totalBytes
  90. }
  91. func (this *ReceivingQueue) Put(payload *alloc.Buffer) {
  92. this.data[(this.start+this.len)%this.cap] = payload
  93. this.len++
  94. }
  95. func (this *ReceivingQueue) Close() {
  96. for i := uint32(0); i < this.len; i++ {
  97. this.data[(this.start+i)%this.cap].Release()
  98. this.data[(this.start+i)%this.cap] = nil
  99. }
  100. }
  101. type AckList struct {
  102. writer SegmentWriter
  103. timestamps []uint32
  104. numbers []uint32
  105. nextFlush []uint32
  106. }
  107. func NewAckList(writer SegmentWriter) *AckList {
  108. return &AckList{
  109. writer: writer,
  110. timestamps: make([]uint32, 0, 32),
  111. numbers: make([]uint32, 0, 32),
  112. nextFlush: make([]uint32, 0, 32),
  113. }
  114. }
  115. func (this *AckList) Add(number uint32, timestamp uint32) {
  116. this.timestamps = append(this.timestamps, timestamp)
  117. this.numbers = append(this.numbers, number)
  118. this.nextFlush = append(this.nextFlush, 0)
  119. }
  120. func (this *AckList) Clear(una uint32) {
  121. count := 0
  122. for i := 0; i < len(this.numbers); i++ {
  123. if this.numbers[i] >= una {
  124. if i != count {
  125. this.numbers[count] = this.numbers[i]
  126. this.timestamps[count] = this.timestamps[i]
  127. this.nextFlush[count] = this.nextFlush[i]
  128. }
  129. count++
  130. }
  131. }
  132. if count < len(this.numbers) {
  133. this.numbers = this.numbers[:count]
  134. this.timestamps = this.timestamps[:count]
  135. this.nextFlush = this.nextFlush[:count]
  136. }
  137. }
  138. func (this *AckList) Flush(current uint32, rto uint32) {
  139. seg := NewAckSegment()
  140. for i := 0; i < len(this.numbers) && !seg.IsFull(); i++ {
  141. if this.nextFlush[i] <= current {
  142. seg.PutNumber(this.numbers[i], this.timestamps[i])
  143. this.nextFlush[i] = current + rto/2
  144. }
  145. }
  146. if seg.Count > 0 {
  147. this.writer.Write(seg)
  148. seg.Release()
  149. }
  150. }
  151. type ReceivingWorker struct {
  152. sync.RWMutex
  153. conn *Connection
  154. queue *ReceivingQueue
  155. window *ReceivingWindow
  156. acklist *AckList
  157. updated bool
  158. nextNumber uint32
  159. windowSize uint32
  160. }
  161. func NewReceivingWorker(kcp *Connection) *ReceivingWorker {
  162. windowSize := effectiveConfig.GetReceivingWindowSize()
  163. worker := &ReceivingWorker{
  164. conn: kcp,
  165. queue: NewReceivingQueue(effectiveConfig.GetReceivingQueueSize()),
  166. window: NewReceivingWindow(windowSize),
  167. windowSize: windowSize,
  168. }
  169. worker.acklist = NewAckList(worker)
  170. return worker
  171. }
  172. func (this *ReceivingWorker) ProcessSendingNext(number uint32) {
  173. this.Lock()
  174. defer this.Unlock()
  175. this.acklist.Clear(number)
  176. }
  177. func (this *ReceivingWorker) ProcessSegment(seg *DataSegment) {
  178. this.Lock()
  179. defer this.Unlock()
  180. number := seg.Number
  181. idx := number - this.nextNumber
  182. if idx >= this.windowSize {
  183. return
  184. }
  185. this.acklist.Clear(seg.SendingNext)
  186. this.acklist.Add(number, seg.Timestamp)
  187. if !this.window.Set(idx, seg) {
  188. seg.Release()
  189. }
  190. for !this.queue.IsFull() {
  191. seg := this.window.RemoveFirst()
  192. if seg == nil {
  193. break
  194. }
  195. this.queue.Put(seg.Data)
  196. seg.Data = nil
  197. seg.Release()
  198. this.window.Advance()
  199. this.nextNumber++
  200. this.updated = true
  201. }
  202. }
  203. func (this *ReceivingWorker) Read(b []byte) int {
  204. this.Lock()
  205. defer this.Unlock()
  206. return this.queue.Read(b)
  207. }
  208. func (this *ReceivingWorker) Flush(current uint32) {
  209. this.Lock()
  210. defer this.Unlock()
  211. this.acklist.Flush(current, this.conn.roundTrip.Timeout())
  212. }
  213. func (this *ReceivingWorker) Write(seg Segment) {
  214. ackSeg := seg.(*AckSegment)
  215. ackSeg.Conv = this.conn.conv
  216. ackSeg.ReceivingNext = this.nextNumber
  217. ackSeg.ReceivingWindow = this.nextNumber + this.windowSize
  218. if this.conn.state == StateReadyToClose {
  219. ackSeg.Opt = SegmentOptionClose
  220. }
  221. this.conn.output.Write(ackSeg)
  222. this.updated = false
  223. }
  224. func (this *ReceivingWorker) CloseRead() {
  225. this.Lock()
  226. defer this.Unlock()
  227. this.queue.Close()
  228. }
  229. func (this *ReceivingWorker) PingNecessary() bool {
  230. this.RLock()
  231. defer this.RUnlock()
  232. return this.updated
  233. }
  234. func (this *ReceivingWorker) MarkPingNecessary(b bool) {
  235. this.Lock()
  236. defer this.Unlock()
  237. this.updated = b
  238. }