kcp.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502
  1. // Package kcp - A Fast and Reliable ARQ Protocol
  2. //
  3. // Acknowledgement:
  4. // skywind3000@github for inventing the KCP protocol
  5. // xtaci@github for translating to Golang
  6. package kcp
  7. import (
  8. "github.com/v2ray/v2ray-core/common/alloc"
  9. v2io "github.com/v2ray/v2ray-core/common/io"
  10. "github.com/v2ray/v2ray-core/common/log"
  11. )
  // Retransmission-timeout (RTO) limits and defaults. They are combined with
  // the millisecond timestamps handed to Update, so they are in milliseconds.
  const (
  	IKCP_RTO_NDL = 30    // minimum RTO in no-delay mode
  	IKCP_RTO_MIN = 100   // minimum RTO in normal mode
  	IKCP_RTO_DEF = 200   // RTO given to a freshly created connection
  	IKCP_RTO_MAX = 60000 // ceiling applied to the computed RTO
  )

  // Default window sizes and the default update interval.
  const (
  	IKCP_WND_SND  = 32  // default sending window
  	IKCP_WND_RCV  = 32  // default receiving window; initial guess for the peer's window
  	IKCP_INTERVAL = 100 // default update interval
  )
  // _itimediff returns later-earlier as a signed 32-bit value. The subtraction
  // is carried out on the unsigned operands first, so the result is still
  // meaningful after the counters wrap around, provided the real distance
  // fits in an int32.
  func _itimediff(later, earlier uint32) int32 {
  	delta := later - earlier
  	return int32(delta)
  }
  // State identifies the lifecycle phase of a KCP connection.
  type State int

  // Lifecycle states, numbered 0 through 4 in declaration order.
  const (
  	// StateActive is the zero value, i.e. the state of a new connection.
  	StateActive State = iota
  	// StateReadyToClose is entered from StateActive by OnClose.
  	StateReadyToClose
  	// StatePeerClosed is entered from StateActive by OnPeerClosed.
  	StatePeerClosed
  	// StateTerminating is entered once both sides have asked to close, or
  	// when the peer announces termination.
  	StateTerminating
  	// StateTerminated is final; flush does nothing in this state.
  	StateTerminated
  )
  32. // KCP defines a single KCP connection
  33. type KCP struct {
  34. conv uint16
  35. state State
  36. stateBeginTime uint32
  37. lastIncomingTime uint32
  38. lastPayloadTime uint32
  39. sendingUpdated bool
  40. receivingUpdated bool
  41. lastPingTime uint32
  42. mtu, mss uint32
  43. snd_una, snd_nxt, rcv_nxt uint32
  44. rx_rttvar, rx_srtt, rx_rto uint32
  45. snd_wnd, rcv_wnd, rmt_wnd, cwnd uint32
  46. current, interval uint32
  47. snd_queue *SendingQueue
  48. rcv_queue *ReceivingQueue
  49. snd_buf []*DataSegment
  50. rcv_buf *ReceivingWindow
  51. acklist *ACKList
  52. fastresend int32
  53. congestionControl bool
  54. output *SegmentWriter
  55. }
  56. // NewKCP create a new kcp control object, 'conv' must equal in two endpoint
  57. // from the same connection.
  58. func NewKCP(conv uint16, mtu uint32, sendingWindowSize uint32, receivingWindowSize uint32, sendingQueueSize uint32, output v2io.Writer) *KCP {
  59. log.Debug("KCP|Core: creating KCP ", conv)
  60. kcp := new(KCP)
  61. kcp.conv = conv
  62. kcp.snd_wnd = sendingWindowSize
  63. kcp.rcv_wnd = receivingWindowSize
  64. kcp.rmt_wnd = IKCP_WND_RCV
  65. kcp.mtu = mtu
  66. kcp.mss = kcp.mtu - DataSegmentOverhead
  67. kcp.rx_rto = IKCP_RTO_DEF
  68. kcp.interval = IKCP_INTERVAL
  69. kcp.output = NewSegmentWriter(mtu, output)
  70. kcp.rcv_buf = NewReceivingWindow(receivingWindowSize)
  71. kcp.snd_queue = NewSendingQueue(sendingQueueSize)
  72. kcp.rcv_queue = NewReceivingQueue()
  73. kcp.acklist = NewACKList(kcp)
  74. kcp.cwnd = kcp.snd_wnd
  75. return kcp
  76. }
  77. func (kcp *KCP) SetState(state State) {
  78. kcp.state = state
  79. kcp.stateBeginTime = kcp.current
  80. switch state {
  81. case StateReadyToClose:
  82. kcp.rcv_queue.Close()
  83. case StatePeerClosed:
  84. kcp.ClearSendQueue()
  85. case StateTerminating:
  86. kcp.rcv_queue.Close()
  87. case StateTerminated:
  88. kcp.rcv_queue.Close()
  89. }
  90. }
  91. func (kcp *KCP) HandleOption(opt SegmentOption) {
  92. if (opt & SegmentOptionClose) == SegmentOptionClose {
  93. kcp.OnPeerClosed()
  94. }
  95. }
  96. func (kcp *KCP) OnPeerClosed() {
  97. if kcp.state == StateReadyToClose {
  98. kcp.SetState(StateTerminating)
  99. }
  100. if kcp.state == StateActive {
  101. kcp.SetState(StatePeerClosed)
  102. }
  103. }
  104. func (kcp *KCP) OnClose() {
  105. if kcp.state == StateActive {
  106. kcp.SetState(StateReadyToClose)
  107. }
  108. if kcp.state == StatePeerClosed {
  109. kcp.SetState(StateTerminating)
  110. }
  111. }
  112. // DumpReceivingBuf moves available data from rcv_buf -> rcv_queue
  113. // @Private
  114. func (kcp *KCP) DumpReceivingBuf() {
  115. for {
  116. seg := kcp.rcv_buf.RemoveFirst()
  117. if seg == nil {
  118. break
  119. }
  120. kcp.rcv_queue.Put(seg.Data)
  121. seg.Data = nil
  122. kcp.rcv_buf.Advance()
  123. kcp.rcv_nxt++
  124. kcp.receivingUpdated = true
  125. }
  126. }
  127. // Send is user/upper level send, returns below zero for error
  128. func (kcp *KCP) Send(buffer []byte) int {
  129. nBytes := 0
  130. for len(buffer) > 0 && !kcp.snd_queue.IsFull() {
  131. var size int
  132. if len(buffer) > int(kcp.mss) {
  133. size = int(kcp.mss)
  134. } else {
  135. size = len(buffer)
  136. }
  137. seg := &DataSegment{
  138. Data: alloc.NewSmallBuffer().Clear().Append(buffer[:size]),
  139. }
  140. kcp.snd_queue.Push(seg)
  141. buffer = buffer[size:]
  142. nBytes += size
  143. }
  144. return nBytes
  145. }
  146. // https://tools.ietf.org/html/rfc6298
  147. func (kcp *KCP) update_ack(rtt int32) {
  148. if kcp.rx_srtt == 0 {
  149. kcp.rx_srtt = uint32(rtt)
  150. kcp.rx_rttvar = uint32(rtt) / 2
  151. } else {
  152. delta := rtt - int32(kcp.rx_srtt)
  153. if delta < 0 {
  154. delta = -delta
  155. }
  156. kcp.rx_rttvar = (3*kcp.rx_rttvar + uint32(delta)) / 4
  157. kcp.rx_srtt = (7*kcp.rx_srtt + uint32(rtt)) / 8
  158. if kcp.rx_srtt < kcp.interval {
  159. kcp.rx_srtt = kcp.interval
  160. }
  161. }
  162. var rto uint32
  163. if kcp.interval < 4*kcp.rx_rttvar {
  164. rto = kcp.rx_srtt + 4*kcp.rx_rttvar
  165. } else {
  166. rto = kcp.rx_srtt + kcp.interval
  167. }
  168. if rto > IKCP_RTO_MAX {
  169. rto = IKCP_RTO_MAX
  170. }
  171. kcp.rx_rto = rto * 3 / 2
  172. }
  173. func (kcp *KCP) shrink_buf() {
  174. prevUna := kcp.snd_una
  175. if len(kcp.snd_buf) > 0 {
  176. seg := kcp.snd_buf[0]
  177. kcp.snd_una = seg.Number
  178. } else {
  179. kcp.snd_una = kcp.snd_nxt
  180. }
  181. if kcp.snd_una != prevUna {
  182. kcp.sendingUpdated = true
  183. }
  184. }
  185. func (kcp *KCP) parse_ack(sn uint32) {
  186. if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 {
  187. return
  188. }
  189. for k, seg := range kcp.snd_buf {
  190. if sn == seg.Number {
  191. kcp.snd_buf = append(kcp.snd_buf[:k], kcp.snd_buf[k+1:]...)
  192. seg.Release()
  193. break
  194. }
  195. if _itimediff(sn, seg.Number) < 0 {
  196. break
  197. }
  198. }
  199. }
  200. func (kcp *KCP) parse_fastack(sn uint32) {
  201. if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 {
  202. return
  203. }
  204. for _, seg := range kcp.snd_buf {
  205. if _itimediff(sn, seg.Number) < 0 {
  206. break
  207. } else if sn != seg.Number {
  208. seg.ackSkipped++
  209. }
  210. }
  211. }
  212. func (kcp *KCP) HandleReceivingNext(receivingNext uint32) {
  213. count := 0
  214. for _, seg := range kcp.snd_buf {
  215. if _itimediff(receivingNext, seg.Number) > 0 {
  216. seg.Release()
  217. count++
  218. } else {
  219. break
  220. }
  221. }
  222. kcp.snd_buf = kcp.snd_buf[count:]
  223. }
  224. func (kcp *KCP) HandleSendingNext(sendingNext uint32) {
  225. kcp.acklist.Clear(sendingNext)
  226. }
  227. func (kcp *KCP) parse_data(newseg *DataSegment) {
  228. sn := newseg.Number
  229. if _itimediff(sn, kcp.rcv_nxt+kcp.rcv_wnd) >= 0 ||
  230. _itimediff(sn, kcp.rcv_nxt) < 0 {
  231. return
  232. }
  233. idx := sn - kcp.rcv_nxt
  234. if !kcp.rcv_buf.Set(idx, newseg) {
  235. newseg.Release()
  236. }
  237. kcp.DumpReceivingBuf()
  238. }
  239. // Input when you received a low level packet (eg. UDP packet), call it
  240. func (kcp *KCP) Input(data []byte) int {
  241. kcp.lastIncomingTime = kcp.current
  242. var seg ISegment
  243. var maxack uint32
  244. var flag int
  245. for {
  246. seg, data = ReadSegment(data)
  247. if seg == nil {
  248. break
  249. }
  250. switch seg := seg.(type) {
  251. case *DataSegment:
  252. kcp.HandleOption(seg.Opt)
  253. kcp.HandleSendingNext(seg.SendingNext)
  254. kcp.acklist.Add(seg.Number, seg.Timestamp)
  255. kcp.receivingUpdated = true
  256. kcp.parse_data(seg)
  257. kcp.lastPayloadTime = kcp.current
  258. case *ACKSegment:
  259. kcp.HandleOption(seg.Opt)
  260. if kcp.rmt_wnd < seg.ReceivingWindow {
  261. kcp.rmt_wnd = seg.ReceivingWindow
  262. }
  263. kcp.HandleReceivingNext(seg.ReceivingNext)
  264. for i := 0; i < int(seg.Count); i++ {
  265. ts := seg.TimestampList[i]
  266. sn := seg.NumberList[i]
  267. if _itimediff(kcp.current, ts) >= 0 {
  268. kcp.update_ack(_itimediff(kcp.current, ts))
  269. }
  270. kcp.parse_ack(sn)
  271. if flag == 0 {
  272. flag = 1
  273. maxack = sn
  274. } else if _itimediff(sn, maxack) > 0 {
  275. maxack = sn
  276. }
  277. }
  278. kcp.lastPayloadTime = kcp.current
  279. case *CmdOnlySegment:
  280. kcp.HandleOption(seg.Opt)
  281. if seg.Cmd == SegmentCommandTerminated {
  282. if kcp.state == StateActive ||
  283. kcp.state == StateReadyToClose ||
  284. kcp.state == StatePeerClosed {
  285. kcp.SetState(StateTerminating)
  286. } else if kcp.state == StateTerminating {
  287. kcp.SetState(StateTerminated)
  288. }
  289. }
  290. kcp.HandleReceivingNext(seg.ReceivinNext)
  291. kcp.HandleSendingNext(seg.SendingNext)
  292. default:
  293. }
  294. kcp.shrink_buf()
  295. }
  296. if flag != 0 {
  297. kcp.parse_fastack(maxack)
  298. }
  299. return 0
  300. }
  301. // flush pending data
  302. func (kcp *KCP) flush() {
  303. if kcp.state == StateTerminated {
  304. return
  305. }
  306. if kcp.state == StateActive && _itimediff(kcp.current, kcp.lastPayloadTime) >= 30000 {
  307. kcp.OnClose()
  308. }
  309. if kcp.state == StateTerminating {
  310. kcp.output.Write(&CmdOnlySegment{
  311. Conv: kcp.conv,
  312. Cmd: SegmentCommandTerminated,
  313. })
  314. kcp.output.Flush()
  315. if _itimediff(kcp.current, kcp.stateBeginTime) > 8000 {
  316. kcp.SetState(StateTerminated)
  317. }
  318. return
  319. }
  320. if kcp.state == StateReadyToClose && _itimediff(kcp.current, kcp.stateBeginTime) > 15000 {
  321. kcp.SetState(StateTerminating)
  322. }
  323. current := kcp.current
  324. lost := false
  325. // flush acknowledges
  326. if kcp.acklist.Flush() {
  327. kcp.receivingUpdated = false
  328. }
  329. // calculate window size
  330. cwnd := kcp.snd_una + kcp.snd_wnd
  331. if cwnd < kcp.rmt_wnd {
  332. cwnd = kcp.rmt_wnd
  333. }
  334. if kcp.congestionControl && cwnd < kcp.snd_una+kcp.cwnd {
  335. cwnd = kcp.snd_una + kcp.cwnd
  336. }
  337. for !kcp.snd_queue.IsEmpty() && _itimediff(kcp.snd_nxt, cwnd) < 0 {
  338. seg := kcp.snd_queue.Pop()
  339. seg.Conv = kcp.conv
  340. seg.Number = kcp.snd_nxt
  341. seg.timeout = current
  342. seg.ackSkipped = 0
  343. seg.transmit = 0
  344. kcp.snd_buf = append(kcp.snd_buf, seg)
  345. kcp.snd_nxt++
  346. }
  347. // calculate resent
  348. resent := uint32(kcp.fastresend)
  349. if kcp.fastresend <= 0 {
  350. resent = 0xffffffff
  351. }
  352. // flush data segments
  353. for _, segment := range kcp.snd_buf {
  354. needsend := false
  355. if segment.transmit == 0 {
  356. needsend = true
  357. segment.transmit++
  358. segment.timeout = current + kcp.rx_rto
  359. } else if _itimediff(current, segment.timeout) >= 0 {
  360. needsend = true
  361. segment.transmit++
  362. segment.timeout = current + kcp.rx_rto
  363. lost = true
  364. } else if segment.ackSkipped >= resent {
  365. needsend = true
  366. segment.transmit++
  367. segment.ackSkipped = 0
  368. segment.timeout = current + kcp.rx_rto
  369. lost = true
  370. }
  371. if needsend {
  372. segment.Timestamp = current
  373. segment.SendingNext = kcp.snd_una
  374. segment.Opt = 0
  375. if kcp.state == StateReadyToClose {
  376. segment.Opt = SegmentOptionClose
  377. }
  378. kcp.output.Write(segment)
  379. kcp.sendingUpdated = false
  380. }
  381. }
  382. if kcp.sendingUpdated || kcp.receivingUpdated || _itimediff(kcp.current, kcp.lastPingTime) >= 5000 {
  383. seg := &CmdOnlySegment{
  384. Conv: kcp.conv,
  385. Cmd: SegmentCommandPing,
  386. ReceivinNext: kcp.rcv_nxt,
  387. SendingNext: kcp.snd_una,
  388. }
  389. if kcp.state == StateReadyToClose {
  390. seg.Opt = SegmentOptionClose
  391. }
  392. kcp.output.Write(seg)
  393. kcp.lastPingTime = kcp.current
  394. kcp.sendingUpdated = false
  395. kcp.receivingUpdated = false
  396. }
  397. // flash remain segments
  398. kcp.output.Flush()
  399. if kcp.congestionControl {
  400. if lost {
  401. kcp.cwnd = 3 * kcp.cwnd / 4
  402. } else {
  403. kcp.cwnd += kcp.cwnd / 4
  404. }
  405. if kcp.cwnd < 4 {
  406. kcp.cwnd = 4
  407. }
  408. if kcp.cwnd > kcp.snd_wnd {
  409. kcp.cwnd = kcp.snd_wnd
  410. }
  411. }
  412. }
  413. // Update updates state (call it repeatedly, every 10ms-100ms), or you can ask
  414. // ikcp_check when to call it again (without ikcp_input/_send calling).
  415. // 'current' - current timestamp in millisec.
  416. func (kcp *KCP) Update(current uint32) {
  417. kcp.current = current
  418. kcp.flush()
  419. }
  420. // NoDelay options
  421. // fastest: ikcp_nodelay(kcp, 1, 20, 2, 1)
  422. // nodelay: 0:disable(default), 1:enable
  423. // interval: internal update timer interval in millisec, default is 100ms
  424. // resend: 0:disable fast resend(default), 1:enable fast resend
  425. // nc: 0:normal congestion control(default), 1:disable congestion control
  426. func (kcp *KCP) NoDelay(interval uint32, resend int, congestionControl bool) int {
  427. kcp.interval = interval
  428. if resend >= 0 {
  429. kcp.fastresend = int32(resend)
  430. }
  431. kcp.congestionControl = congestionControl
  432. return 0
  433. }
  434. // WaitSnd gets how many packet is waiting to be sent
  435. func (kcp *KCP) WaitSnd() uint32 {
  436. return uint32(len(kcp.snd_buf)) + kcp.snd_queue.Len()
  437. }
  438. func (this *KCP) ClearSendQueue() {
  439. this.snd_queue.Clear()
  440. for _, seg := range this.snd_buf {
  441. seg.Release()
  442. }
  443. this.snd_buf = nil
  444. }