kcp.go 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444
  1. // Package kcp - A Fast and Reliable ARQ Protocol
  2. //
  3. // Acknowledgement:
  4. // skywind3000@github for inventing the KCP protocol
  5. // xtaci@github for translating to Golang
  6. package kcp
  7. import (
  8. "github.com/v2ray/v2ray-core/common/alloc"
  9. v2io "github.com/v2ray/v2ray-core/common/io"
  10. "github.com/v2ray/v2ray-core/common/log"
  11. )
  12. const (
  13. IKCP_RTO_NDL = 30 // no delay min rto
  14. IKCP_RTO_MIN = 100 // normal min rto
  15. IKCP_RTO_DEF = 200
  16. IKCP_RTO_MAX = 60000
  17. IKCP_WND_SND = 32
  18. IKCP_WND_RCV = 32
  19. IKCP_INTERVAL = 100
  20. )
  21. func _itimediff(later, earlier uint32) int32 {
  22. return (int32)(later - earlier)
  23. }
  24. type State int
  25. const (
  26. StateActive State = 0
  27. StateReadyToClose State = 1
  28. StatePeerClosed State = 2
  29. StateTerminating State = 3
  30. StateTerminated State = 4
  31. )
  32. // KCP defines a single KCP connection
  33. type KCP struct {
  34. conv uint16
  35. state State
  36. stateBeginTime uint32
  37. lastIncomingTime uint32
  38. lastPayloadTime uint32
  39. sendingUpdated bool
  40. receivingUpdated bool
  41. lastPingTime uint32
  42. mtu, mss uint32
  43. snd_una, snd_nxt, rcv_nxt uint32
  44. rx_rttvar, rx_srtt, rx_rto uint32
  45. snd_wnd, rcv_wnd, rmt_wnd, cwnd uint32
  46. current, interval uint32
  47. snd_queue *SendingQueue
  48. rcv_queue *ReceivingQueue
  49. snd_buf *SendingWindow
  50. rcv_buf *ReceivingWindow
  51. acklist *ACKList
  52. fastresend int32
  53. congestionControl bool
  54. output *SegmentWriter
  55. }
  56. // NewKCP create a new kcp control object, 'conv' must equal in two endpoint
  57. // from the same connection.
  58. func NewKCP(conv uint16, mtu uint32, sendingWindowSize uint32, receivingWindowSize uint32, sendingQueueSize uint32, output v2io.Writer) *KCP {
  59. log.Debug("KCP|Core: creating KCP ", conv)
  60. kcp := new(KCP)
  61. kcp.conv = conv
  62. kcp.snd_wnd = sendingWindowSize
  63. kcp.rcv_wnd = receivingWindowSize
  64. kcp.rmt_wnd = IKCP_WND_RCV
  65. kcp.mtu = mtu
  66. kcp.mss = kcp.mtu - DataSegmentOverhead
  67. kcp.rx_rto = IKCP_RTO_DEF
  68. kcp.interval = IKCP_INTERVAL
  69. kcp.output = NewSegmentWriter(mtu, output)
  70. kcp.rcv_buf = NewReceivingWindow(receivingWindowSize)
  71. kcp.snd_queue = NewSendingQueue(sendingQueueSize)
  72. kcp.rcv_queue = NewReceivingQueue()
  73. kcp.acklist = NewACKList(kcp)
  74. kcp.snd_buf = NewSendingWindow(kcp, sendingWindowSize)
  75. kcp.cwnd = kcp.snd_wnd
  76. return kcp
  77. }
  78. func (kcp *KCP) SetState(state State) {
  79. kcp.state = state
  80. kcp.stateBeginTime = kcp.current
  81. switch state {
  82. case StateReadyToClose:
  83. kcp.rcv_queue.Close()
  84. case StatePeerClosed:
  85. kcp.ClearSendQueue()
  86. case StateTerminating:
  87. kcp.rcv_queue.Close()
  88. case StateTerminated:
  89. kcp.rcv_queue.Close()
  90. }
  91. }
  92. func (kcp *KCP) HandleOption(opt SegmentOption) {
  93. if (opt & SegmentOptionClose) == SegmentOptionClose {
  94. kcp.OnPeerClosed()
  95. }
  96. }
  97. func (kcp *KCP) OnPeerClosed() {
  98. if kcp.state == StateReadyToClose {
  99. kcp.SetState(StateTerminating)
  100. }
  101. if kcp.state == StateActive {
  102. kcp.SetState(StatePeerClosed)
  103. }
  104. }
  105. func (kcp *KCP) OnClose() {
  106. if kcp.state == StateActive {
  107. kcp.SetState(StateReadyToClose)
  108. }
  109. if kcp.state == StatePeerClosed {
  110. kcp.SetState(StateTerminating)
  111. }
  112. }
  113. // DumpReceivingBuf moves available data from rcv_buf -> rcv_queue
  114. // @Private
  115. func (kcp *KCP) DumpReceivingBuf() {
  116. for {
  117. seg := kcp.rcv_buf.RemoveFirst()
  118. if seg == nil {
  119. break
  120. }
  121. kcp.rcv_queue.Put(seg.Data)
  122. seg.Data = nil
  123. kcp.rcv_buf.Advance()
  124. kcp.rcv_nxt++
  125. kcp.receivingUpdated = true
  126. }
  127. }
  128. // Send is user/upper level send, returns below zero for error
  129. func (kcp *KCP) Send(buffer []byte) int {
  130. nBytes := 0
  131. for len(buffer) > 0 && !kcp.snd_queue.IsFull() {
  132. var size int
  133. if len(buffer) > int(kcp.mss) {
  134. size = int(kcp.mss)
  135. } else {
  136. size = len(buffer)
  137. }
  138. seg := &DataSegment{
  139. Data: alloc.NewSmallBuffer().Clear().Append(buffer[:size]),
  140. }
  141. kcp.snd_queue.Push(seg)
  142. buffer = buffer[size:]
  143. nBytes += size
  144. }
  145. return nBytes
  146. }
  147. // https://tools.ietf.org/html/rfc6298
  148. func (kcp *KCP) update_ack(rtt int32) {
  149. if kcp.rx_srtt == 0 {
  150. kcp.rx_srtt = uint32(rtt)
  151. kcp.rx_rttvar = uint32(rtt) / 2
  152. } else {
  153. delta := rtt - int32(kcp.rx_srtt)
  154. if delta < 0 {
  155. delta = -delta
  156. }
  157. kcp.rx_rttvar = (3*kcp.rx_rttvar + uint32(delta)) / 4
  158. kcp.rx_srtt = (7*kcp.rx_srtt + uint32(rtt)) / 8
  159. if kcp.rx_srtt < kcp.interval {
  160. kcp.rx_srtt = kcp.interval
  161. }
  162. }
  163. var rto uint32
  164. if kcp.interval < 4*kcp.rx_rttvar {
  165. rto = kcp.rx_srtt + 4*kcp.rx_rttvar
  166. } else {
  167. rto = kcp.rx_srtt + kcp.interval
  168. }
  169. if rto > IKCP_RTO_MAX {
  170. rto = IKCP_RTO_MAX
  171. }
  172. kcp.rx_rto = rto * 3 / 2
  173. }
  174. func (kcp *KCP) shrink_buf() {
  175. prevUna := kcp.snd_una
  176. if kcp.snd_buf.Len() > 0 {
  177. seg := kcp.snd_buf.First()
  178. kcp.snd_una = seg.Number
  179. } else {
  180. kcp.snd_una = kcp.snd_nxt
  181. }
  182. if kcp.snd_una != prevUna {
  183. kcp.sendingUpdated = true
  184. }
  185. }
  186. func (kcp *KCP) parse_ack(sn uint32) {
  187. if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 {
  188. return
  189. }
  190. kcp.snd_buf.Remove(sn - kcp.snd_una)
  191. }
  192. func (kcp *KCP) parse_fastack(sn uint32) {
  193. if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 {
  194. return
  195. }
  196. kcp.snd_buf.HandleFastAck(sn)
  197. }
  198. func (kcp *KCP) HandleReceivingNext(receivingNext uint32) {
  199. kcp.snd_buf.Clear(receivingNext)
  200. }
  201. func (kcp *KCP) HandleSendingNext(sendingNext uint32) {
  202. kcp.acklist.Clear(sendingNext)
  203. }
  204. func (kcp *KCP) parse_data(newseg *DataSegment) {
  205. sn := newseg.Number
  206. if _itimediff(sn, kcp.rcv_nxt+kcp.rcv_wnd) >= 0 ||
  207. _itimediff(sn, kcp.rcv_nxt) < 0 {
  208. return
  209. }
  210. idx := sn - kcp.rcv_nxt
  211. if !kcp.rcv_buf.Set(idx, newseg) {
  212. newseg.Release()
  213. }
  214. kcp.DumpReceivingBuf()
  215. }
  216. // Input when you received a low level packet (eg. UDP packet), call it
  217. func (kcp *KCP) Input(data []byte) int {
  218. kcp.lastIncomingTime = kcp.current
  219. var seg ISegment
  220. var maxack uint32
  221. var flag int
  222. for {
  223. seg, data = ReadSegment(data)
  224. if seg == nil {
  225. break
  226. }
  227. switch seg := seg.(type) {
  228. case *DataSegment:
  229. kcp.HandleOption(seg.Opt)
  230. kcp.HandleSendingNext(seg.SendingNext)
  231. kcp.acklist.Add(seg.Number, seg.Timestamp)
  232. kcp.receivingUpdated = true
  233. kcp.parse_data(seg)
  234. kcp.lastPayloadTime = kcp.current
  235. case *ACKSegment:
  236. kcp.HandleOption(seg.Opt)
  237. if kcp.rmt_wnd < seg.ReceivingWindow {
  238. kcp.rmt_wnd = seg.ReceivingWindow
  239. }
  240. kcp.HandleReceivingNext(seg.ReceivingNext)
  241. for i := 0; i < int(seg.Count); i++ {
  242. ts := seg.TimestampList[i]
  243. sn := seg.NumberList[i]
  244. if _itimediff(kcp.current, ts) >= 0 {
  245. kcp.update_ack(_itimediff(kcp.current, ts))
  246. }
  247. kcp.parse_ack(sn)
  248. kcp.shrink_buf()
  249. if flag == 0 {
  250. flag = 1
  251. maxack = sn
  252. } else if _itimediff(sn, maxack) > 0 {
  253. maxack = sn
  254. }
  255. }
  256. kcp.lastPayloadTime = kcp.current
  257. case *CmdOnlySegment:
  258. kcp.HandleOption(seg.Opt)
  259. if seg.Cmd == SegmentCommandTerminated {
  260. if kcp.state == StateActive ||
  261. kcp.state == StateReadyToClose ||
  262. kcp.state == StatePeerClosed {
  263. kcp.SetState(StateTerminating)
  264. } else if kcp.state == StateTerminating {
  265. kcp.SetState(StateTerminated)
  266. }
  267. }
  268. kcp.HandleReceivingNext(seg.ReceivinNext)
  269. kcp.HandleSendingNext(seg.SendingNext)
  270. kcp.shrink_buf()
  271. default:
  272. }
  273. }
  274. if flag != 0 {
  275. kcp.parse_fastack(maxack)
  276. }
  277. return 0
  278. }
  279. // flush pending data
  280. func (kcp *KCP) flush() {
  281. if kcp.state == StateTerminated {
  282. return
  283. }
  284. if kcp.state == StateActive && _itimediff(kcp.current, kcp.lastPayloadTime) >= 30000 {
  285. kcp.OnClose()
  286. }
  287. if kcp.state == StateTerminating {
  288. kcp.output.Write(&CmdOnlySegment{
  289. Conv: kcp.conv,
  290. Cmd: SegmentCommandTerminated,
  291. })
  292. kcp.output.Flush()
  293. if _itimediff(kcp.current, kcp.stateBeginTime) > 8000 {
  294. kcp.SetState(StateTerminated)
  295. }
  296. return
  297. }
  298. if kcp.state == StateReadyToClose && _itimediff(kcp.current, kcp.stateBeginTime) > 15000 {
  299. kcp.SetState(StateTerminating)
  300. }
  301. current := kcp.current
  302. // flush acknowledges
  303. if kcp.acklist.Flush() {
  304. kcp.receivingUpdated = false
  305. }
  306. // calculate window size
  307. cwnd := kcp.snd_una + kcp.snd_wnd
  308. if cwnd < kcp.rmt_wnd {
  309. cwnd = kcp.rmt_wnd
  310. }
  311. if kcp.congestionControl && cwnd < kcp.snd_una+kcp.cwnd {
  312. cwnd = kcp.snd_una + kcp.cwnd
  313. }
  314. for !kcp.snd_queue.IsEmpty() && _itimediff(kcp.snd_nxt, cwnd) < 0 {
  315. seg := kcp.snd_queue.Pop()
  316. seg.Conv = kcp.conv
  317. seg.Number = kcp.snd_nxt
  318. seg.timeout = current
  319. seg.ackSkipped = 0
  320. seg.transmit = 0
  321. kcp.snd_buf.Push(seg)
  322. kcp.snd_nxt++
  323. }
  324. // flush data segments
  325. if kcp.snd_buf.Flush() {
  326. kcp.sendingUpdated = false
  327. }
  328. if kcp.sendingUpdated || kcp.receivingUpdated || _itimediff(kcp.current, kcp.lastPingTime) >= 5000 {
  329. seg := &CmdOnlySegment{
  330. Conv: kcp.conv,
  331. Cmd: SegmentCommandPing,
  332. ReceivinNext: kcp.rcv_nxt,
  333. SendingNext: kcp.snd_una,
  334. }
  335. if kcp.state == StateReadyToClose {
  336. seg.Opt = SegmentOptionClose
  337. }
  338. kcp.output.Write(seg)
  339. kcp.lastPingTime = kcp.current
  340. kcp.sendingUpdated = false
  341. kcp.receivingUpdated = false
  342. }
  343. // flash remain segments
  344. kcp.output.Flush()
  345. }
  346. func (kcp *KCP) HandleLost(lost bool) {
  347. if !kcp.congestionControl {
  348. return
  349. }
  350. if lost {
  351. kcp.cwnd = 3 * kcp.cwnd / 4
  352. } else {
  353. kcp.cwnd += kcp.cwnd / 4
  354. }
  355. if kcp.cwnd < 4 {
  356. kcp.cwnd = 4
  357. }
  358. if kcp.cwnd > kcp.snd_wnd {
  359. kcp.cwnd = kcp.snd_wnd
  360. }
  361. }
  362. // Update updates state (call it repeatedly, every 10ms-100ms), or you can ask
  363. // ikcp_check when to call it again (without ikcp_input/_send calling).
  364. // 'current' - current timestamp in millisec.
  365. func (kcp *KCP) Update(current uint32) {
  366. kcp.current = current
  367. kcp.flush()
  368. }
  369. // NoDelay options
  370. // fastest: ikcp_nodelay(kcp, 1, 20, 2, 1)
  371. // nodelay: 0:disable(default), 1:enable
  372. // interval: internal update timer interval in millisec, default is 100ms
  373. // resend: 0:disable fast resend(default), 1:enable fast resend
  374. // nc: 0:normal congestion control(default), 1:disable congestion control
  375. func (kcp *KCP) NoDelay(interval uint32, resend int, congestionControl bool) int {
  376. kcp.interval = interval
  377. if resend >= 0 {
  378. kcp.fastresend = int32(resend)
  379. }
  380. kcp.congestionControl = congestionControl
  381. return 0
  382. }
  383. // WaitSnd gets how many packet is waiting to be sent
  384. func (kcp *KCP) WaitSnd() uint32 {
  385. return uint32(kcp.snd_buf.Len()) + kcp.snd_queue.Len()
  386. }
  387. func (this *KCP) ClearSendQueue() {
  388. this.snd_queue.Clear()
  389. this.snd_buf.Clear(0xFFFFFFFF)
  390. }