kcp.go 12 KB


  1. // Package kcp - A Fast and Reliable ARQ Protocol
  2. //
  3. // Acknowledgement:
  4. // skywind3000@github for inventing the KCP protocol
  5. // xtaci@github for translating to Golang
  6. package kcp
  7. import (
  8. "github.com/v2ray/v2ray-core/common/alloc"
  9. v2io "github.com/v2ray/v2ray-core/common/io"
  10. "github.com/v2ray/v2ray-core/common/log"
  11. )
  12. const (
  13. IKCP_RTO_NDL = 30 // no delay min rto
  14. IKCP_RTO_MIN = 100 // normal min rto
  15. IKCP_RTO_DEF = 200
  16. IKCP_RTO_MAX = 60000
  17. IKCP_WND_SND = 32
  18. IKCP_WND_RCV = 32
  19. IKCP_MTU_DEF = 1350
  20. IKCP_ACK_FAST = 3
  21. IKCP_INTERVAL = 100
  22. IKCP_THRESH_INIT = 2
  23. IKCP_THRESH_MIN = 2
  24. )
  25. func _itimediff(later, earlier uint32) int32 {
  26. return (int32)(later - earlier)
  27. }
  28. type State int
  29. const (
  30. StateActive State = 0
  31. StateReadyToClose State = 1
  32. StatePeerClosed State = 2
  33. StateTerminating State = 3
  34. StateTerminated State = 4
  35. )
  36. // KCP defines a single KCP connection
  37. type KCP struct {
  38. conv uint16
  39. state State
  40. stateBeginTime uint32
  41. lastIncomingTime uint32
  42. lastPayloadTime uint32
  43. sendingUpdated bool
  44. receivingUpdated bool
  45. lastPingTime uint32
  46. mtu, mss uint32
  47. snd_una, snd_nxt, rcv_nxt uint32
  48. rx_rttvar, rx_srtt, rx_rto uint32
  49. snd_wnd, rcv_wnd, rmt_wnd, cwnd uint32
  50. current, interval, ts_flush, xmit uint32
  51. updated bool
  52. snd_queue *SendingQueue
  53. rcv_queue []*DataSegment
  54. snd_buf []*DataSegment
  55. rcv_buf *ReceivingWindow
  56. acklist *ACKList
  57. fastresend int32
  58. congestionControl bool
  59. output *SegmentWriter
  60. }
  61. // NewKCP create a new kcp control object, 'conv' must equal in two endpoint
  62. // from the same connection.
  63. func NewKCP(conv uint16, mtu uint32, sendingWindowSize uint32, receivingWindowSize uint32, sendingQueueSize uint32, output v2io.Writer) *KCP {
  64. log.Debug("KCP|Core: creating KCP ", conv)
  65. kcp := new(KCP)
  66. kcp.conv = conv
  67. kcp.snd_wnd = sendingWindowSize
  68. kcp.rcv_wnd = receivingWindowSize
  69. kcp.rmt_wnd = IKCP_WND_RCV
  70. kcp.mtu = mtu
  71. kcp.mss = kcp.mtu - DataSegmentOverhead
  72. kcp.rx_rto = IKCP_RTO_DEF
  73. kcp.interval = IKCP_INTERVAL
  74. kcp.ts_flush = IKCP_INTERVAL
  75. kcp.output = NewSegmentWriter(mtu, output)
  76. kcp.rcv_buf = NewReceivingWindow(receivingWindowSize)
  77. kcp.snd_queue = NewSendingQueue(sendingQueueSize)
  78. kcp.acklist = new(ACKList)
  79. kcp.cwnd = kcp.snd_wnd
  80. return kcp
  81. }
  82. func (kcp *KCP) HandleOption(opt SegmentOption) {
  83. if (opt & SegmentOptionClose) == SegmentOptionClose {
  84. kcp.OnPeerClosed()
  85. }
  86. }
  87. func (kcp *KCP) OnPeerClosed() {
  88. if kcp.state == StateReadyToClose {
  89. kcp.state = StateTerminating
  90. kcp.stateBeginTime = kcp.current
  91. }
  92. if kcp.state == StateActive {
  93. kcp.ClearSendQueue()
  94. kcp.state = StatePeerClosed
  95. kcp.stateBeginTime = kcp.current
  96. }
  97. }
  98. func (kcp *KCP) OnClose() {
  99. if kcp.state == StateActive {
  100. kcp.state = StateReadyToClose
  101. kcp.stateBeginTime = kcp.current
  102. }
  103. if kcp.state == StatePeerClosed {
  104. kcp.state = StateTerminating
  105. kcp.stateBeginTime = kcp.current
  106. }
  107. }
  108. // Recv is user/upper level recv: returns size, returns below zero for EAGAIN
  109. func (kcp *KCP) Recv(buffer []byte) (n int) {
  110. if len(kcp.rcv_queue) == 0 {
  111. return -1
  112. }
  113. // merge fragment
  114. count := 0
  115. for _, seg := range kcp.rcv_queue {
  116. dataLen := seg.Data.Len()
  117. if dataLen > len(buffer) {
  118. break
  119. }
  120. copy(buffer, seg.Data.Value)
  121. seg.Release()
  122. buffer = buffer[dataLen:]
  123. n += dataLen
  124. count++
  125. }
  126. kcp.rcv_queue = kcp.rcv_queue[count:]
  127. kcp.DumpReceivingBuf()
  128. return
  129. }
  130. // DumpReceivingBuf moves available data from rcv_buf -> rcv_queue
  131. // @Private
  132. func (kcp *KCP) DumpReceivingBuf() {
  133. for {
  134. seg := kcp.rcv_buf.RemoveFirst()
  135. if seg == nil {
  136. break
  137. }
  138. kcp.rcv_queue = append(kcp.rcv_queue, seg)
  139. kcp.rcv_buf.Advance()
  140. kcp.rcv_nxt++
  141. kcp.receivingUpdated = true
  142. }
  143. }
  144. // Send is user/upper level send, returns below zero for error
  145. func (kcp *KCP) Send(buffer []byte) int {
  146. nBytes := 0
  147. for len(buffer) > 0 && !kcp.snd_queue.IsFull() {
  148. var size int
  149. if len(buffer) > int(kcp.mss) {
  150. size = int(kcp.mss)
  151. } else {
  152. size = len(buffer)
  153. }
  154. seg := &DataSegment{
  155. Data: alloc.NewSmallBuffer().Clear().Append(buffer[:size]),
  156. }
  157. kcp.snd_queue.Push(seg)
  158. buffer = buffer[size:]
  159. nBytes += size
  160. }
  161. return nBytes
  162. }
  163. // https://tools.ietf.org/html/rfc6298
  164. func (kcp *KCP) update_ack(rtt int32) {
  165. if kcp.rx_srtt == 0 {
  166. kcp.rx_srtt = uint32(rtt)
  167. kcp.rx_rttvar = uint32(rtt) / 2
  168. } else {
  169. delta := rtt - int32(kcp.rx_srtt)
  170. if delta < 0 {
  171. delta = -delta
  172. }
  173. kcp.rx_rttvar = (3*kcp.rx_rttvar + uint32(delta)) / 4
  174. kcp.rx_srtt = (7*kcp.rx_srtt + uint32(rtt)) / 8
  175. if kcp.rx_srtt < kcp.interval {
  176. kcp.rx_srtt = kcp.interval
  177. }
  178. }
  179. var rto uint32
  180. if kcp.interval < 4*kcp.rx_rttvar {
  181. rto = kcp.rx_srtt + 4*kcp.rx_rttvar
  182. } else {
  183. rto = kcp.rx_srtt + kcp.interval
  184. }
  185. if rto > IKCP_RTO_MAX {
  186. rto = IKCP_RTO_MAX
  187. }
  188. kcp.rx_rto = rto * 3 / 2
  189. }
  190. func (kcp *KCP) shrink_buf() {
  191. prevUna := kcp.snd_una
  192. if len(kcp.snd_buf) > 0 {
  193. seg := kcp.snd_buf[0]
  194. kcp.snd_una = seg.Number
  195. } else {
  196. kcp.snd_una = kcp.snd_nxt
  197. }
  198. if kcp.snd_una != prevUna {
  199. kcp.sendingUpdated = true
  200. }
  201. }
  202. func (kcp *KCP) parse_ack(sn uint32) {
  203. if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 {
  204. return
  205. }
  206. for k, seg := range kcp.snd_buf {
  207. if sn == seg.Number {
  208. kcp.snd_buf = append(kcp.snd_buf[:k], kcp.snd_buf[k+1:]...)
  209. seg.Release()
  210. break
  211. }
  212. if _itimediff(sn, seg.Number) < 0 {
  213. break
  214. }
  215. }
  216. }
  217. func (kcp *KCP) parse_fastack(sn uint32) {
  218. if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 {
  219. return
  220. }
  221. for _, seg := range kcp.snd_buf {
  222. if _itimediff(sn, seg.Number) < 0 {
  223. break
  224. } else if sn != seg.Number {
  225. seg.ackSkipped++
  226. }
  227. }
  228. }
  229. func (kcp *KCP) HandleReceivingNext(receivingNext uint32) {
  230. count := 0
  231. for _, seg := range kcp.snd_buf {
  232. if _itimediff(receivingNext, seg.Number) > 0 {
  233. seg.Release()
  234. count++
  235. } else {
  236. break
  237. }
  238. }
  239. kcp.snd_buf = kcp.snd_buf[count:]
  240. }
  241. func (kcp *KCP) HandleSendingNext(sendingNext uint32) {
  242. if kcp.acklist.Clear(sendingNext) {
  243. kcp.receivingUpdated = true
  244. }
  245. }
  246. func (kcp *KCP) parse_data(newseg *DataSegment) {
  247. sn := newseg.Number
  248. if _itimediff(sn, kcp.rcv_nxt+kcp.rcv_wnd) >= 0 ||
  249. _itimediff(sn, kcp.rcv_nxt) < 0 {
  250. return
  251. }
  252. idx := sn - kcp.rcv_nxt
  253. if !kcp.rcv_buf.Set(idx, newseg) {
  254. newseg.Release()
  255. }
  256. kcp.DumpReceivingBuf()
  257. }
  258. // Input when you received a low level packet (eg. UDP packet), call it
  259. func (kcp *KCP) Input(data []byte) int {
  260. kcp.lastIncomingTime = kcp.current
  261. var seg ISegment
  262. var maxack uint32
  263. var flag int
  264. for {
  265. seg, data = ReadSegment(data)
  266. if seg == nil {
  267. break
  268. }
  269. switch seg := seg.(type) {
  270. case *DataSegment:
  271. kcp.HandleOption(seg.Opt)
  272. kcp.HandleSendingNext(seg.SendingNext)
  273. kcp.acklist.Add(seg.Number, seg.Timestamp)
  274. kcp.receivingUpdated = true
  275. kcp.parse_data(seg)
  276. kcp.lastPayloadTime = kcp.current
  277. case *ACKSegment:
  278. kcp.HandleOption(seg.Opt)
  279. if kcp.rmt_wnd < seg.ReceivingWindow {
  280. kcp.rmt_wnd = seg.ReceivingWindow
  281. }
  282. kcp.HandleReceivingNext(seg.ReceivingNext)
  283. for i := 0; i < int(seg.Count); i++ {
  284. ts := seg.TimestampList[i]
  285. sn := seg.NumberList[i]
  286. if _itimediff(kcp.current, ts) >= 0 {
  287. kcp.update_ack(_itimediff(kcp.current, ts))
  288. }
  289. kcp.parse_ack(sn)
  290. if flag == 0 {
  291. flag = 1
  292. maxack = sn
  293. } else if _itimediff(sn, maxack) > 0 {
  294. maxack = sn
  295. }
  296. }
  297. kcp.lastPayloadTime = kcp.current
  298. case *CmdOnlySegment:
  299. kcp.HandleOption(seg.Opt)
  300. if seg.Cmd == SegmentCommandTerminated {
  301. if kcp.state == StateActive ||
  302. kcp.state == StateReadyToClose ||
  303. kcp.state == StatePeerClosed {
  304. kcp.state = StateTerminating
  305. kcp.stateBeginTime = kcp.current
  306. } else if kcp.state == StateTerminating {
  307. kcp.state = StateTerminated
  308. kcp.stateBeginTime = kcp.current
  309. }
  310. }
  311. kcp.HandleReceivingNext(seg.ReceivinNext)
  312. kcp.HandleSendingNext(seg.SendingNext)
  313. default:
  314. }
  315. kcp.shrink_buf()
  316. }
  317. if flag != 0 {
  318. kcp.parse_fastack(maxack)
  319. }
  320. return 0
  321. }
  322. // flush pending data
  323. func (kcp *KCP) flush() {
  324. if kcp.state == StateTerminated {
  325. return
  326. }
  327. if kcp.state == StateActive && _itimediff(kcp.current, kcp.lastPayloadTime) >= 30000 {
  328. kcp.OnClose()
  329. }
  330. if kcp.state == StateTerminating {
  331. kcp.output.Write(&CmdOnlySegment{
  332. Conv: kcp.conv,
  333. Cmd: SegmentCommandTerminated,
  334. })
  335. kcp.output.Flush()
  336. if _itimediff(kcp.current, kcp.stateBeginTime) > 8000 {
  337. kcp.state = StateTerminated
  338. kcp.stateBeginTime = kcp.current
  339. }
  340. return
  341. }
  342. if kcp.state == StateReadyToClose && _itimediff(kcp.current, kcp.stateBeginTime) > 15000 {
  343. kcp.state = StateTerminating
  344. kcp.stateBeginTime = kcp.current
  345. }
  346. current := kcp.current
  347. lost := false
  348. // flush acknowledges
  349. //if kcp.receivingUpdated {
  350. ackSeg := kcp.acklist.AsSegment()
  351. if ackSeg != nil {
  352. ackSeg.Conv = kcp.conv
  353. ackSeg.ReceivingWindow = uint32(kcp.rcv_nxt + kcp.rcv_wnd)
  354. ackSeg.ReceivingNext = kcp.rcv_nxt
  355. kcp.output.Write(ackSeg)
  356. kcp.receivingUpdated = false
  357. }
  358. //}
  359. // calculate window size
  360. cwnd := kcp.snd_una + kcp.snd_wnd
  361. if cwnd < kcp.rmt_wnd {
  362. cwnd = kcp.rmt_wnd
  363. }
  364. if kcp.congestionControl && cwnd < kcp.snd_una+kcp.cwnd {
  365. cwnd = kcp.snd_una + kcp.cwnd
  366. }
  367. for !kcp.snd_queue.IsEmpty() && _itimediff(kcp.snd_nxt, cwnd) < 0 {
  368. seg := kcp.snd_queue.Pop()
  369. seg.Conv = kcp.conv
  370. seg.Number = kcp.snd_nxt
  371. seg.timeout = current
  372. seg.ackSkipped = 0
  373. seg.transmit = 0
  374. kcp.snd_buf = append(kcp.snd_buf, seg)
  375. kcp.snd_nxt++
  376. }
  377. // calculate resent
  378. resent := uint32(kcp.fastresend)
  379. if kcp.fastresend <= 0 {
  380. resent = 0xffffffff
  381. }
  382. // flush data segments
  383. for _, segment := range kcp.snd_buf {
  384. needsend := false
  385. if segment.transmit == 0 {
  386. needsend = true
  387. segment.transmit++
  388. segment.timeout = current + kcp.rx_rto
  389. } else if _itimediff(current, segment.timeout) >= 0 {
  390. needsend = true
  391. segment.transmit++
  392. kcp.xmit++
  393. segment.timeout = current + kcp.rx_rto
  394. lost = true
  395. } else if segment.ackSkipped >= resent {
  396. needsend = true
  397. segment.transmit++
  398. segment.ackSkipped = 0
  399. segment.timeout = current + kcp.rx_rto
  400. lost = true
  401. }
  402. if needsend {
  403. segment.Timestamp = current
  404. segment.SendingNext = kcp.snd_una
  405. segment.Opt = 0
  406. if kcp.state == StateReadyToClose {
  407. segment.Opt = SegmentOptionClose
  408. }
  409. kcp.output.Write(segment)
  410. kcp.sendingUpdated = false
  411. }
  412. }
  413. if kcp.sendingUpdated || kcp.receivingUpdated || _itimediff(kcp.current, kcp.lastPingTime) >= 5000 {
  414. seg := &CmdOnlySegment{
  415. Conv: kcp.conv,
  416. Cmd: SegmentCommandPing,
  417. ReceivinNext: kcp.rcv_nxt,
  418. SendingNext: kcp.snd_una,
  419. }
  420. if kcp.state == StateReadyToClose {
  421. seg.Opt = SegmentOptionClose
  422. }
  423. kcp.output.Write(seg)
  424. kcp.lastPingTime = kcp.current
  425. kcp.sendingUpdated = false
  426. kcp.receivingUpdated = false
  427. }
  428. // flash remain segments
  429. kcp.output.Flush()
  430. if kcp.congestionControl {
  431. if lost {
  432. kcp.cwnd = 3 * kcp.cwnd / 4
  433. } else {
  434. kcp.cwnd += kcp.cwnd / 4
  435. }
  436. if kcp.cwnd < 4 {
  437. kcp.cwnd = 4
  438. }
  439. if kcp.cwnd > kcp.snd_wnd {
  440. kcp.cwnd = kcp.snd_wnd
  441. }
  442. }
  443. }
  444. // Update updates state (call it repeatedly, every 10ms-100ms), or you can ask
  445. // ikcp_check when to call it again (without ikcp_input/_send calling).
  446. // 'current' - current timestamp in millisec.
  447. func (kcp *KCP) Update(current uint32) {
  448. var slap int32
  449. kcp.current = current
  450. if !kcp.updated {
  451. kcp.updated = true
  452. kcp.ts_flush = kcp.current
  453. }
  454. slap = _itimediff(kcp.current, kcp.ts_flush)
  455. if slap >= 10000 || slap < -10000 {
  456. kcp.ts_flush = kcp.current
  457. slap = 0
  458. }
  459. if slap >= 0 {
  460. kcp.ts_flush += kcp.interval
  461. if _itimediff(kcp.current, kcp.ts_flush) >= 0 {
  462. kcp.ts_flush = kcp.current + kcp.interval
  463. }
  464. kcp.flush()
  465. }
  466. }
  467. // NoDelay options
  468. // fastest: ikcp_nodelay(kcp, 1, 20, 2, 1)
  469. // nodelay: 0:disable(default), 1:enable
  470. // interval: internal update timer interval in millisec, default is 100ms
  471. // resend: 0:disable fast resend(default), 1:enable fast resend
  472. // nc: 0:normal congestion control(default), 1:disable congestion control
  473. func (kcp *KCP) NoDelay(interval uint32, resend int, congestionControl bool) int {
  474. kcp.interval = interval
  475. if resend >= 0 {
  476. kcp.fastresend = int32(resend)
  477. }
  478. kcp.congestionControl = congestionControl
  479. return 0
  480. }
  481. // WaitSnd gets how many packet is waiting to be sent
  482. func (kcp *KCP) WaitSnd() uint32 {
  483. return uint32(len(kcp.snd_buf)) + kcp.snd_queue.Len()
  484. }
  485. func (this *KCP) ClearSendQueue() {
  486. this.snd_queue.Clear()
  487. for _, seg := range this.snd_buf {
  488. seg.Release()
  489. }
  490. this.snd_buf = nil
  491. }