// Package kcp - A Fast and Reliable ARQ Protocol
//
// Acknowledgement:
//   - skywind3000@github for inventing the KCP protocol
//   - xtaci@github for translating to Golang
package kcp
  7. import (
  8. "github.com/v2ray/v2ray-core/common/alloc"
  9. "github.com/v2ray/v2ray-core/common/log"
  10. )
  // _itimediff returns the signed distance from earlier to later.
  //
  // The subtraction is done in uint32 arithmetic and the result is then
  // reinterpreted as int32, so two values on either side of a uint32
  // wrap-around still compare correctly as long as they are less than 2^31
  // apart.
  func _itimediff(later, earlier uint32) int32 {
  	delta := later - earlier
  	return int32(delta)
  }
  // State identifies the lifecycle stage of a KCP connection.
  type State int

  // Connection states, numbered 0 through 4 in declaration order.
  const (
  	// StateActive is the zero value, so a freshly allocated KCP starts here.
  	StateActive State = iota
  	// StateReadyToClose is entered by OnClose while the connection is active.
  	StateReadyToClose
  	// StatePeerClosed is entered by OnPeerClosed while the connection is active.
  	StatePeerClosed
  	// StateTerminating is the state in which flush keeps emitting a
  	// SegmentCommandTerminated command.
  	StateTerminating
  	// StateTerminated is final: flush returns immediately in this state.
  	StateTerminated
  )
  22. // KCP defines a single KCP connection
  23. type KCP struct {
  24. conv uint16
  25. state State
  26. stateBeginTime uint32
  27. lastIncomingTime uint32
  28. lastPayloadTime uint32
  29. sendingUpdated bool
  30. receivingUpdated bool
  31. lastPingTime uint32
  32. mss uint32
  33. snd_una, snd_nxt, rcv_nxt uint32
  34. rx_rttvar, rx_srtt, rx_rto uint32
  35. snd_wnd, rcv_wnd, rmt_wnd, cwnd uint32
  36. current, interval uint32
  37. snd_queue *SendingQueue
  38. rcv_queue *ReceivingQueue
  39. snd_buf *SendingWindow
  40. rcv_buf *ReceivingWindow
  41. acklist *ACKList
  42. fastresend int32
  43. congestionControl bool
  44. output *SegmentWriter
  45. }
  46. // NewKCP create a new kcp control object, 'conv' must equal in two endpoint
  47. // from the same connection.
  48. func NewKCP(conv uint16, output *AuthenticationWriter) *KCP {
  49. log.Debug("KCP|Core: creating KCP ", conv)
  50. kcp := new(KCP)
  51. kcp.conv = conv
  52. kcp.snd_wnd = effectiveConfig.GetSendingWindowSize()
  53. kcp.rcv_wnd = effectiveConfig.GetReceivingWindowSize()
  54. kcp.rmt_wnd = 32
  55. kcp.mss = output.Mtu() - DataSegmentOverhead
  56. kcp.rx_rto = 100
  57. kcp.interval = effectiveConfig.Tti
  58. kcp.output = NewSegmentWriter(output)
  59. kcp.rcv_buf = NewReceivingWindow(effectiveConfig.GetReceivingWindowSize())
  60. kcp.snd_queue = NewSendingQueue(effectiveConfig.GetSendingQueueSize())
  61. kcp.rcv_queue = NewReceivingQueue()
  62. kcp.acklist = NewACKList(kcp)
  63. kcp.snd_buf = NewSendingWindow(kcp, effectiveConfig.GetSendingWindowSize())
  64. kcp.cwnd = kcp.snd_wnd
  65. return kcp
  66. }
  67. func (kcp *KCP) SetState(state State) {
  68. kcp.state = state
  69. kcp.stateBeginTime = kcp.current
  70. switch state {
  71. case StateReadyToClose:
  72. kcp.rcv_queue.Close()
  73. case StatePeerClosed:
  74. kcp.ClearSendQueue()
  75. case StateTerminating:
  76. kcp.rcv_queue.Close()
  77. case StateTerminated:
  78. kcp.rcv_queue.Close()
  79. }
  80. }
  81. func (kcp *KCP) HandleOption(opt SegmentOption) {
  82. if (opt & SegmentOptionClose) == SegmentOptionClose {
  83. kcp.OnPeerClosed()
  84. }
  85. }
  86. func (kcp *KCP) OnPeerClosed() {
  87. if kcp.state == StateReadyToClose {
  88. kcp.SetState(StateTerminating)
  89. }
  90. if kcp.state == StateActive {
  91. kcp.SetState(StatePeerClosed)
  92. }
  93. }
  94. func (kcp *KCP) OnClose() {
  95. if kcp.state == StateActive {
  96. kcp.SetState(StateReadyToClose)
  97. }
  98. if kcp.state == StatePeerClosed {
  99. kcp.SetState(StateTerminating)
  100. }
  101. }
  102. // DumpReceivingBuf moves available data from rcv_buf -> rcv_queue
  103. // @Private
  104. func (kcp *KCP) DumpReceivingBuf() {
  105. for {
  106. seg := kcp.rcv_buf.RemoveFirst()
  107. if seg == nil {
  108. break
  109. }
  110. kcp.rcv_queue.Put(seg.Data)
  111. seg.Data = nil
  112. kcp.rcv_buf.Advance()
  113. kcp.rcv_nxt++
  114. kcp.receivingUpdated = true
  115. }
  116. }
  117. // Send is user/upper level send, returns below zero for error
  118. func (kcp *KCP) Send(buffer []byte) int {
  119. nBytes := 0
  120. for len(buffer) > 0 && !kcp.snd_queue.IsFull() {
  121. var size int
  122. if len(buffer) > int(kcp.mss) {
  123. size = int(kcp.mss)
  124. } else {
  125. size = len(buffer)
  126. }
  127. seg := &DataSegment{
  128. Data: alloc.NewSmallBuffer().Clear().Append(buffer[:size]),
  129. }
  130. kcp.snd_queue.Push(seg)
  131. buffer = buffer[size:]
  132. nBytes += size
  133. }
  134. return nBytes
  135. }
  136. // https://tools.ietf.org/html/rfc6298
  137. func (kcp *KCP) update_ack(rtt int32) {
  138. if kcp.rx_srtt == 0 {
  139. kcp.rx_srtt = uint32(rtt)
  140. kcp.rx_rttvar = uint32(rtt) / 2
  141. } else {
  142. delta := rtt - int32(kcp.rx_srtt)
  143. if delta < 0 {
  144. delta = -delta
  145. }
  146. kcp.rx_rttvar = (3*kcp.rx_rttvar + uint32(delta)) / 4
  147. kcp.rx_srtt = (7*kcp.rx_srtt + uint32(rtt)) / 8
  148. if kcp.rx_srtt < kcp.interval {
  149. kcp.rx_srtt = kcp.interval
  150. }
  151. }
  152. var rto uint32
  153. if kcp.interval < 4*kcp.rx_rttvar {
  154. rto = kcp.rx_srtt + 4*kcp.rx_rttvar
  155. } else {
  156. rto = kcp.rx_srtt + kcp.interval
  157. }
  158. if rto > 10000 {
  159. rto = 10000
  160. }
  161. kcp.rx_rto = rto * 3 / 2
  162. }
  163. func (kcp *KCP) shrink_buf() {
  164. prevUna := kcp.snd_una
  165. if kcp.snd_buf.Len() > 0 {
  166. seg := kcp.snd_buf.First()
  167. kcp.snd_una = seg.Number
  168. } else {
  169. kcp.snd_una = kcp.snd_nxt
  170. }
  171. if kcp.snd_una != prevUna {
  172. kcp.sendingUpdated = true
  173. }
  174. }
  175. func (kcp *KCP) parse_ack(sn uint32) {
  176. if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 {
  177. return
  178. }
  179. kcp.snd_buf.Remove(sn - kcp.snd_una)
  180. }
  181. func (kcp *KCP) parse_fastack(sn uint32) {
  182. if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 {
  183. return
  184. }
  185. kcp.snd_buf.HandleFastAck(sn)
  186. }
  187. func (kcp *KCP) HandleReceivingNext(receivingNext uint32) {
  188. kcp.snd_buf.Clear(receivingNext)
  189. }
  190. func (kcp *KCP) HandleSendingNext(sendingNext uint32) {
  191. kcp.acklist.Clear(sendingNext)
  192. }
  193. func (kcp *KCP) parse_data(newseg *DataSegment) {
  194. sn := newseg.Number
  195. if _itimediff(sn, kcp.rcv_nxt+kcp.rcv_wnd) >= 0 ||
  196. _itimediff(sn, kcp.rcv_nxt) < 0 {
  197. return
  198. }
  199. idx := sn - kcp.rcv_nxt
  200. if !kcp.rcv_buf.Set(idx, newseg) {
  201. newseg.Release()
  202. }
  203. kcp.DumpReceivingBuf()
  204. }
  205. // Input when you received a low level packet (eg. UDP packet), call it
  206. func (kcp *KCP) Input(data []byte) int {
  207. kcp.lastIncomingTime = kcp.current
  208. var seg ISegment
  209. var maxack uint32
  210. var flag int
  211. for {
  212. seg, data = ReadSegment(data)
  213. if seg == nil {
  214. break
  215. }
  216. switch seg := seg.(type) {
  217. case *DataSegment:
  218. kcp.HandleOption(seg.Opt)
  219. kcp.HandleSendingNext(seg.SendingNext)
  220. kcp.acklist.Add(seg.Number, seg.Timestamp)
  221. kcp.receivingUpdated = true
  222. kcp.parse_data(seg)
  223. kcp.lastPayloadTime = kcp.current
  224. case *ACKSegment:
  225. kcp.HandleOption(seg.Opt)
  226. if kcp.rmt_wnd < seg.ReceivingWindow {
  227. kcp.rmt_wnd = seg.ReceivingWindow
  228. }
  229. kcp.HandleReceivingNext(seg.ReceivingNext)
  230. for i := 0; i < int(seg.Count); i++ {
  231. ts := seg.TimestampList[i]
  232. sn := seg.NumberList[i]
  233. if _itimediff(kcp.current, ts) >= 0 {
  234. kcp.update_ack(_itimediff(kcp.current, ts))
  235. }
  236. kcp.parse_ack(sn)
  237. kcp.shrink_buf()
  238. if flag == 0 {
  239. flag = 1
  240. maxack = sn
  241. } else if _itimediff(sn, maxack) > 0 {
  242. maxack = sn
  243. }
  244. }
  245. kcp.lastPayloadTime = kcp.current
  246. case *CmdOnlySegment:
  247. kcp.HandleOption(seg.Opt)
  248. if seg.Cmd == SegmentCommandTerminated {
  249. if kcp.state == StateActive ||
  250. kcp.state == StateReadyToClose ||
  251. kcp.state == StatePeerClosed {
  252. kcp.SetState(StateTerminating)
  253. } else if kcp.state == StateTerminating {
  254. kcp.SetState(StateTerminated)
  255. }
  256. }
  257. kcp.HandleReceivingNext(seg.ReceivinNext)
  258. kcp.HandleSendingNext(seg.SendingNext)
  259. kcp.shrink_buf()
  260. default:
  261. }
  262. }
  263. if flag != 0 {
  264. kcp.parse_fastack(maxack)
  265. }
  266. return 0
  267. }
  268. // flush pending data
  269. func (kcp *KCP) flush() {
  270. if kcp.state == StateTerminated {
  271. return
  272. }
  273. if kcp.state == StateActive && _itimediff(kcp.current, kcp.lastPayloadTime) >= 30000 {
  274. kcp.OnClose()
  275. }
  276. if kcp.state == StateTerminating {
  277. kcp.output.Write(&CmdOnlySegment{
  278. Conv: kcp.conv,
  279. Cmd: SegmentCommandTerminated,
  280. })
  281. kcp.output.Flush()
  282. if _itimediff(kcp.current, kcp.stateBeginTime) > 8000 {
  283. kcp.SetState(StateTerminated)
  284. }
  285. return
  286. }
  287. if kcp.state == StateReadyToClose && _itimediff(kcp.current, kcp.stateBeginTime) > 15000 {
  288. kcp.SetState(StateTerminating)
  289. }
  290. current := kcp.current
  291. // flush acknowledges
  292. if kcp.acklist.Flush() {
  293. kcp.receivingUpdated = false
  294. }
  295. // calculate window size
  296. cwnd := kcp.snd_una + kcp.snd_wnd
  297. if cwnd < kcp.rmt_wnd {
  298. cwnd = kcp.rmt_wnd
  299. }
  300. if kcp.congestionControl && cwnd < kcp.snd_una+kcp.cwnd {
  301. cwnd = kcp.snd_una + kcp.cwnd
  302. }
  303. for !kcp.snd_queue.IsEmpty() && _itimediff(kcp.snd_nxt, cwnd) < 0 {
  304. seg := kcp.snd_queue.Pop()
  305. seg.Conv = kcp.conv
  306. seg.Number = kcp.snd_nxt
  307. seg.timeout = current
  308. seg.ackSkipped = 0
  309. seg.transmit = 0
  310. kcp.snd_buf.Push(seg)
  311. kcp.snd_nxt++
  312. }
  313. // flush data segments
  314. if kcp.snd_buf.Flush() {
  315. kcp.sendingUpdated = false
  316. }
  317. if kcp.sendingUpdated || kcp.receivingUpdated || _itimediff(kcp.current, kcp.lastPingTime) >= 5000 {
  318. seg := &CmdOnlySegment{
  319. Conv: kcp.conv,
  320. Cmd: SegmentCommandPing,
  321. ReceivinNext: kcp.rcv_nxt,
  322. SendingNext: kcp.snd_una,
  323. }
  324. if kcp.state == StateReadyToClose {
  325. seg.Opt = SegmentOptionClose
  326. }
  327. kcp.output.Write(seg)
  328. kcp.lastPingTime = kcp.current
  329. kcp.sendingUpdated = false
  330. kcp.receivingUpdated = false
  331. }
  332. // flash remain segments
  333. kcp.output.Flush()
  334. }
  335. func (kcp *KCP) HandleLost(lost bool) {
  336. if !kcp.congestionControl {
  337. return
  338. }
  339. if lost {
  340. kcp.cwnd = 3 * kcp.cwnd / 4
  341. } else {
  342. kcp.cwnd += kcp.cwnd / 4
  343. }
  344. if kcp.cwnd < 4 {
  345. kcp.cwnd = 4
  346. }
  347. if kcp.cwnd > kcp.snd_wnd {
  348. kcp.cwnd = kcp.snd_wnd
  349. }
  350. }
  351. // Update updates state (call it repeatedly, every 10ms-100ms), or you can ask
  352. // ikcp_check when to call it again (without ikcp_input/_send calling).
  353. // 'current' - current timestamp in millisec.
  354. func (kcp *KCP) Update(current uint32) {
  355. kcp.current = current
  356. kcp.flush()
  357. }
  358. // NoDelay options
  359. // fastest: ikcp_nodelay(kcp, 1, 20, 2, 1)
  360. // nodelay: 0:disable(default), 1:enable
  361. // interval: internal update timer interval in millisec, default is 100ms
  362. // resend: 0:disable fast resend(default), 1:enable fast resend
  363. // nc: 0:normal congestion control(default), 1:disable congestion control
  364. func (kcp *KCP) NoDelay(interval uint32, resend int, congestionControl bool) int {
  365. kcp.interval = interval
  366. if resend >= 0 {
  367. kcp.fastresend = int32(resend)
  368. }
  369. kcp.congestionControl = congestionControl
  370. return 0
  371. }
  372. // WaitSnd gets how many packet is waiting to be sent
  373. func (kcp *KCP) WaitSnd() uint32 {
  374. return uint32(kcp.snd_buf.Len()) + kcp.snd_queue.Len()
  375. }
  376. func (this *KCP) ClearSendQueue() {
  377. this.snd_queue.Clear()
  378. this.snd_buf.Clear(0xFFFFFFFF)
  379. }