kcp.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. // Package kcp - A Fast and Reliable ARQ Protocol
  2. //
  3. // Acknowledgement:
  4. // skywind3000@github for inventing the KCP protocol
  5. // xtaci@github for translating to Golang
  6. package kcp
  7. import (
  8. "github.com/v2ray/v2ray-core/common/log"
  9. )
  10. func _itimediff(later, earlier uint32) int32 {
  11. return (int32)(later - earlier)
  12. }
  13. type State int
  14. const (
  15. StateActive State = 0
  16. StateReadyToClose State = 1
  17. StatePeerClosed State = 2
  18. StateTerminating State = 3
  19. StateTerminated State = 4
  20. )
  21. // KCP defines a single KCP connection
  22. type KCP struct {
  23. conv uint16
  24. state State
  25. stateBeginTime uint32
  26. lastIncomingTime uint32
  27. lastPayloadTime uint32
  28. sendingUpdated bool
  29. lastPingTime uint32
  30. mss uint32
  31. rx_rttvar, rx_srtt, rx_rto uint32
  32. current, interval uint32
  33. receivingWorker *ReceivingWorker
  34. sendingWorker *SendingWorker
  35. fastresend uint32
  36. congestionControl bool
  37. output *BufferedSegmentWriter
  38. }
  39. // NewKCP create a new kcp control object, 'conv' must equal in two endpoint
  40. // from the same connection.
  41. func NewKCP(conv uint16, output *AuthenticationWriter) *KCP {
  42. log.Debug("KCP|Core: creating KCP ", conv)
  43. kcp := new(KCP)
  44. kcp.conv = conv
  45. kcp.mss = output.Mtu() - DataSegmentOverhead
  46. kcp.rx_rto = 100
  47. kcp.interval = effectiveConfig.Tti
  48. kcp.output = NewSegmentWriter(output)
  49. kcp.receivingWorker = NewReceivingWorker(kcp)
  50. kcp.fastresend = 2
  51. kcp.congestionControl = effectiveConfig.Congestion
  52. kcp.sendingWorker = NewSendingWorker(kcp)
  53. return kcp
  54. }
  55. func (kcp *KCP) SetState(state State) {
  56. kcp.state = state
  57. kcp.stateBeginTime = kcp.current
  58. switch state {
  59. case StateReadyToClose:
  60. kcp.receivingWorker.CloseRead()
  61. case StatePeerClosed:
  62. kcp.sendingWorker.CloseWrite()
  63. case StateTerminating:
  64. kcp.receivingWorker.CloseRead()
  65. kcp.sendingWorker.CloseWrite()
  66. case StateTerminated:
  67. kcp.receivingWorker.CloseRead()
  68. kcp.sendingWorker.CloseWrite()
  69. }
  70. }
  71. func (kcp *KCP) HandleOption(opt SegmentOption) {
  72. if (opt & SegmentOptionClose) == SegmentOptionClose {
  73. kcp.OnPeerClosed()
  74. }
  75. }
  76. func (kcp *KCP) OnPeerClosed() {
  77. if kcp.state == StateReadyToClose {
  78. kcp.SetState(StateTerminating)
  79. }
  80. if kcp.state == StateActive {
  81. kcp.SetState(StatePeerClosed)
  82. }
  83. }
  84. func (kcp *KCP) OnClose() {
  85. if kcp.state == StateActive {
  86. kcp.SetState(StateReadyToClose)
  87. }
  88. if kcp.state == StatePeerClosed {
  89. kcp.SetState(StateTerminating)
  90. }
  91. }
  92. // https://tools.ietf.org/html/rfc6298
  93. func (kcp *KCP) update_ack(rtt int32) {
  94. if kcp.rx_srtt == 0 {
  95. kcp.rx_srtt = uint32(rtt)
  96. kcp.rx_rttvar = uint32(rtt) / 2
  97. } else {
  98. delta := rtt - int32(kcp.rx_srtt)
  99. if delta < 0 {
  100. delta = -delta
  101. }
  102. kcp.rx_rttvar = (3*kcp.rx_rttvar + uint32(delta)) / 4
  103. kcp.rx_srtt = (7*kcp.rx_srtt + uint32(rtt)) / 8
  104. if kcp.rx_srtt < kcp.interval {
  105. kcp.rx_srtt = kcp.interval
  106. }
  107. }
  108. var rto uint32
  109. if kcp.interval < 4*kcp.rx_rttvar {
  110. rto = kcp.rx_srtt + 4*kcp.rx_rttvar
  111. } else {
  112. rto = kcp.rx_srtt + kcp.interval
  113. }
  114. if rto > 10000 {
  115. rto = 10000
  116. }
  117. kcp.rx_rto = rto * 3 / 2
  118. }
  119. // Input when you received a low level packet (eg. UDP packet), call it
  120. func (kcp *KCP) Input(data []byte) int {
  121. kcp.lastIncomingTime = kcp.current
  122. var seg Segment
  123. for {
  124. seg, data = ReadSegment(data)
  125. if seg == nil {
  126. break
  127. }
  128. switch seg := seg.(type) {
  129. case *DataSegment:
  130. kcp.HandleOption(seg.Opt)
  131. kcp.receivingWorker.ProcessSegment(seg)
  132. kcp.lastPayloadTime = kcp.current
  133. case *AckSegment:
  134. kcp.HandleOption(seg.Opt)
  135. kcp.sendingWorker.ProcessSegment(seg)
  136. kcp.lastPayloadTime = kcp.current
  137. case *CmdOnlySegment:
  138. kcp.HandleOption(seg.Opt)
  139. if seg.Cmd == SegmentCommandTerminated {
  140. if kcp.state == StateActive ||
  141. kcp.state == StateReadyToClose ||
  142. kcp.state == StatePeerClosed {
  143. kcp.SetState(StateTerminating)
  144. } else if kcp.state == StateTerminating {
  145. kcp.SetState(StateTerminated)
  146. }
  147. }
  148. kcp.sendingWorker.ProcessReceivingNext(seg.ReceivinNext)
  149. kcp.receivingWorker.ProcessSendingNext(seg.SendingNext)
  150. default:
  151. }
  152. }
  153. return 0
  154. }
  155. // flush pending data
  156. func (kcp *KCP) flush() {
  157. if kcp.state == StateTerminated {
  158. return
  159. }
  160. if kcp.state == StateActive && _itimediff(kcp.current, kcp.lastPayloadTime) >= 30000 {
  161. kcp.OnClose()
  162. }
  163. if kcp.state == StateTerminating {
  164. kcp.output.Write(&CmdOnlySegment{
  165. Conv: kcp.conv,
  166. Cmd: SegmentCommandTerminated,
  167. })
  168. kcp.output.Flush()
  169. if _itimediff(kcp.current, kcp.stateBeginTime) > 8000 {
  170. kcp.SetState(StateTerminated)
  171. }
  172. return
  173. }
  174. if kcp.state == StateReadyToClose && _itimediff(kcp.current, kcp.stateBeginTime) > 15000 {
  175. kcp.SetState(StateTerminating)
  176. }
  177. // flush acknowledges
  178. kcp.receivingWorker.Flush()
  179. kcp.sendingWorker.Flush()
  180. if kcp.sendingWorker.PingNecessary() || kcp.receivingWorker.PingNecessary() || _itimediff(kcp.current, kcp.lastPingTime) >= 5000 {
  181. seg := NewCmdOnlySegment()
  182. seg.Conv = kcp.conv
  183. seg.Cmd = SegmentCommandPing
  184. seg.ReceivinNext = kcp.receivingWorker.nextNumber
  185. seg.SendingNext = kcp.sendingWorker.firstUnacknowledged
  186. if kcp.state == StateReadyToClose {
  187. seg.Opt = SegmentOptionClose
  188. }
  189. kcp.output.Write(seg)
  190. kcp.lastPingTime = kcp.current
  191. kcp.sendingUpdated = false
  192. seg.Release()
  193. }
  194. // flash remain segments
  195. kcp.output.Flush()
  196. }
  197. // Update updates state (call it repeatedly, every 10ms-100ms), or you can ask
  198. // ikcp_check when to call it again (without ikcp_input/_send calling).
  199. // 'current' - current timestamp in millisec.
  200. func (kcp *KCP) Update(current uint32) {
  201. kcp.current = current
  202. kcp.flush()
  203. }