kcp.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383
  1. // Package kcp - A Fast and Reliable ARQ Protocol
  2. //
  3. // Acknowledgement:
  4. // skywind3000@github for inventing the KCP protocol
  5. // xtaci@github for translating to Golang
  6. package kcp
  7. import (
  8. "github.com/v2ray/v2ray-core/common/alloc"
  9. "github.com/v2ray/v2ray-core/common/log"
  10. )
  11. func _itimediff(later, earlier uint32) int32 {
  12. return (int32)(later - earlier)
  13. }
  14. type State int
  15. const (
  16. StateActive State = 0
  17. StateReadyToClose State = 1
  18. StatePeerClosed State = 2
  19. StateTerminating State = 3
  20. StateTerminated State = 4
  21. )
  22. // KCP defines a single KCP connection
  23. type KCP struct {
  24. conv uint16
  25. state State
  26. stateBeginTime uint32
  27. lastIncomingTime uint32
  28. lastPayloadTime uint32
  29. sendingUpdated bool
  30. lastPingTime uint32
  31. mss uint32
  32. snd_una, snd_nxt uint32
  33. rx_rttvar, rx_srtt, rx_rto uint32
  34. snd_wnd, rmt_wnd, cwnd uint32
  35. current, interval uint32
  36. snd_queue *SendingQueue
  37. snd_buf *SendingWindow
  38. receivingWorker *ReceivingWorker
  39. fastresend int32
  40. congestionControl bool
  41. output *BufferedSegmentWriter
  42. }
  43. // NewKCP create a new kcp control object, 'conv' must equal in two endpoint
  44. // from the same connection.
  45. func NewKCP(conv uint16, output *AuthenticationWriter) *KCP {
  46. log.Debug("KCP|Core: creating KCP ", conv)
  47. kcp := new(KCP)
  48. kcp.conv = conv
  49. kcp.snd_wnd = effectiveConfig.GetSendingWindowSize()
  50. kcp.rmt_wnd = 32
  51. kcp.mss = output.Mtu() - DataSegmentOverhead
  52. kcp.rx_rto = 100
  53. kcp.interval = effectiveConfig.Tti
  54. kcp.output = NewSegmentWriter(output)
  55. kcp.snd_queue = NewSendingQueue(effectiveConfig.GetSendingQueueSize())
  56. kcp.snd_buf = NewSendingWindow(kcp, effectiveConfig.GetSendingWindowSize())
  57. kcp.cwnd = kcp.snd_wnd
  58. kcp.receivingWorker = NewReceivingWorker(kcp)
  59. return kcp
  60. }
  61. func (kcp *KCP) SetState(state State) {
  62. kcp.state = state
  63. kcp.stateBeginTime = kcp.current
  64. switch state {
  65. case StateReadyToClose:
  66. kcp.receivingWorker.CloseRead()
  67. case StatePeerClosed:
  68. kcp.ClearSendQueue()
  69. case StateTerminating:
  70. kcp.receivingWorker.CloseRead()
  71. case StateTerminated:
  72. kcp.receivingWorker.CloseRead()
  73. }
  74. }
  75. func (kcp *KCP) HandleOption(opt SegmentOption) {
  76. if (opt & SegmentOptionClose) == SegmentOptionClose {
  77. kcp.OnPeerClosed()
  78. }
  79. }
  80. func (kcp *KCP) OnPeerClosed() {
  81. if kcp.state == StateReadyToClose {
  82. kcp.SetState(StateTerminating)
  83. }
  84. if kcp.state == StateActive {
  85. kcp.SetState(StatePeerClosed)
  86. }
  87. }
  88. func (kcp *KCP) OnClose() {
  89. if kcp.state == StateActive {
  90. kcp.SetState(StateReadyToClose)
  91. }
  92. if kcp.state == StatePeerClosed {
  93. kcp.SetState(StateTerminating)
  94. }
  95. }
  96. // Send is user/upper level send, returns below zero for error
  97. func (kcp *KCP) Send(buffer []byte) int {
  98. nBytes := 0
  99. for len(buffer) > 0 && !kcp.snd_queue.IsFull() {
  100. var size int
  101. if len(buffer) > int(kcp.mss) {
  102. size = int(kcp.mss)
  103. } else {
  104. size = len(buffer)
  105. }
  106. seg := &DataSegment{
  107. Data: alloc.NewSmallBuffer().Clear().Append(buffer[:size]),
  108. }
  109. kcp.snd_queue.Push(seg)
  110. buffer = buffer[size:]
  111. nBytes += size
  112. }
  113. return nBytes
  114. }
  115. // https://tools.ietf.org/html/rfc6298
  116. func (kcp *KCP) update_ack(rtt int32) {
  117. if kcp.rx_srtt == 0 {
  118. kcp.rx_srtt = uint32(rtt)
  119. kcp.rx_rttvar = uint32(rtt) / 2
  120. } else {
  121. delta := rtt - int32(kcp.rx_srtt)
  122. if delta < 0 {
  123. delta = -delta
  124. }
  125. kcp.rx_rttvar = (3*kcp.rx_rttvar + uint32(delta)) / 4
  126. kcp.rx_srtt = (7*kcp.rx_srtt + uint32(rtt)) / 8
  127. if kcp.rx_srtt < kcp.interval {
  128. kcp.rx_srtt = kcp.interval
  129. }
  130. }
  131. var rto uint32
  132. if kcp.interval < 4*kcp.rx_rttvar {
  133. rto = kcp.rx_srtt + 4*kcp.rx_rttvar
  134. } else {
  135. rto = kcp.rx_srtt + kcp.interval
  136. }
  137. if rto > 10000 {
  138. rto = 10000
  139. }
  140. kcp.rx_rto = rto * 3 / 2
  141. }
  142. func (kcp *KCP) shrink_buf() {
  143. prevUna := kcp.snd_una
  144. if kcp.snd_buf.Len() > 0 {
  145. seg := kcp.snd_buf.First()
  146. kcp.snd_una = seg.Number
  147. } else {
  148. kcp.snd_una = kcp.snd_nxt
  149. }
  150. if kcp.snd_una != prevUna {
  151. kcp.sendingUpdated = true
  152. }
  153. }
  154. func (kcp *KCP) parse_ack(sn uint32) {
  155. if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 {
  156. return
  157. }
  158. kcp.snd_buf.Remove(sn - kcp.snd_una)
  159. }
  160. func (kcp *KCP) parse_fastack(sn uint32) {
  161. if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 {
  162. return
  163. }
  164. kcp.snd_buf.HandleFastAck(sn)
  165. }
  166. func (kcp *KCP) HandleReceivingNext(receivingNext uint32) {
  167. kcp.snd_buf.Clear(receivingNext)
  168. }
  169. // Input when you received a low level packet (eg. UDP packet), call it
  170. func (kcp *KCP) Input(data []byte) int {
  171. kcp.lastIncomingTime = kcp.current
  172. var seg ISegment
  173. var maxack uint32
  174. var flag int
  175. for {
  176. seg, data = ReadSegment(data)
  177. if seg == nil {
  178. break
  179. }
  180. switch seg := seg.(type) {
  181. case *DataSegment:
  182. kcp.HandleOption(seg.Opt)
  183. kcp.receivingWorker.ProcessSegment(seg)
  184. kcp.lastPayloadTime = kcp.current
  185. case *AckSegment:
  186. kcp.HandleOption(seg.Opt)
  187. if kcp.rmt_wnd < seg.ReceivingWindow {
  188. kcp.rmt_wnd = seg.ReceivingWindow
  189. }
  190. kcp.HandleReceivingNext(seg.ReceivingNext)
  191. for i := 0; i < int(seg.Count); i++ {
  192. ts := seg.TimestampList[i]
  193. sn := seg.NumberList[i]
  194. if _itimediff(kcp.current, ts) >= 0 {
  195. kcp.update_ack(_itimediff(kcp.current, ts))
  196. }
  197. kcp.parse_ack(sn)
  198. kcp.shrink_buf()
  199. if flag == 0 {
  200. flag = 1
  201. maxack = sn
  202. } else if _itimediff(sn, maxack) > 0 {
  203. maxack = sn
  204. }
  205. }
  206. kcp.lastPayloadTime = kcp.current
  207. case *CmdOnlySegment:
  208. kcp.HandleOption(seg.Opt)
  209. if seg.Cmd == SegmentCommandTerminated {
  210. if kcp.state == StateActive ||
  211. kcp.state == StateReadyToClose ||
  212. kcp.state == StatePeerClosed {
  213. kcp.SetState(StateTerminating)
  214. } else if kcp.state == StateTerminating {
  215. kcp.SetState(StateTerminated)
  216. }
  217. }
  218. kcp.HandleReceivingNext(seg.ReceivinNext)
  219. kcp.receivingWorker.ProcessSendingNext(seg.SendingNext)
  220. kcp.shrink_buf()
  221. default:
  222. }
  223. }
  224. if flag != 0 {
  225. kcp.parse_fastack(maxack)
  226. }
  227. return 0
  228. }
  229. // flush pending data
  230. func (kcp *KCP) flush() {
  231. if kcp.state == StateTerminated {
  232. return
  233. }
  234. if kcp.state == StateActive && _itimediff(kcp.current, kcp.lastPayloadTime) >= 30000 {
  235. kcp.OnClose()
  236. }
  237. if kcp.state == StateTerminating {
  238. kcp.output.Write(&CmdOnlySegment{
  239. Conv: kcp.conv,
  240. Cmd: SegmentCommandTerminated,
  241. })
  242. kcp.output.Flush()
  243. if _itimediff(kcp.current, kcp.stateBeginTime) > 8000 {
  244. kcp.SetState(StateTerminated)
  245. }
  246. return
  247. }
  248. if kcp.state == StateReadyToClose && _itimediff(kcp.current, kcp.stateBeginTime) > 15000 {
  249. kcp.SetState(StateTerminating)
  250. }
  251. current := kcp.current
  252. // flush acknowledges
  253. kcp.receivingWorker.Flush()
  254. // calculate window size
  255. cwnd := kcp.snd_una + kcp.snd_wnd
  256. if cwnd < kcp.rmt_wnd {
  257. cwnd = kcp.rmt_wnd
  258. }
  259. if kcp.congestionControl && cwnd < kcp.snd_una+kcp.cwnd {
  260. cwnd = kcp.snd_una + kcp.cwnd
  261. }
  262. for !kcp.snd_queue.IsEmpty() && _itimediff(kcp.snd_nxt, cwnd) < 0 {
  263. seg := kcp.snd_queue.Pop()
  264. seg.Conv = kcp.conv
  265. seg.Number = kcp.snd_nxt
  266. seg.timeout = current
  267. seg.ackSkipped = 0
  268. seg.transmit = 0
  269. kcp.snd_buf.Push(seg)
  270. kcp.snd_nxt++
  271. }
  272. // flush data segments
  273. if kcp.snd_buf.Flush() {
  274. kcp.sendingUpdated = false
  275. }
  276. if kcp.sendingUpdated || kcp.receivingWorker.PingNecessary() || _itimediff(kcp.current, kcp.lastPingTime) >= 5000 {
  277. seg := &CmdOnlySegment{
  278. Conv: kcp.conv,
  279. Cmd: SegmentCommandPing,
  280. ReceivinNext: kcp.receivingWorker.nextNumber,
  281. SendingNext: kcp.snd_una,
  282. }
  283. if kcp.state == StateReadyToClose {
  284. seg.Opt = SegmentOptionClose
  285. }
  286. kcp.output.Write(seg)
  287. kcp.lastPingTime = kcp.current
  288. kcp.sendingUpdated = false
  289. }
  290. // flash remain segments
  291. kcp.output.Flush()
  292. }
  293. func (kcp *KCP) HandleLost(lost bool) {
  294. if !kcp.congestionControl {
  295. return
  296. }
  297. if lost {
  298. kcp.cwnd = 3 * kcp.cwnd / 4
  299. } else {
  300. kcp.cwnd += kcp.cwnd / 4
  301. }
  302. if kcp.cwnd < 4 {
  303. kcp.cwnd = 4
  304. }
  305. if kcp.cwnd > kcp.snd_wnd {
  306. kcp.cwnd = kcp.snd_wnd
  307. }
  308. }
  309. // Update updates state (call it repeatedly, every 10ms-100ms), or you can ask
  310. // ikcp_check when to call it again (without ikcp_input/_send calling).
  311. // 'current' - current timestamp in millisec.
  312. func (kcp *KCP) Update(current uint32) {
  313. kcp.current = current
  314. kcp.flush()
  315. }
  316. // NoDelay options
  317. // fastest: ikcp_nodelay(kcp, 1, 20, 2, 1)
  318. // nodelay: 0:disable(default), 1:enable
  319. // interval: internal update timer interval in millisec, default is 100ms
  320. // resend: 0:disable fast resend(default), 1:enable fast resend
  321. // nc: 0:normal congestion control(default), 1:disable congestion control
  322. func (kcp *KCP) NoDelay(interval uint32, resend int, congestionControl bool) int {
  323. kcp.interval = interval
  324. if resend >= 0 {
  325. kcp.fastresend = int32(resend)
  326. }
  327. kcp.congestionControl = congestionControl
  328. return 0
  329. }
  330. // WaitSnd gets how many packet is waiting to be sent
  331. func (kcp *KCP) WaitSnd() uint32 {
  332. return uint32(kcp.snd_buf.Len()) + kcp.snd_queue.Len()
  333. }
  334. func (this *KCP) ClearSendQueue() {
  335. this.snd_queue.Clear()
  336. this.snd_buf.Clear(0xFFFFFFFF)
  337. }