kcp.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. // Package kcp - A Fast and Reliable ARQ Protocol
  2. //
  3. // Acknowledgement:
  4. // skywind3000@github for inventing the KCP protocol
  5. // xtaci@github for translating to Golang
  6. package kcp
  7. import (
  8. "github.com/v2ray/v2ray-core/common/alloc"
  9. "github.com/v2ray/v2ray-core/common/log"
  10. )
  // _itimediff returns later-earlier reinterpreted as a signed 32-bit value.
  // The subtraction is done in uint32, so it stays meaningful when the
  // counters wrap around: a negative result means "later" is actually behind
  // "earlier".
  func _itimediff(later, earlier uint32) int32 {
  	diff := later - earlier
  	return int32(diff)
  }
  // State is the lifecycle stage of a KCP connection.
  type State int

  const (
  	// StateActive is the zero value and therefore the state of a freshly
  	// created KCP.
  	StateActive State = iota
  	// StateReadyToClose is entered from StateActive by OnClose.
  	StateReadyToClose
  	// StatePeerClosed is entered from StateActive by OnPeerClosed.
  	StatePeerClosed
  	// StateTerminating is the state in which flush keeps sending the
  	// "terminated" command until the state times out.
  	StateTerminating
  	// StateTerminated is the final state; flush does nothing once it is set.
  	StateTerminated
  )
  22. // KCP defines a single KCP connection
  23. type KCP struct {
  24. conv uint16
  25. state State
  26. stateBeginTime uint32
  27. lastIncomingTime uint32
  28. lastPayloadTime uint32
  29. sendingUpdated bool
  30. lastPingTime uint32
  31. mss uint32
  32. snd_una, snd_nxt uint32
  33. rx_rttvar, rx_srtt, rx_rto uint32
  34. snd_wnd, rmt_wnd, cwnd uint32
  35. current, interval uint32
  36. snd_queue *SendingQueue
  37. snd_buf *SendingWindow
  38. receivingWorker *ReceivingWorker
  39. fastresend uint32
  40. congestionControl bool
  41. output *BufferedSegmentWriter
  42. }
  43. // NewKCP create a new kcp control object, 'conv' must equal in two endpoint
  44. // from the same connection.
  45. func NewKCP(conv uint16, output *AuthenticationWriter) *KCP {
  46. log.Debug("KCP|Core: creating KCP ", conv)
  47. kcp := new(KCP)
  48. kcp.conv = conv
  49. kcp.snd_wnd = effectiveConfig.GetSendingWindowSize()
  50. kcp.rmt_wnd = 32
  51. kcp.mss = output.Mtu() - DataSegmentOverhead
  52. kcp.rx_rto = 100
  53. kcp.interval = effectiveConfig.Tti
  54. kcp.output = NewSegmentWriter(output)
  55. kcp.snd_queue = NewSendingQueue(effectiveConfig.GetSendingQueueSize())
  56. kcp.snd_buf = NewSendingWindow(kcp, effectiveConfig.GetSendingWindowSize())
  57. kcp.cwnd = kcp.snd_wnd
  58. kcp.receivingWorker = NewReceivingWorker(kcp)
  59. kcp.fastresend = 2
  60. kcp.congestionControl = effectiveConfig.Congestion
  61. return kcp
  62. }
  63. func (kcp *KCP) SetState(state State) {
  64. kcp.state = state
  65. kcp.stateBeginTime = kcp.current
  66. switch state {
  67. case StateReadyToClose:
  68. kcp.receivingWorker.CloseRead()
  69. case StatePeerClosed:
  70. kcp.ClearSendQueue()
  71. case StateTerminating:
  72. kcp.receivingWorker.CloseRead()
  73. case StateTerminated:
  74. kcp.receivingWorker.CloseRead()
  75. }
  76. }
  77. func (kcp *KCP) HandleOption(opt SegmentOption) {
  78. if (opt & SegmentOptionClose) == SegmentOptionClose {
  79. kcp.OnPeerClosed()
  80. }
  81. }
  82. func (kcp *KCP) OnPeerClosed() {
  83. if kcp.state == StateReadyToClose {
  84. kcp.SetState(StateTerminating)
  85. }
  86. if kcp.state == StateActive {
  87. kcp.SetState(StatePeerClosed)
  88. }
  89. }
  90. func (kcp *KCP) OnClose() {
  91. if kcp.state == StateActive {
  92. kcp.SetState(StateReadyToClose)
  93. }
  94. if kcp.state == StatePeerClosed {
  95. kcp.SetState(StateTerminating)
  96. }
  97. }
  98. // Send is user/upper level send, returns below zero for error
  99. func (kcp *KCP) Send(buffer []byte) int {
  100. nBytes := 0
  101. for len(buffer) > 0 && !kcp.snd_queue.IsFull() {
  102. var size int
  103. if len(buffer) > int(kcp.mss) {
  104. size = int(kcp.mss)
  105. } else {
  106. size = len(buffer)
  107. }
  108. seg := &DataSegment{
  109. Data: alloc.NewSmallBuffer().Clear().Append(buffer[:size]),
  110. }
  111. kcp.snd_queue.Push(seg)
  112. buffer = buffer[size:]
  113. nBytes += size
  114. }
  115. return nBytes
  116. }
  117. // https://tools.ietf.org/html/rfc6298
  118. func (kcp *KCP) update_ack(rtt int32) {
  119. if kcp.rx_srtt == 0 {
  120. kcp.rx_srtt = uint32(rtt)
  121. kcp.rx_rttvar = uint32(rtt) / 2
  122. } else {
  123. delta := rtt - int32(kcp.rx_srtt)
  124. if delta < 0 {
  125. delta = -delta
  126. }
  127. kcp.rx_rttvar = (3*kcp.rx_rttvar + uint32(delta)) / 4
  128. kcp.rx_srtt = (7*kcp.rx_srtt + uint32(rtt)) / 8
  129. if kcp.rx_srtt < kcp.interval {
  130. kcp.rx_srtt = kcp.interval
  131. }
  132. }
  133. var rto uint32
  134. if kcp.interval < 4*kcp.rx_rttvar {
  135. rto = kcp.rx_srtt + 4*kcp.rx_rttvar
  136. } else {
  137. rto = kcp.rx_srtt + kcp.interval
  138. }
  139. if rto > 10000 {
  140. rto = 10000
  141. }
  142. kcp.rx_rto = rto * 3 / 2
  143. }
  144. func (kcp *KCP) shrink_buf() {
  145. prevUna := kcp.snd_una
  146. if kcp.snd_buf.Len() > 0 {
  147. seg := kcp.snd_buf.First()
  148. kcp.snd_una = seg.Number
  149. } else {
  150. kcp.snd_una = kcp.snd_nxt
  151. }
  152. if kcp.snd_una != prevUna {
  153. kcp.sendingUpdated = true
  154. }
  155. }
  156. func (kcp *KCP) parse_ack(sn uint32) {
  157. if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 {
  158. return
  159. }
  160. kcp.snd_buf.Remove(sn - kcp.snd_una)
  161. }
  162. func (kcp *KCP) parse_fastack(sn uint32) {
  163. if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 {
  164. return
  165. }
  166. kcp.snd_buf.HandleFastAck(sn)
  167. }
  168. func (kcp *KCP) HandleReceivingNext(receivingNext uint32) {
  169. kcp.snd_buf.Clear(receivingNext)
  170. }
  171. // Input when you received a low level packet (eg. UDP packet), call it
  172. func (kcp *KCP) Input(data []byte) int {
  173. kcp.lastIncomingTime = kcp.current
  174. var seg ISegment
  175. var maxack uint32
  176. var flag int
  177. for {
  178. seg, data = ReadSegment(data)
  179. if seg == nil {
  180. break
  181. }
  182. switch seg := seg.(type) {
  183. case *DataSegment:
  184. kcp.HandleOption(seg.Opt)
  185. kcp.receivingWorker.ProcessSegment(seg)
  186. kcp.lastPayloadTime = kcp.current
  187. case *AckSegment:
  188. kcp.HandleOption(seg.Opt)
  189. if kcp.rmt_wnd < seg.ReceivingWindow {
  190. kcp.rmt_wnd = seg.ReceivingWindow
  191. }
  192. kcp.HandleReceivingNext(seg.ReceivingNext)
  193. for i := 0; i < int(seg.Count); i++ {
  194. ts := seg.TimestampList[i]
  195. sn := seg.NumberList[i]
  196. if _itimediff(kcp.current, ts) >= 0 {
  197. kcp.update_ack(_itimediff(kcp.current, ts))
  198. }
  199. kcp.parse_ack(sn)
  200. kcp.shrink_buf()
  201. if flag == 0 {
  202. flag = 1
  203. maxack = sn
  204. } else if _itimediff(sn, maxack) > 0 {
  205. maxack = sn
  206. }
  207. }
  208. kcp.lastPayloadTime = kcp.current
  209. case *CmdOnlySegment:
  210. kcp.HandleOption(seg.Opt)
  211. if seg.Cmd == SegmentCommandTerminated {
  212. if kcp.state == StateActive ||
  213. kcp.state == StateReadyToClose ||
  214. kcp.state == StatePeerClosed {
  215. kcp.SetState(StateTerminating)
  216. } else if kcp.state == StateTerminating {
  217. kcp.SetState(StateTerminated)
  218. }
  219. }
  220. kcp.HandleReceivingNext(seg.ReceivinNext)
  221. kcp.receivingWorker.ProcessSendingNext(seg.SendingNext)
  222. kcp.shrink_buf()
  223. default:
  224. }
  225. }
  226. if flag != 0 {
  227. kcp.parse_fastack(maxack)
  228. }
  229. return 0
  230. }
  231. // flush pending data
  232. func (kcp *KCP) flush() {
  233. if kcp.state == StateTerminated {
  234. return
  235. }
  236. if kcp.state == StateActive && _itimediff(kcp.current, kcp.lastPayloadTime) >= 30000 {
  237. kcp.OnClose()
  238. }
  239. if kcp.state == StateTerminating {
  240. kcp.output.Write(&CmdOnlySegment{
  241. Conv: kcp.conv,
  242. Cmd: SegmentCommandTerminated,
  243. })
  244. kcp.output.Flush()
  245. if _itimediff(kcp.current, kcp.stateBeginTime) > 8000 {
  246. kcp.SetState(StateTerminated)
  247. }
  248. return
  249. }
  250. if kcp.state == StateReadyToClose && _itimediff(kcp.current, kcp.stateBeginTime) > 15000 {
  251. kcp.SetState(StateTerminating)
  252. }
  253. current := kcp.current
  254. // flush acknowledges
  255. kcp.receivingWorker.Flush()
  256. // calculate window size
  257. cwnd := kcp.snd_una + kcp.snd_wnd
  258. if cwnd < kcp.rmt_wnd {
  259. cwnd = kcp.rmt_wnd
  260. }
  261. if kcp.congestionControl && cwnd < kcp.snd_una+kcp.cwnd {
  262. cwnd = kcp.snd_una + kcp.cwnd
  263. }
  264. for !kcp.snd_queue.IsEmpty() && _itimediff(kcp.snd_nxt, cwnd) < 0 {
  265. seg := kcp.snd_queue.Pop()
  266. seg.Conv = kcp.conv
  267. seg.Number = kcp.snd_nxt
  268. seg.timeout = current
  269. seg.ackSkipped = 0
  270. seg.transmit = 0
  271. kcp.snd_buf.Push(seg)
  272. kcp.snd_nxt++
  273. }
  274. // flush data segments
  275. if kcp.snd_buf.Flush() {
  276. kcp.sendingUpdated = false
  277. }
  278. if kcp.sendingUpdated || kcp.receivingWorker.PingNecessary() || _itimediff(kcp.current, kcp.lastPingTime) >= 5000 {
  279. seg := &CmdOnlySegment{
  280. Conv: kcp.conv,
  281. Cmd: SegmentCommandPing,
  282. ReceivinNext: kcp.receivingWorker.nextNumber,
  283. SendingNext: kcp.snd_una,
  284. }
  285. if kcp.state == StateReadyToClose {
  286. seg.Opt = SegmentOptionClose
  287. }
  288. kcp.output.Write(seg)
  289. kcp.lastPingTime = kcp.current
  290. kcp.sendingUpdated = false
  291. }
  292. // flash remain segments
  293. kcp.output.Flush()
  294. }
  295. func (kcp *KCP) HandleLost(lost bool) {
  296. if !kcp.congestionControl {
  297. return
  298. }
  299. if lost {
  300. kcp.cwnd = 3 * kcp.cwnd / 4
  301. } else {
  302. kcp.cwnd += kcp.cwnd / 4
  303. }
  304. if kcp.cwnd < 4 {
  305. kcp.cwnd = 4
  306. }
  307. if kcp.cwnd > kcp.snd_wnd {
  308. kcp.cwnd = kcp.snd_wnd
  309. }
  310. }
  311. // Update updates state (call it repeatedly, every 10ms-100ms), or you can ask
  312. // ikcp_check when to call it again (without ikcp_input/_send calling).
  313. // 'current' - current timestamp in millisec.
  314. func (kcp *KCP) Update(current uint32) {
  315. kcp.current = current
  316. kcp.flush()
  317. }
  318. // WaitSnd gets how many packet is waiting to be sent
  319. func (kcp *KCP) WaitSnd() uint32 {
  320. return uint32(kcp.snd_buf.Len()) + kcp.snd_queue.Len()
  321. }
  322. func (this *KCP) ClearSendQueue() {
  323. this.snd_queue.Clear()
  324. this.snd_buf.Clear(0xFFFFFFFF)
  325. }