kcp.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601
  1. // Package kcp - A Fast and Reliable ARQ Protocol
  2. //
  3. // Acknowledgement:
  4. // skywind3000@github for inventing the KCP protocol
  5. // xtaci@github for translating to Golang
  6. package kcp
  7. import (
  8. "github.com/v2ray/v2ray-core/common/alloc"
  9. v2io "github.com/v2ray/v2ray-core/common/io"
  10. "github.com/v2ray/v2ray-core/common/log"
  11. )
  12. const (
  13. IKCP_RTO_NDL = 30 // no delay min rto
  14. IKCP_RTO_MIN = 100 // normal min rto
  15. IKCP_RTO_DEF = 200
  16. IKCP_RTO_MAX = 60000
  17. IKCP_CMD_PUSH = 81 // cmd: push data
  18. IKCP_CMD_ACK = 82 // cmd: ack
  19. IKCP_WND_SND = 32
  20. IKCP_WND_RCV = 32
  21. IKCP_MTU_DEF = 1350
  22. IKCP_ACK_FAST = 3
  23. IKCP_INTERVAL = 100
  24. IKCP_OVERHEAD = 24
  25. IKCP_DEADLINK = 20
  26. IKCP_THRESH_INIT = 2
  27. IKCP_THRESH_MIN = 2
  28. IKCP_PROBE_INIT = 7000 // 7 secs to probe window size
  29. IKCP_PROBE_LIMIT = 120000 // up to 120 secs to probe window
  30. )
  31. func _imin_(a, b uint32) uint32 {
  32. if a <= b {
  33. return a
  34. } else {
  35. return b
  36. }
  37. }
  38. func _imax_(a, b uint32) uint32 {
  39. if a >= b {
  40. return a
  41. } else {
  42. return b
  43. }
  44. }
  45. func _itimediff(later, earlier uint32) int32 {
  46. return (int32)(later - earlier)
  47. }
  48. type State int
  49. const (
  50. StateActive State = 0
  51. StateReadyToClose State = 1
  52. StatePeerClosed State = 2
  53. StateTerminating State = 3
  54. StateTerminated State = 4
  55. )
  56. // KCP defines a single KCP connection
  57. type KCP struct {
  58. conv uint16
  59. state State
  60. stateBeginTime uint32
  61. mtu, mss uint32
  62. snd_una, snd_nxt, rcv_nxt uint32
  63. ts_recent, ts_lastack, ssthresh uint32
  64. rx_rttvar, rx_srtt, rx_rto uint32
  65. snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe uint32
  66. current, interval, ts_flush, xmit uint32
  67. updated bool
  68. ts_probe, probe_wait uint32
  69. dead_link, incr uint32
  70. snd_queue *SendingQueue
  71. rcv_queue []*DataSegment
  72. snd_buf []*DataSegment
  73. rcv_buf *ReceivingWindow
  74. acklist *ACKList
  75. buffer []byte
  76. fastresend int32
  77. congestionControl bool
  78. output *SegmentWriter
  79. }
  80. // NewKCP create a new kcp control object, 'conv' must equal in two endpoint
  81. // from the same connection.
  82. func NewKCP(conv uint16, mtu uint32, sendingWindowSize uint32, receivingWindowSize uint32, sendingQueueSize uint32, output v2io.Writer) *KCP {
  83. kcp := new(KCP)
  84. kcp.conv = conv
  85. kcp.snd_wnd = sendingWindowSize
  86. kcp.rcv_wnd = receivingWindowSize
  87. kcp.rmt_wnd = IKCP_WND_RCV
  88. kcp.mtu = mtu
  89. kcp.mss = kcp.mtu - IKCP_OVERHEAD
  90. kcp.rx_rto = IKCP_RTO_DEF
  91. kcp.interval = IKCP_INTERVAL
  92. kcp.ts_flush = IKCP_INTERVAL
  93. kcp.ssthresh = IKCP_THRESH_INIT
  94. kcp.dead_link = IKCP_DEADLINK
  95. kcp.output = NewSegmentWriter(mtu, output)
  96. kcp.rcv_buf = NewReceivingWindow(receivingWindowSize)
  97. kcp.snd_queue = NewSendingQueue(sendingQueueSize)
  98. kcp.acklist = new(ACKList)
  99. return kcp
  100. }
  101. func (kcp *KCP) HandleOption(opt SegmentOption) {
  102. if (opt & SegmentOptionClose) == SegmentOptionClose {
  103. kcp.OnPeerClosed()
  104. }
  105. }
  106. func (kcp *KCP) OnPeerClosed() {
  107. if kcp.state == StateReadyToClose {
  108. kcp.state = StateTerminating
  109. kcp.stateBeginTime = kcp.current
  110. log.Info("KCP terminating at ", kcp.current)
  111. }
  112. if kcp.state == StateActive {
  113. kcp.ClearSendQueue()
  114. kcp.state = StatePeerClosed
  115. kcp.stateBeginTime = kcp.current
  116. log.Info("KCP peer close at ", kcp.current)
  117. }
  118. }
  119. func (kcp *KCP) OnClose() {
  120. if kcp.state == StateActive {
  121. kcp.state = StateReadyToClose
  122. kcp.stateBeginTime = kcp.current
  123. log.Info("KCP ready close at ", kcp.current)
  124. }
  125. if kcp.state == StatePeerClosed {
  126. kcp.state = StateTerminating
  127. kcp.stateBeginTime = kcp.current
  128. log.Info("KCP terminating at ", kcp.current)
  129. }
  130. }
  131. // Recv is user/upper level recv: returns size, returns below zero for EAGAIN
  132. func (kcp *KCP) Recv(buffer []byte) (n int) {
  133. if len(kcp.rcv_queue) == 0 {
  134. return -1
  135. }
  136. // merge fragment
  137. count := 0
  138. for _, seg := range kcp.rcv_queue {
  139. dataLen := seg.Data.Len()
  140. if dataLen > len(buffer) {
  141. break
  142. }
  143. copy(buffer, seg.Data.Value)
  144. seg.Release()
  145. buffer = buffer[dataLen:]
  146. n += dataLen
  147. count++
  148. }
  149. kcp.rcv_queue = kcp.rcv_queue[count:]
  150. kcp.DumpReceivingBuf()
  151. return
  152. }
  153. // DumpReceivingBuf moves available data from rcv_buf -> rcv_queue
  154. // @Private
  155. func (kcp *KCP) DumpReceivingBuf() {
  156. for {
  157. seg := kcp.rcv_buf.RemoveFirst()
  158. if seg == nil {
  159. break
  160. }
  161. kcp.rcv_queue = append(kcp.rcv_queue, seg)
  162. kcp.rcv_buf.Advance()
  163. kcp.rcv_nxt++
  164. }
  165. }
  166. // Send is user/upper level send, returns below zero for error
  167. func (kcp *KCP) Send(buffer []byte) int {
  168. nBytes := 0
  169. for len(buffer) > 0 && !kcp.snd_queue.IsFull() {
  170. var size int
  171. if len(buffer) > int(kcp.mss) {
  172. size = int(kcp.mss)
  173. } else {
  174. size = len(buffer)
  175. }
  176. seg := &DataSegment{
  177. Data: alloc.NewSmallBuffer().Clear().Append(buffer[:size]),
  178. }
  179. kcp.snd_queue.Push(seg)
  180. buffer = buffer[size:]
  181. nBytes += size
  182. }
  183. return nBytes
  184. }
  185. // https://tools.ietf.org/html/rfc6298
  186. func (kcp *KCP) update_ack(rtt int32) {
  187. var rto uint32 = 0
  188. if kcp.rx_srtt == 0 {
  189. kcp.rx_srtt = uint32(rtt)
  190. kcp.rx_rttvar = uint32(rtt) / 2
  191. } else {
  192. delta := rtt - int32(kcp.rx_srtt)
  193. if delta < 0 {
  194. delta = -delta
  195. }
  196. kcp.rx_rttvar = (3*kcp.rx_rttvar + uint32(delta)) / 4
  197. kcp.rx_srtt = (7*kcp.rx_srtt + uint32(rtt)) / 8
  198. if kcp.rx_srtt < kcp.interval {
  199. kcp.rx_srtt = kcp.interval
  200. }
  201. }
  202. rto = kcp.rx_srtt + _imax_(kcp.interval, 4*kcp.rx_rttvar)
  203. if rto > IKCP_RTO_MAX {
  204. rto = IKCP_RTO_MAX
  205. }
  206. kcp.rx_rto = rto * 3 / 2
  207. }
  208. func (kcp *KCP) shrink_buf() {
  209. if len(kcp.snd_buf) > 0 {
  210. seg := kcp.snd_buf[0]
  211. kcp.snd_una = seg.Number
  212. } else {
  213. kcp.snd_una = kcp.snd_nxt
  214. }
  215. }
  216. func (kcp *KCP) parse_ack(sn uint32) {
  217. if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 {
  218. return
  219. }
  220. for k, seg := range kcp.snd_buf {
  221. if sn == seg.Number {
  222. kcp.snd_buf = append(kcp.snd_buf[:k], kcp.snd_buf[k+1:]...)
  223. seg.Release()
  224. break
  225. }
  226. if _itimediff(sn, seg.Number) < 0 {
  227. break
  228. }
  229. }
  230. }
  231. func (kcp *KCP) parse_fastack(sn uint32) {
  232. if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 {
  233. return
  234. }
  235. for _, seg := range kcp.snd_buf {
  236. if _itimediff(sn, seg.Number) < 0 {
  237. break
  238. } else if sn != seg.Number {
  239. seg.ackSkipped++
  240. }
  241. }
  242. }
  243. func (kcp *KCP) HandleReceivingNext(receivingNext uint32) {
  244. count := 0
  245. for _, seg := range kcp.snd_buf {
  246. if _itimediff(receivingNext, seg.Number) > 0 {
  247. seg.Release()
  248. count++
  249. } else {
  250. break
  251. }
  252. }
  253. kcp.snd_buf = kcp.snd_buf[count:]
  254. }
  255. func (kcp *KCP) HandleSendingNext(sendingNext uint32) {
  256. kcp.acklist.Clear(sendingNext)
  257. }
  258. func (kcp *KCP) parse_data(newseg *DataSegment) {
  259. sn := newseg.Number
  260. if _itimediff(sn, kcp.rcv_nxt+kcp.rcv_wnd) >= 0 ||
  261. _itimediff(sn, kcp.rcv_nxt) < 0 {
  262. return
  263. }
  264. idx := sn - kcp.rcv_nxt
  265. if !kcp.rcv_buf.Set(idx, newseg) {
  266. newseg.Release()
  267. }
  268. kcp.DumpReceivingBuf()
  269. }
  270. // Input when you received a low level packet (eg. UDP packet), call it
  271. func (kcp *KCP) Input(data []byte) int {
  272. log.Info("KCP input at ", kcp.current)
  273. var seg ISegment
  274. var maxack uint32
  275. var flag int
  276. for {
  277. seg, data = ReadSegment(data)
  278. if seg == nil {
  279. break
  280. }
  281. switch seg := seg.(type) {
  282. case *DataSegment:
  283. kcp.HandleOption(seg.Opt)
  284. kcp.HandleSendingNext(seg.SendingNext)
  285. kcp.shrink_buf()
  286. kcp.acklist.Add(seg.Number, seg.Timestamp)
  287. kcp.parse_data(seg)
  288. case *ACKSegment:
  289. kcp.HandleOption(seg.Opt)
  290. if kcp.rmt_wnd < seg.ReceivingWindow {
  291. kcp.rmt_wnd = seg.ReceivingWindow
  292. }
  293. kcp.HandleReceivingNext(seg.ReceivingNext)
  294. for i := 0; i < int(seg.Count); i++ {
  295. ts := seg.TimestampList[i]
  296. sn := seg.NumberList[i]
  297. if _itimediff(kcp.current, ts) >= 0 {
  298. kcp.update_ack(_itimediff(kcp.current, ts))
  299. }
  300. kcp.parse_ack(sn)
  301. if flag == 0 {
  302. flag = 1
  303. maxack = sn
  304. } else if _itimediff(sn, maxack) > 0 {
  305. maxack = sn
  306. }
  307. }
  308. kcp.shrink_buf()
  309. case *CmdOnlySegment:
  310. kcp.HandleOption(seg.Opt)
  311. if seg.Cmd == SegmentCommandTerminated {
  312. if kcp.state == StateActive ||
  313. kcp.state == StateReadyToClose ||
  314. kcp.state == StatePeerClosed {
  315. kcp.state = StateTerminating
  316. kcp.stateBeginTime = kcp.current
  317. log.Info("KCP terminating at ", kcp.current)
  318. } else if kcp.state == StateTerminating {
  319. kcp.state = StateTerminated
  320. kcp.stateBeginTime = kcp.current
  321. log.Info("KCP terminated at ", kcp.current)
  322. }
  323. }
  324. kcp.HandleReceivingNext(seg.ReceivinNext)
  325. kcp.HandleSendingNext(seg.SendingNext)
  326. default:
  327. }
  328. }
  329. if flag != 0 {
  330. kcp.parse_fastack(maxack)
  331. }
  332. return 0
  333. }
  334. // flush pending data
  335. func (kcp *KCP) flush() {
  336. if kcp.state == StateTerminated {
  337. return
  338. }
  339. if kcp.state == StateTerminating {
  340. kcp.output.Write(&CmdOnlySegment{
  341. Conv: kcp.conv,
  342. Cmd: SegmentCommandTerminated,
  343. })
  344. kcp.output.Flush()
  345. if _itimediff(kcp.current, kcp.stateBeginTime) > 8000 {
  346. kcp.state = StateTerminated
  347. log.Info("KCP terminated at ", kcp.current)
  348. kcp.stateBeginTime = kcp.current
  349. }
  350. return
  351. }
  352. if kcp.state == StateReadyToClose && _itimediff(kcp.current, kcp.stateBeginTime) > 15000 {
  353. kcp.state = StateTerminating
  354. log.Info("KCP terminating at ", kcp.current)
  355. kcp.stateBeginTime = kcp.current
  356. }
  357. current := kcp.current
  358. segSent := false
  359. //lost := false
  360. //var seg Segment
  361. //seg.conv = kcp.conv
  362. //seg.cmd = IKCP_CMD_ACK
  363. //seg.wnd = uint32(kcp.rcv_nxt + kcp.rcv_wnd)
  364. //seg.una = kcp.rcv_nxt
  365. // flush acknowledges
  366. ackSeg := kcp.acklist.AsSegment()
  367. if ackSeg != nil {
  368. ackSeg.Conv = kcp.conv
  369. ackSeg.ReceivingWindow = uint32(kcp.rcv_nxt + kcp.rcv_wnd)
  370. ackSeg.ReceivingNext = kcp.rcv_nxt
  371. kcp.output.Write(ackSeg)
  372. segSent = true
  373. }
  374. // calculate window size
  375. cwnd := _imin_(kcp.snd_una+kcp.snd_wnd, kcp.rmt_wnd)
  376. if kcp.congestionControl {
  377. cwnd = _imin_(kcp.cwnd, cwnd)
  378. }
  379. for !kcp.snd_queue.IsEmpty() && _itimediff(kcp.snd_nxt, cwnd) < 0 {
  380. seg := kcp.snd_queue.Pop()
  381. seg.Conv = kcp.conv
  382. seg.Number = kcp.snd_nxt
  383. seg.timeout = current
  384. seg.ackSkipped = 0
  385. seg.transmit = 0
  386. kcp.snd_buf = append(kcp.snd_buf, seg)
  387. kcp.snd_nxt++
  388. }
  389. // calculate resent
  390. resent := uint32(kcp.fastresend)
  391. if kcp.fastresend <= 0 {
  392. resent = 0xffffffff
  393. }
  394. // flush data segments
  395. for _, segment := range kcp.snd_buf {
  396. needsend := false
  397. if segment.transmit == 0 {
  398. needsend = true
  399. segment.transmit++
  400. segment.timeout = current + kcp.rx_rto
  401. } else if _itimediff(current, segment.timeout) >= 0 {
  402. needsend = true
  403. segment.transmit++
  404. kcp.xmit++
  405. segment.timeout = current + kcp.rx_rto
  406. //lost = true
  407. } else if segment.ackSkipped >= resent {
  408. needsend = true
  409. segment.transmit++
  410. segment.ackSkipped = 0
  411. segment.timeout = current + kcp.rx_rto
  412. }
  413. if needsend {
  414. segment.Timestamp = current
  415. segment.SendingNext = kcp.snd_una
  416. segment.Opt = 0
  417. if kcp.state == StateReadyToClose {
  418. segment.Opt = SegmentOptionClose
  419. }
  420. kcp.output.Write(segment)
  421. segSent = true
  422. if segment.transmit >= kcp.dead_link {
  423. kcp.state = 0xFFFFFFFF
  424. }
  425. }
  426. }
  427. // flash remain segments
  428. kcp.output.Flush()
  429. if !segSent && kcp.state == StateReadyToClose {
  430. kcp.output.Write(&CmdOnlySegment{
  431. Conv: kcp.conv,
  432. Cmd: SegmentCommandPing,
  433. Opt: SegmentOptionClose,
  434. ReceivinNext: kcp.rcv_nxt,
  435. SendingNext: kcp.snd_nxt,
  436. })
  437. kcp.output.Flush()
  438. segSent = true
  439. }
  440. if !segSent && kcp.state == StateTerminating {
  441. kcp.output.Write(&CmdOnlySegment{
  442. Conv: kcp.conv,
  443. Cmd: SegmentCommandTerminated,
  444. ReceivinNext: kcp.rcv_nxt,
  445. SendingNext: kcp.snd_una,
  446. })
  447. kcp.output.Flush()
  448. segSent = true
  449. }
  450. if !segSent {
  451. kcp.output.Write(&CmdOnlySegment{
  452. Conv: kcp.conv,
  453. Cmd: SegmentCommandPing,
  454. ReceivinNext: kcp.rcv_nxt,
  455. SendingNext: kcp.snd_una,
  456. })
  457. kcp.output.Flush()
  458. segSent = true
  459. }
  460. // update ssthresh
  461. // rate halving, https://tools.ietf.org/html/rfc6937
  462. /*
  463. if change != 0 {
  464. inflight := kcp.snd_nxt - kcp.snd_una
  465. kcp.ssthresh = inflight / 2
  466. if kcp.ssthresh < IKCP_THRESH_MIN {
  467. kcp.ssthresh = IKCP_THRESH_MIN
  468. }
  469. kcp.cwnd = kcp.ssthresh + resent
  470. kcp.incr = kcp.cwnd * kcp.mss
  471. }*/
  472. // congestion control, https://tools.ietf.org/html/rfc5681
  473. /*
  474. if lost {
  475. kcp.ssthresh = cwnd / 2
  476. if kcp.ssthresh < IKCP_THRESH_MIN {
  477. kcp.ssthresh = IKCP_THRESH_MIN
  478. }
  479. kcp.cwnd = 1
  480. kcp.incr = kcp.mss
  481. }
  482. if kcp.cwnd < 1 {
  483. kcp.cwnd = 1
  484. kcp.incr = kcp.mss
  485. }*/
  486. }
  487. // Update updates state (call it repeatedly, every 10ms-100ms), or you can ask
  488. // ikcp_check when to call it again (without ikcp_input/_send calling).
  489. // 'current' - current timestamp in millisec.
  490. func (kcp *KCP) Update(current uint32) {
  491. var slap int32
  492. kcp.current = current
  493. if !kcp.updated {
  494. kcp.updated = true
  495. kcp.ts_flush = kcp.current
  496. }
  497. slap = _itimediff(kcp.current, kcp.ts_flush)
  498. if slap >= 10000 || slap < -10000 {
  499. kcp.ts_flush = kcp.current
  500. slap = 0
  501. }
  502. if slap >= 0 {
  503. kcp.ts_flush += kcp.interval
  504. if _itimediff(kcp.current, kcp.ts_flush) >= 0 {
  505. kcp.ts_flush = kcp.current + kcp.interval
  506. }
  507. kcp.flush()
  508. }
  509. }
  510. // NoDelay options
  511. // fastest: ikcp_nodelay(kcp, 1, 20, 2, 1)
  512. // nodelay: 0:disable(default), 1:enable
  513. // interval: internal update timer interval in millisec, default is 100ms
  514. // resend: 0:disable fast resend(default), 1:enable fast resend
  515. // nc: 0:normal congestion control(default), 1:disable congestion control
  516. func (kcp *KCP) NoDelay(interval uint32, resend int, congestionControl bool) int {
  517. kcp.interval = interval
  518. if resend >= 0 {
  519. kcp.fastresend = int32(resend)
  520. }
  521. kcp.congestionControl = congestionControl
  522. return 0
  523. }
  524. // WaitSnd gets how many packet is waiting to be sent
  525. func (kcp *KCP) WaitSnd() uint32 {
  526. return uint32(len(kcp.snd_buf)) + kcp.snd_queue.Len()
  527. }
  528. func (this *KCP) ClearSendQueue() {
  529. this.snd_queue.Clear()
  530. for _, seg := range this.snd_buf {
  531. seg.Release()
  532. }
  533. this.snd_buf = nil
  534. }