kcp.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. // Package kcp - A Fast and Reliable ARQ Protocol
  2. //
  3. // Acknowledgement:
  4. // skywind3000@github for inventing the KCP protocol
  5. // xtaci@github for translating to Golang
  6. package kcp
  7. import (
  8. "github.com/v2ray/v2ray-core/common/log"
  9. )
  10. type State int
  11. const (
  12. StateActive State = 0
  13. StateReadyToClose State = 1
  14. StatePeerClosed State = 2
  15. StateTerminating State = 3
  16. StateTerminated State = 4
  17. )
  18. // KCP defines a single KCP connection
  19. type KCP struct {
  20. conv uint16
  21. state State
  22. stateBeginTime uint32
  23. lastIncomingTime uint32
  24. lastPayloadTime uint32
  25. sendingUpdated bool
  26. lastPingTime uint32
  27. mss uint32
  28. rx_rttvar, rx_srtt, rx_rto uint32
  29. current, interval uint32
  30. receivingWorker *ReceivingWorker
  31. sendingWorker *SendingWorker
  32. fastresend uint32
  33. congestionControl bool
  34. output *BufferedSegmentWriter
  35. }
  36. // NewKCP create a new kcp control object, 'conv' must equal in two endpoint
  37. // from the same connection.
  38. func NewKCP(conv uint16, output *AuthenticationWriter) *KCP {
  39. log.Debug("KCP|Core: creating KCP ", conv)
  40. kcp := new(KCP)
  41. kcp.conv = conv
  42. kcp.mss = output.Mtu() - DataSegmentOverhead
  43. kcp.rx_rto = 100
  44. kcp.interval = effectiveConfig.Tti
  45. kcp.output = NewSegmentWriter(output)
  46. kcp.receivingWorker = NewReceivingWorker(kcp)
  47. kcp.fastresend = 2
  48. kcp.congestionControl = effectiveConfig.Congestion
  49. kcp.sendingWorker = NewSendingWorker(kcp)
  50. return kcp
  51. }
  52. func (kcp *KCP) SetState(state State) {
  53. kcp.state = state
  54. kcp.stateBeginTime = kcp.current
  55. switch state {
  56. case StateReadyToClose:
  57. kcp.receivingWorker.CloseRead()
  58. case StatePeerClosed:
  59. kcp.sendingWorker.CloseWrite()
  60. case StateTerminating:
  61. kcp.receivingWorker.CloseRead()
  62. kcp.sendingWorker.CloseWrite()
  63. case StateTerminated:
  64. kcp.receivingWorker.CloseRead()
  65. kcp.sendingWorker.CloseWrite()
  66. }
  67. }
  68. func (kcp *KCP) HandleOption(opt SegmentOption) {
  69. if (opt & SegmentOptionClose) == SegmentOptionClose {
  70. kcp.OnPeerClosed()
  71. }
  72. }
  73. func (kcp *KCP) OnPeerClosed() {
  74. if kcp.state == StateReadyToClose {
  75. kcp.SetState(StateTerminating)
  76. }
  77. if kcp.state == StateActive {
  78. kcp.SetState(StatePeerClosed)
  79. }
  80. }
  81. func (kcp *KCP) OnClose() {
  82. if kcp.state == StateActive {
  83. kcp.SetState(StateReadyToClose)
  84. }
  85. if kcp.state == StatePeerClosed {
  86. kcp.SetState(StateTerminating)
  87. }
  88. }
  89. // https://tools.ietf.org/html/rfc6298
  90. func (kcp *KCP) update_ack(rtt int32) {
  91. if kcp.rx_srtt == 0 {
  92. kcp.rx_srtt = uint32(rtt)
  93. kcp.rx_rttvar = uint32(rtt) / 2
  94. } else {
  95. delta := rtt - int32(kcp.rx_srtt)
  96. if delta < 0 {
  97. delta = -delta
  98. }
  99. kcp.rx_rttvar = (3*kcp.rx_rttvar + uint32(delta)) / 4
  100. kcp.rx_srtt = (7*kcp.rx_srtt + uint32(rtt)) / 8
  101. if kcp.rx_srtt < kcp.interval {
  102. kcp.rx_srtt = kcp.interval
  103. }
  104. }
  105. var rto uint32
  106. if kcp.interval < 4*kcp.rx_rttvar {
  107. rto = kcp.rx_srtt + 4*kcp.rx_rttvar
  108. } else {
  109. rto = kcp.rx_srtt + kcp.interval
  110. }
  111. if rto > 10000 {
  112. rto = 10000
  113. }
  114. kcp.rx_rto = rto * 3 / 2
  115. }
  116. // Input when you received a low level packet (eg. UDP packet), call it
  117. func (kcp *KCP) Input(data []byte) int {
  118. kcp.lastIncomingTime = kcp.current
  119. var seg Segment
  120. for {
  121. seg, data = ReadSegment(data)
  122. if seg == nil {
  123. break
  124. }
  125. switch seg := seg.(type) {
  126. case *DataSegment:
  127. kcp.HandleOption(seg.Opt)
  128. kcp.receivingWorker.ProcessSegment(seg)
  129. kcp.lastPayloadTime = kcp.current
  130. case *AckSegment:
  131. kcp.HandleOption(seg.Opt)
  132. kcp.sendingWorker.ProcessSegment(seg)
  133. kcp.lastPayloadTime = kcp.current
  134. case *CmdOnlySegment:
  135. kcp.HandleOption(seg.Opt)
  136. if seg.Cmd == SegmentCommandTerminated {
  137. if kcp.state == StateActive ||
  138. kcp.state == StateReadyToClose ||
  139. kcp.state == StatePeerClosed {
  140. kcp.SetState(StateTerminating)
  141. } else if kcp.state == StateTerminating {
  142. kcp.SetState(StateTerminated)
  143. }
  144. }
  145. kcp.sendingWorker.ProcessReceivingNext(seg.ReceivinNext)
  146. kcp.receivingWorker.ProcessSendingNext(seg.SendingNext)
  147. default:
  148. }
  149. }
  150. return 0
  151. }
  152. // flush pending data
  153. func (kcp *KCP) flush() {
  154. if kcp.state == StateTerminated {
  155. return
  156. }
  157. if kcp.state == StateActive && kcp.current-kcp.lastPayloadTime >= 30000 {
  158. kcp.OnClose()
  159. }
  160. if kcp.state == StateTerminating {
  161. kcp.output.Write(&CmdOnlySegment{
  162. Conv: kcp.conv,
  163. Cmd: SegmentCommandTerminated,
  164. })
  165. kcp.output.Flush()
  166. if kcp.current-kcp.stateBeginTime > 8000 {
  167. kcp.SetState(StateTerminated)
  168. }
  169. return
  170. }
  171. if kcp.state == StateReadyToClose && kcp.current-kcp.stateBeginTime > 15000 {
  172. kcp.SetState(StateTerminating)
  173. }
  174. // flush acknowledges
  175. kcp.receivingWorker.Flush()
  176. kcp.sendingWorker.Flush()
  177. if kcp.sendingWorker.PingNecessary() || kcp.receivingWorker.PingNecessary() || kcp.current-kcp.lastPingTime >= 5000 {
  178. seg := NewCmdOnlySegment()
  179. seg.Conv = kcp.conv
  180. seg.Cmd = SegmentCommandPing
  181. seg.ReceivinNext = kcp.receivingWorker.nextNumber
  182. seg.SendingNext = kcp.sendingWorker.firstUnacknowledged
  183. if kcp.state == StateReadyToClose {
  184. seg.Opt = SegmentOptionClose
  185. }
  186. kcp.output.Write(seg)
  187. kcp.lastPingTime = kcp.current
  188. kcp.sendingUpdated = false
  189. seg.Release()
  190. }
  191. // flash remain segments
  192. kcp.output.Flush()
  193. }
  194. // Update updates state (call it repeatedly, every 10ms-100ms), or you can ask
  195. // ikcp_check when to call it again (without ikcp_input/_send calling).
  196. // 'current' - current timestamp in millisec.
  197. func (kcp *KCP) Update(current uint32) {
  198. kcp.current = current
  199. kcp.flush()
  200. }