kcp.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370
  1. // Package kcp - A Fast and Reliable ARQ Protocol
  2. //
  3. // Acknowledgement:
  4. // skywind3000@github for inventing the KCP protocol
  5. // xtaci@github for translating to Golang
  6. package kcp
  7. import (
  8. "github.com/v2ray/v2ray-core/common/alloc"
  9. "github.com/v2ray/v2ray-core/common/log"
  10. )
  11. func _itimediff(later, earlier uint32) int32 {
  12. return (int32)(later - earlier)
  13. }
  14. type State int
  15. const (
  16. StateActive State = 0
  17. StateReadyToClose State = 1
  18. StatePeerClosed State = 2
  19. StateTerminating State = 3
  20. StateTerminated State = 4
  21. )
  22. // KCP defines a single KCP connection
  23. type KCP struct {
  24. conv uint16
  25. state State
  26. stateBeginTime uint32
  27. lastIncomingTime uint32
  28. lastPayloadTime uint32
  29. sendingUpdated bool
  30. lastPingTime uint32
  31. mss uint32
  32. snd_una, snd_nxt uint32
  33. rx_rttvar, rx_srtt, rx_rto uint32
  34. snd_wnd, rmt_wnd, cwnd uint32
  35. current, interval uint32
  36. snd_queue *SendingQueue
  37. snd_buf *SendingWindow
  38. receivingWorker *ReceivingWorker
  39. fastresend uint32
  40. congestionControl bool
  41. output *BufferedSegmentWriter
  42. }
  43. // NewKCP create a new kcp control object, 'conv' must equal in two endpoint
  44. // from the same connection.
  45. func NewKCP(conv uint16, output *AuthenticationWriter) *KCP {
  46. log.Debug("KCP|Core: creating KCP ", conv)
  47. kcp := new(KCP)
  48. kcp.conv = conv
  49. kcp.snd_wnd = effectiveConfig.GetSendingWindowSize()
  50. kcp.rmt_wnd = 32
  51. kcp.mss = output.Mtu() - DataSegmentOverhead
  52. kcp.rx_rto = 100
  53. kcp.interval = effectiveConfig.Tti
  54. kcp.output = NewSegmentWriter(output)
  55. kcp.snd_queue = NewSendingQueue(effectiveConfig.GetSendingQueueSize())
  56. kcp.snd_buf = NewSendingWindow(kcp, effectiveConfig.GetSendingWindowSize())
  57. kcp.cwnd = kcp.snd_wnd
  58. kcp.receivingWorker = NewReceivingWorker(kcp)
  59. kcp.fastresend = 2
  60. kcp.congestionControl = effectiveConfig.Congestion
  61. return kcp
  62. }
  63. func (kcp *KCP) SetState(state State) {
  64. kcp.state = state
  65. kcp.stateBeginTime = kcp.current
  66. switch state {
  67. case StateReadyToClose:
  68. kcp.receivingWorker.CloseRead()
  69. case StatePeerClosed:
  70. kcp.ClearSendQueue()
  71. case StateTerminating:
  72. kcp.receivingWorker.CloseRead()
  73. case StateTerminated:
  74. kcp.receivingWorker.CloseRead()
  75. }
  76. }
  77. func (kcp *KCP) HandleOption(opt SegmentOption) {
  78. if (opt & SegmentOptionClose) == SegmentOptionClose {
  79. kcp.OnPeerClosed()
  80. }
  81. }
  82. func (kcp *KCP) OnPeerClosed() {
  83. if kcp.state == StateReadyToClose {
  84. kcp.SetState(StateTerminating)
  85. }
  86. if kcp.state == StateActive {
  87. kcp.SetState(StatePeerClosed)
  88. }
  89. }
  90. func (kcp *KCP) OnClose() {
  91. if kcp.state == StateActive {
  92. kcp.SetState(StateReadyToClose)
  93. }
  94. if kcp.state == StatePeerClosed {
  95. kcp.SetState(StateTerminating)
  96. }
  97. }
  98. // Send is user/upper level send, returns below zero for error
  99. func (kcp *KCP) Send(buffer []byte) int {
  100. nBytes := 0
  101. for len(buffer) > 0 && !kcp.snd_queue.IsFull() {
  102. var size int
  103. if len(buffer) > int(kcp.mss) {
  104. size = int(kcp.mss)
  105. } else {
  106. size = len(buffer)
  107. }
  108. seg := &DataSegment{
  109. Data: alloc.NewSmallBuffer().Clear().Append(buffer[:size]),
  110. }
  111. kcp.snd_queue.Push(seg)
  112. buffer = buffer[size:]
  113. nBytes += size
  114. }
  115. return nBytes
  116. }
  117. // https://tools.ietf.org/html/rfc6298
  118. func (kcp *KCP) update_ack(rtt int32) {
  119. if kcp.rx_srtt == 0 {
  120. kcp.rx_srtt = uint32(rtt)
  121. kcp.rx_rttvar = uint32(rtt) / 2
  122. } else {
  123. delta := rtt - int32(kcp.rx_srtt)
  124. if delta < 0 {
  125. delta = -delta
  126. }
  127. kcp.rx_rttvar = (3*kcp.rx_rttvar + uint32(delta)) / 4
  128. kcp.rx_srtt = (7*kcp.rx_srtt + uint32(rtt)) / 8
  129. if kcp.rx_srtt < kcp.interval {
  130. kcp.rx_srtt = kcp.interval
  131. }
  132. }
  133. var rto uint32
  134. if kcp.interval < 4*kcp.rx_rttvar {
  135. rto = kcp.rx_srtt + 4*kcp.rx_rttvar
  136. } else {
  137. rto = kcp.rx_srtt + kcp.interval
  138. }
  139. if rto > 10000 {
  140. rto = 10000
  141. }
  142. kcp.rx_rto = rto * 3 / 2
  143. }
  144. func (kcp *KCP) shrink_buf() {
  145. prevUna := kcp.snd_una
  146. if kcp.snd_buf.Len() > 0 {
  147. seg := kcp.snd_buf.First()
  148. kcp.snd_una = seg.Number
  149. } else {
  150. kcp.snd_una = kcp.snd_nxt
  151. }
  152. if kcp.snd_una != prevUna {
  153. kcp.sendingUpdated = true
  154. }
  155. }
  156. func (kcp *KCP) parse_ack(sn uint32) {
  157. if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 {
  158. return
  159. }
  160. kcp.snd_buf.Remove(sn - kcp.snd_una)
  161. }
  162. func (kcp *KCP) parse_fastack(sn uint32) {
  163. if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 {
  164. return
  165. }
  166. kcp.snd_buf.HandleFastAck(sn)
  167. }
  168. func (kcp *KCP) HandleReceivingNext(receivingNext uint32) {
  169. kcp.snd_buf.Clear(receivingNext)
  170. }
  171. // Input when you received a low level packet (eg. UDP packet), call it
  172. func (kcp *KCP) Input(data []byte) int {
  173. kcp.lastIncomingTime = kcp.current
  174. var seg ISegment
  175. var maxack uint32
  176. var flag int
  177. for {
  178. seg, data = ReadSegment(data)
  179. if seg == nil {
  180. break
  181. }
  182. switch seg := seg.(type) {
  183. case *DataSegment:
  184. kcp.HandleOption(seg.Opt)
  185. kcp.receivingWorker.ProcessSegment(seg)
  186. kcp.lastPayloadTime = kcp.current
  187. case *AckSegment:
  188. kcp.HandleOption(seg.Opt)
  189. if kcp.rmt_wnd < seg.ReceivingWindow {
  190. kcp.rmt_wnd = seg.ReceivingWindow
  191. }
  192. kcp.HandleReceivingNext(seg.ReceivingNext)
  193. kcp.shrink_buf()
  194. for i := 0; i < int(seg.Count); i++ {
  195. ts := seg.TimestampList[i]
  196. sn := seg.NumberList[i]
  197. if _itimediff(kcp.current, ts) >= 0 {
  198. kcp.update_ack(_itimediff(kcp.current, ts))
  199. }
  200. kcp.parse_ack(sn)
  201. kcp.shrink_buf()
  202. if flag == 0 {
  203. flag = 1
  204. maxack = sn
  205. } else if _itimediff(sn, maxack) > 0 {
  206. maxack = sn
  207. }
  208. }
  209. kcp.lastPayloadTime = kcp.current
  210. case *CmdOnlySegment:
  211. kcp.HandleOption(seg.Opt)
  212. if seg.Cmd == SegmentCommandTerminated {
  213. if kcp.state == StateActive ||
  214. kcp.state == StateReadyToClose ||
  215. kcp.state == StatePeerClosed {
  216. kcp.SetState(StateTerminating)
  217. } else if kcp.state == StateTerminating {
  218. kcp.SetState(StateTerminated)
  219. }
  220. }
  221. kcp.HandleReceivingNext(seg.ReceivinNext)
  222. kcp.receivingWorker.ProcessSendingNext(seg.SendingNext)
  223. kcp.shrink_buf()
  224. default:
  225. }
  226. }
  227. if flag != 0 {
  228. kcp.parse_fastack(maxack)
  229. }
  230. return 0
  231. }
  232. // flush pending data
  233. func (kcp *KCP) flush() {
  234. if kcp.state == StateTerminated {
  235. return
  236. }
  237. if kcp.state == StateActive && _itimediff(kcp.current, kcp.lastPayloadTime) >= 30000 {
  238. kcp.OnClose()
  239. }
  240. if kcp.state == StateTerminating {
  241. kcp.output.Write(&CmdOnlySegment{
  242. Conv: kcp.conv,
  243. Cmd: SegmentCommandTerminated,
  244. })
  245. kcp.output.Flush()
  246. if _itimediff(kcp.current, kcp.stateBeginTime) > 8000 {
  247. kcp.SetState(StateTerminated)
  248. }
  249. return
  250. }
  251. if kcp.state == StateReadyToClose && _itimediff(kcp.current, kcp.stateBeginTime) > 15000 {
  252. kcp.SetState(StateTerminating)
  253. }
  254. current := kcp.current
  255. // flush acknowledges
  256. kcp.receivingWorker.Flush()
  257. // calculate window size
  258. cwnd := kcp.snd_una + kcp.snd_wnd
  259. if cwnd > kcp.rmt_wnd {
  260. cwnd = kcp.rmt_wnd
  261. }
  262. if kcp.congestionControl && cwnd > kcp.snd_una+kcp.cwnd {
  263. cwnd = kcp.snd_una + kcp.cwnd
  264. }
  265. for !kcp.snd_queue.IsEmpty() && _itimediff(kcp.snd_nxt, cwnd) < 0 {
  266. seg := kcp.snd_queue.Pop()
  267. seg.Conv = kcp.conv
  268. seg.Number = kcp.snd_nxt
  269. seg.timeout = current
  270. seg.ackSkipped = 0
  271. seg.transmit = 0
  272. kcp.snd_buf.Push(seg)
  273. kcp.snd_nxt++
  274. }
  275. // flush data segments
  276. if kcp.snd_buf.Flush() {
  277. kcp.sendingUpdated = false
  278. }
  279. if kcp.sendingUpdated || kcp.receivingWorker.PingNecessary() || _itimediff(kcp.current, kcp.lastPingTime) >= 5000 {
  280. seg := &CmdOnlySegment{
  281. Conv: kcp.conv,
  282. Cmd: SegmentCommandPing,
  283. ReceivinNext: kcp.receivingWorker.nextNumber,
  284. SendingNext: kcp.snd_una,
  285. }
  286. if kcp.state == StateReadyToClose {
  287. seg.Opt = SegmentOptionClose
  288. }
  289. kcp.output.Write(seg)
  290. kcp.lastPingTime = kcp.current
  291. kcp.sendingUpdated = false
  292. }
  293. // flash remain segments
  294. kcp.output.Flush()
  295. }
  296. func (kcp *KCP) HandleLost(lost bool) {
  297. if !kcp.congestionControl {
  298. return
  299. }
  300. if lost {
  301. kcp.cwnd = 3 * kcp.cwnd / 4
  302. } else {
  303. kcp.cwnd += kcp.cwnd / 4
  304. }
  305. if kcp.cwnd < 4 {
  306. kcp.cwnd = 4
  307. }
  308. if kcp.cwnd > kcp.snd_wnd {
  309. kcp.cwnd = kcp.snd_wnd
  310. }
  311. }
  312. // Update updates state (call it repeatedly, every 10ms-100ms), or you can ask
  313. // ikcp_check when to call it again (without ikcp_input/_send calling).
  314. // 'current' - current timestamp in millisec.
  315. func (kcp *KCP) Update(current uint32) {
  316. kcp.current = current
  317. kcp.flush()
  318. }
  319. // WaitSnd gets how many packet is waiting to be sent
  320. func (kcp *KCP) WaitSnd() uint32 {
  321. return uint32(kcp.snd_buf.Len()) + kcp.snd_queue.Len()
  322. }
  323. func (this *KCP) ClearSendQueue() {
  324. this.snd_queue.Clear()
  325. this.snd_buf.Clear(0xFFFFFFFF)
  326. }