kcp.go 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442
  1. // Package kcp - A Fast and Reliable ARQ Protocol
  2. //
  3. // Acknowledgement:
  4. // skywind3000@github for inventing the KCP protocol
  5. // xtaci@github for translating to Golang
  6. package kcp
  7. import (
  8. "github.com/v2ray/v2ray-core/common/alloc"
  9. "github.com/v2ray/v2ray-core/common/log"
  10. )
  11. const (
  12. IKCP_RTO_NDL = 30 // no delay min rto
  13. IKCP_RTO_MIN = 100 // normal min rto
  14. IKCP_RTO_DEF = 200
  15. IKCP_RTO_MAX = 60000
  16. IKCP_WND_SND = 32
  17. IKCP_WND_RCV = 32
  18. IKCP_INTERVAL = 100
  19. )
  20. func _itimediff(later, earlier uint32) int32 {
  21. return (int32)(later - earlier)
  22. }
  23. type State int
  24. const (
  25. StateActive State = 0
  26. StateReadyToClose State = 1
  27. StatePeerClosed State = 2
  28. StateTerminating State = 3
  29. StateTerminated State = 4
  30. )
  31. // KCP defines a single KCP connection
  32. type KCP struct {
  33. conv uint16
  34. state State
  35. stateBeginTime uint32
  36. lastIncomingTime uint32
  37. lastPayloadTime uint32
  38. sendingUpdated bool
  39. receivingUpdated bool
  40. lastPingTime uint32
  41. mss uint32
  42. snd_una, snd_nxt, rcv_nxt uint32
  43. rx_rttvar, rx_srtt, rx_rto uint32
  44. snd_wnd, rcv_wnd, rmt_wnd, cwnd uint32
  45. current, interval uint32
  46. snd_queue *SendingQueue
  47. rcv_queue *ReceivingQueue
  48. snd_buf *SendingWindow
  49. rcv_buf *ReceivingWindow
  50. acklist *ACKList
  51. fastresend int32
  52. congestionControl bool
  53. output *SegmentWriter
  54. }
  55. // NewKCP create a new kcp control object, 'conv' must equal in two endpoint
  56. // from the same connection.
  57. func NewKCP(conv uint16, sendingWindowSize uint32, receivingWindowSize uint32, sendingQueueSize uint32, output *AuthenticationWriter) *KCP {
  58. log.Debug("KCP|Core: creating KCP ", conv)
  59. kcp := new(KCP)
  60. kcp.conv = conv
  61. kcp.snd_wnd = sendingWindowSize
  62. kcp.rcv_wnd = receivingWindowSize
  63. kcp.rmt_wnd = IKCP_WND_RCV
  64. kcp.mss = output.Mtu() - DataSegmentOverhead
  65. kcp.rx_rto = IKCP_RTO_DEF
  66. kcp.interval = IKCP_INTERVAL
  67. kcp.output = NewSegmentWriter(output)
  68. kcp.rcv_buf = NewReceivingWindow(receivingWindowSize)
  69. kcp.snd_queue = NewSendingQueue(sendingQueueSize)
  70. kcp.rcv_queue = NewReceivingQueue()
  71. kcp.acklist = NewACKList(kcp)
  72. kcp.snd_buf = NewSendingWindow(kcp, sendingWindowSize)
  73. kcp.cwnd = kcp.snd_wnd
  74. return kcp
  75. }
  76. func (kcp *KCP) SetState(state State) {
  77. kcp.state = state
  78. kcp.stateBeginTime = kcp.current
  79. switch state {
  80. case StateReadyToClose:
  81. kcp.rcv_queue.Close()
  82. case StatePeerClosed:
  83. kcp.ClearSendQueue()
  84. case StateTerminating:
  85. kcp.rcv_queue.Close()
  86. case StateTerminated:
  87. kcp.rcv_queue.Close()
  88. }
  89. }
  90. func (kcp *KCP) HandleOption(opt SegmentOption) {
  91. if (opt & SegmentOptionClose) == SegmentOptionClose {
  92. kcp.OnPeerClosed()
  93. }
  94. }
  95. func (kcp *KCP) OnPeerClosed() {
  96. if kcp.state == StateReadyToClose {
  97. kcp.SetState(StateTerminating)
  98. }
  99. if kcp.state == StateActive {
  100. kcp.SetState(StatePeerClosed)
  101. }
  102. }
  103. func (kcp *KCP) OnClose() {
  104. if kcp.state == StateActive {
  105. kcp.SetState(StateReadyToClose)
  106. }
  107. if kcp.state == StatePeerClosed {
  108. kcp.SetState(StateTerminating)
  109. }
  110. }
  111. // DumpReceivingBuf moves available data from rcv_buf -> rcv_queue
  112. // @Private
  113. func (kcp *KCP) DumpReceivingBuf() {
  114. for {
  115. seg := kcp.rcv_buf.RemoveFirst()
  116. if seg == nil {
  117. break
  118. }
  119. kcp.rcv_queue.Put(seg.Data)
  120. seg.Data = nil
  121. kcp.rcv_buf.Advance()
  122. kcp.rcv_nxt++
  123. kcp.receivingUpdated = true
  124. }
  125. }
  126. // Send is user/upper level send, returns below zero for error
  127. func (kcp *KCP) Send(buffer []byte) int {
  128. nBytes := 0
  129. for len(buffer) > 0 && !kcp.snd_queue.IsFull() {
  130. var size int
  131. if len(buffer) > int(kcp.mss) {
  132. size = int(kcp.mss)
  133. } else {
  134. size = len(buffer)
  135. }
  136. seg := &DataSegment{
  137. Data: alloc.NewSmallBuffer().Clear().Append(buffer[:size]),
  138. }
  139. kcp.snd_queue.Push(seg)
  140. buffer = buffer[size:]
  141. nBytes += size
  142. }
  143. return nBytes
  144. }
  145. // https://tools.ietf.org/html/rfc6298
  146. func (kcp *KCP) update_ack(rtt int32) {
  147. if kcp.rx_srtt == 0 {
  148. kcp.rx_srtt = uint32(rtt)
  149. kcp.rx_rttvar = uint32(rtt) / 2
  150. } else {
  151. delta := rtt - int32(kcp.rx_srtt)
  152. if delta < 0 {
  153. delta = -delta
  154. }
  155. kcp.rx_rttvar = (3*kcp.rx_rttvar + uint32(delta)) / 4
  156. kcp.rx_srtt = (7*kcp.rx_srtt + uint32(rtt)) / 8
  157. if kcp.rx_srtt < kcp.interval {
  158. kcp.rx_srtt = kcp.interval
  159. }
  160. }
  161. var rto uint32
  162. if kcp.interval < 4*kcp.rx_rttvar {
  163. rto = kcp.rx_srtt + 4*kcp.rx_rttvar
  164. } else {
  165. rto = kcp.rx_srtt + kcp.interval
  166. }
  167. if rto > IKCP_RTO_MAX {
  168. rto = IKCP_RTO_MAX
  169. }
  170. kcp.rx_rto = rto * 3 / 2
  171. }
  172. func (kcp *KCP) shrink_buf() {
  173. prevUna := kcp.snd_una
  174. if kcp.snd_buf.Len() > 0 {
  175. seg := kcp.snd_buf.First()
  176. kcp.snd_una = seg.Number
  177. } else {
  178. kcp.snd_una = kcp.snd_nxt
  179. }
  180. if kcp.snd_una != prevUna {
  181. kcp.sendingUpdated = true
  182. }
  183. }
  184. func (kcp *KCP) parse_ack(sn uint32) {
  185. if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 {
  186. return
  187. }
  188. kcp.snd_buf.Remove(sn - kcp.snd_una)
  189. }
  190. func (kcp *KCP) parse_fastack(sn uint32) {
  191. if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 {
  192. return
  193. }
  194. kcp.snd_buf.HandleFastAck(sn)
  195. }
  196. func (kcp *KCP) HandleReceivingNext(receivingNext uint32) {
  197. kcp.snd_buf.Clear(receivingNext)
  198. }
  199. func (kcp *KCP) HandleSendingNext(sendingNext uint32) {
  200. kcp.acklist.Clear(sendingNext)
  201. }
  202. func (kcp *KCP) parse_data(newseg *DataSegment) {
  203. sn := newseg.Number
  204. if _itimediff(sn, kcp.rcv_nxt+kcp.rcv_wnd) >= 0 ||
  205. _itimediff(sn, kcp.rcv_nxt) < 0 {
  206. return
  207. }
  208. idx := sn - kcp.rcv_nxt
  209. if !kcp.rcv_buf.Set(idx, newseg) {
  210. newseg.Release()
  211. }
  212. kcp.DumpReceivingBuf()
  213. }
  214. // Input when you received a low level packet (eg. UDP packet), call it
  215. func (kcp *KCP) Input(data []byte) int {
  216. kcp.lastIncomingTime = kcp.current
  217. var seg ISegment
  218. var maxack uint32
  219. var flag int
  220. for {
  221. seg, data = ReadSegment(data)
  222. if seg == nil {
  223. break
  224. }
  225. switch seg := seg.(type) {
  226. case *DataSegment:
  227. kcp.HandleOption(seg.Opt)
  228. kcp.HandleSendingNext(seg.SendingNext)
  229. kcp.acklist.Add(seg.Number, seg.Timestamp)
  230. kcp.receivingUpdated = true
  231. kcp.parse_data(seg)
  232. kcp.lastPayloadTime = kcp.current
  233. case *ACKSegment:
  234. kcp.HandleOption(seg.Opt)
  235. if kcp.rmt_wnd < seg.ReceivingWindow {
  236. kcp.rmt_wnd = seg.ReceivingWindow
  237. }
  238. kcp.HandleReceivingNext(seg.ReceivingNext)
  239. for i := 0; i < int(seg.Count); i++ {
  240. ts := seg.TimestampList[i]
  241. sn := seg.NumberList[i]
  242. if _itimediff(kcp.current, ts) >= 0 {
  243. kcp.update_ack(_itimediff(kcp.current, ts))
  244. }
  245. kcp.parse_ack(sn)
  246. kcp.shrink_buf()
  247. if flag == 0 {
  248. flag = 1
  249. maxack = sn
  250. } else if _itimediff(sn, maxack) > 0 {
  251. maxack = sn
  252. }
  253. }
  254. kcp.lastPayloadTime = kcp.current
  255. case *CmdOnlySegment:
  256. kcp.HandleOption(seg.Opt)
  257. if seg.Cmd == SegmentCommandTerminated {
  258. if kcp.state == StateActive ||
  259. kcp.state == StateReadyToClose ||
  260. kcp.state == StatePeerClosed {
  261. kcp.SetState(StateTerminating)
  262. } else if kcp.state == StateTerminating {
  263. kcp.SetState(StateTerminated)
  264. }
  265. }
  266. kcp.HandleReceivingNext(seg.ReceivinNext)
  267. kcp.HandleSendingNext(seg.SendingNext)
  268. kcp.shrink_buf()
  269. default:
  270. }
  271. }
  272. if flag != 0 {
  273. kcp.parse_fastack(maxack)
  274. }
  275. return 0
  276. }
  277. // flush pending data
  278. func (kcp *KCP) flush() {
  279. if kcp.state == StateTerminated {
  280. return
  281. }
  282. if kcp.state == StateActive && _itimediff(kcp.current, kcp.lastPayloadTime) >= 30000 {
  283. kcp.OnClose()
  284. }
  285. if kcp.state == StateTerminating {
  286. kcp.output.Write(&CmdOnlySegment{
  287. Conv: kcp.conv,
  288. Cmd: SegmentCommandTerminated,
  289. })
  290. kcp.output.Flush()
  291. if _itimediff(kcp.current, kcp.stateBeginTime) > 8000 {
  292. kcp.SetState(StateTerminated)
  293. }
  294. return
  295. }
  296. if kcp.state == StateReadyToClose && _itimediff(kcp.current, kcp.stateBeginTime) > 15000 {
  297. kcp.SetState(StateTerminating)
  298. }
  299. current := kcp.current
  300. // flush acknowledges
  301. if kcp.acklist.Flush() {
  302. kcp.receivingUpdated = false
  303. }
  304. // calculate window size
  305. cwnd := kcp.snd_una + kcp.snd_wnd
  306. if cwnd < kcp.rmt_wnd {
  307. cwnd = kcp.rmt_wnd
  308. }
  309. if kcp.congestionControl && cwnd < kcp.snd_una+kcp.cwnd {
  310. cwnd = kcp.snd_una + kcp.cwnd
  311. }
  312. for !kcp.snd_queue.IsEmpty() && _itimediff(kcp.snd_nxt, cwnd) < 0 {
  313. seg := kcp.snd_queue.Pop()
  314. seg.Conv = kcp.conv
  315. seg.Number = kcp.snd_nxt
  316. seg.timeout = current
  317. seg.ackSkipped = 0
  318. seg.transmit = 0
  319. kcp.snd_buf.Push(seg)
  320. kcp.snd_nxt++
  321. }
  322. // flush data segments
  323. if kcp.snd_buf.Flush() {
  324. kcp.sendingUpdated = false
  325. }
  326. if kcp.sendingUpdated || kcp.receivingUpdated || _itimediff(kcp.current, kcp.lastPingTime) >= 5000 {
  327. seg := &CmdOnlySegment{
  328. Conv: kcp.conv,
  329. Cmd: SegmentCommandPing,
  330. ReceivinNext: kcp.rcv_nxt,
  331. SendingNext: kcp.snd_una,
  332. }
  333. if kcp.state == StateReadyToClose {
  334. seg.Opt = SegmentOptionClose
  335. }
  336. kcp.output.Write(seg)
  337. kcp.lastPingTime = kcp.current
  338. kcp.sendingUpdated = false
  339. kcp.receivingUpdated = false
  340. }
  341. // flash remain segments
  342. kcp.output.Flush()
  343. }
  344. func (kcp *KCP) HandleLost(lost bool) {
  345. if !kcp.congestionControl {
  346. return
  347. }
  348. if lost {
  349. kcp.cwnd = 3 * kcp.cwnd / 4
  350. } else {
  351. kcp.cwnd += kcp.cwnd / 4
  352. }
  353. if kcp.cwnd < 4 {
  354. kcp.cwnd = 4
  355. }
  356. if kcp.cwnd > kcp.snd_wnd {
  357. kcp.cwnd = kcp.snd_wnd
  358. }
  359. }
  360. // Update updates state (call it repeatedly, every 10ms-100ms), or you can ask
  361. // ikcp_check when to call it again (without ikcp_input/_send calling).
  362. // 'current' - current timestamp in millisec.
  363. func (kcp *KCP) Update(current uint32) {
  364. kcp.current = current
  365. kcp.flush()
  366. }
  367. // NoDelay options
  368. // fastest: ikcp_nodelay(kcp, 1, 20, 2, 1)
  369. // nodelay: 0:disable(default), 1:enable
  370. // interval: internal update timer interval in millisec, default is 100ms
  371. // resend: 0:disable fast resend(default), 1:enable fast resend
  372. // nc: 0:normal congestion control(default), 1:disable congestion control
  373. func (kcp *KCP) NoDelay(interval uint32, resend int, congestionControl bool) int {
  374. kcp.interval = interval
  375. if resend >= 0 {
  376. kcp.fastresend = int32(resend)
  377. }
  378. kcp.congestionControl = congestionControl
  379. return 0
  380. }
  381. // WaitSnd gets how many packet is waiting to be sent
  382. func (kcp *KCP) WaitSnd() uint32 {
  383. return uint32(kcp.snd_buf.Len()) + kcp.snd_queue.Len()
  384. }
  385. func (this *KCP) ClearSendQueue() {
  386. this.snd_queue.Clear()
  387. this.snd_buf.Clear(0xFFFFFFFF)
  388. }