connection.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452
  1. package kcp
  2. import (
  3. "errors"
  4. "io"
  5. "net"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. "github.com/v2ray/v2ray-core/common/alloc"
  10. "github.com/v2ray/v2ray-core/common/log"
  11. )
  12. var (
  13. errTimeout = errors.New("i/o timeout")
  14. errBrokenPipe = errors.New("broken pipe")
  15. errClosedListener = errors.New("Listener closed.")
  16. errClosedConnection = errors.New("Connection closed.")
  17. )
  18. type State int32
  19. const (
  20. StateActive State = 0
  21. StateReadyToClose State = 1
  22. StatePeerClosed State = 2
  23. StateTerminating State = 3
  24. StateTerminated State = 4
  25. )
  26. const (
  27. headerSize uint32 = 2
  28. )
  29. func nowMillisec() int64 {
  30. now := time.Now()
  31. return now.Unix()*1000 + int64(now.Nanosecond()/1000000)
  32. }
  33. type RountTripInfo struct {
  34. sync.RWMutex
  35. variation uint32
  36. srtt uint32
  37. rto uint32
  38. minRtt uint32
  39. }
  40. func (this *RountTripInfo) Update(rtt uint32) {
  41. if rtt > 0x7FFFFFFF {
  42. return
  43. }
  44. this.Lock()
  45. defer this.Unlock()
  46. // https://tools.ietf.org/html/rfc6298
  47. if this.srtt == 0 {
  48. this.srtt = rtt
  49. this.variation = rtt / 2
  50. } else {
  51. delta := rtt - this.srtt
  52. if this.srtt > rtt {
  53. delta = this.srtt - rtt
  54. }
  55. this.variation = (3*this.variation + delta) / 4
  56. this.srtt = (7*this.srtt + rtt) / 8
  57. if this.srtt < this.minRtt {
  58. this.srtt = this.minRtt
  59. }
  60. }
  61. var rto uint32
  62. if this.minRtt < 4*this.variation {
  63. rto = this.srtt + 4*this.variation
  64. } else {
  65. rto = this.srtt + this.variation
  66. }
  67. if rto > 10000 {
  68. rto = 10000
  69. }
  70. this.rto = rto * 3 / 2
  71. }
  72. func (this *RountTripInfo) Timeout() uint32 {
  73. this.RLock()
  74. defer this.RUnlock()
  75. return this.rto
  76. }
  77. func (this *RountTripInfo) SmoothedTime() uint32 {
  78. this.RLock()
  79. defer this.RUnlock()
  80. return this.srtt
  81. }
  82. // Connection is a KCP connection over UDP.
  83. type Connection struct {
  84. block Authenticator
  85. local, remote net.Addr
  86. wd time.Time // write deadline
  87. writer io.WriteCloser
  88. since int64
  89. conv uint16
  90. state State
  91. stateBeginTime uint32
  92. lastIncomingTime uint32
  93. lastPayloadTime uint32
  94. sendingUpdated bool
  95. lastPingTime uint32
  96. mss uint32
  97. roundTrip *RountTripInfo
  98. interval uint32
  99. receivingWorker *ReceivingWorker
  100. sendingWorker *SendingWorker
  101. fastresend uint32
  102. congestionControl bool
  103. output *BufferedSegmentWriter
  104. }
  105. // NewConnection create a new KCP connection between local and remote.
  106. func NewConnection(conv uint16, writerCloser io.WriteCloser, local *net.UDPAddr, remote *net.UDPAddr, block Authenticator) *Connection {
  107. log.Debug("KCP|Connection: creating connection ", conv)
  108. conn := new(Connection)
  109. conn.local = local
  110. conn.remote = remote
  111. conn.block = block
  112. conn.writer = writerCloser
  113. conn.since = nowMillisec()
  114. authWriter := &AuthenticationWriter{
  115. Authenticator: block,
  116. Writer: writerCloser,
  117. }
  118. conn.conv = conv
  119. conn.output = NewSegmentWriter(authWriter)
  120. conn.mss = authWriter.Mtu() - DataSegmentOverhead
  121. conn.roundTrip = &RountTripInfo{
  122. rto: 100,
  123. minRtt: effectiveConfig.Tti,
  124. }
  125. conn.interval = effectiveConfig.Tti
  126. conn.receivingWorker = NewReceivingWorker(conn)
  127. conn.fastresend = 2
  128. conn.congestionControl = effectiveConfig.Congestion
  129. conn.sendingWorker = NewSendingWorker(conn)
  130. go conn.updateTask()
  131. return conn
  132. }
  133. func (this *Connection) Elapsed() uint32 {
  134. return uint32(nowMillisec() - this.since)
  135. }
  136. // Read implements the Conn Read method.
  137. func (this *Connection) Read(b []byte) (int, error) {
  138. if this == nil {
  139. return 0, io.EOF
  140. }
  141. if this.State() == StateTerminating || this.State() == StateTerminated {
  142. return 0, io.EOF
  143. }
  144. return this.receivingWorker.Read(b)
  145. }
  146. // Write implements the Conn Write method.
  147. func (this *Connection) Write(b []byte) (int, error) {
  148. if this == nil || this.State() != StateActive {
  149. return 0, io.ErrClosedPipe
  150. }
  151. totalWritten := 0
  152. for {
  153. if this == nil || this.State() != StateActive {
  154. return totalWritten, io.ErrClosedPipe
  155. }
  156. nBytes := this.sendingWorker.Push(b[totalWritten:])
  157. if nBytes > 0 {
  158. totalWritten += nBytes
  159. if totalWritten == len(b) {
  160. return totalWritten, nil
  161. }
  162. }
  163. if !this.wd.IsZero() && this.wd.Before(time.Now()) {
  164. return totalWritten, errTimeout
  165. }
  166. // Sending windows is 1024 for the moment. This amount is not gonna sent in 1 sec.
  167. time.Sleep(time.Second)
  168. }
  169. }
  170. func (this *Connection) SetState(state State) {
  171. atomic.StoreInt32((*int32)(&this.state), int32(state))
  172. atomic.StoreUint32(&this.stateBeginTime, this.Elapsed())
  173. switch state {
  174. case StateReadyToClose:
  175. this.receivingWorker.CloseRead()
  176. case StatePeerClosed:
  177. this.sendingWorker.CloseWrite()
  178. case StateTerminating:
  179. this.receivingWorker.CloseRead()
  180. this.sendingWorker.CloseWrite()
  181. case StateTerminated:
  182. this.receivingWorker.CloseRead()
  183. this.sendingWorker.CloseWrite()
  184. }
  185. }
  186. // Close closes the connection.
  187. func (this *Connection) Close() error {
  188. if this == nil {
  189. return errClosedConnection
  190. }
  191. state := this.State()
  192. if state == StateReadyToClose ||
  193. state == StateTerminating ||
  194. state == StateTerminated {
  195. return errClosedConnection
  196. }
  197. log.Debug("KCP|Connection: Closing connection to ", this.remote)
  198. if state == StateActive {
  199. this.SetState(StateReadyToClose)
  200. }
  201. if state == StatePeerClosed {
  202. this.SetState(StateTerminating)
  203. }
  204. return nil
  205. }
  206. // LocalAddr returns the local network address. The Addr returned is shared by all invocations of LocalAddr, so do not modify it.
  207. func (this *Connection) LocalAddr() net.Addr {
  208. if this == nil {
  209. return nil
  210. }
  211. return this.local
  212. }
  213. // RemoteAddr returns the remote network address. The Addr returned is shared by all invocations of RemoteAddr, so do not modify it.
  214. func (this *Connection) RemoteAddr() net.Addr {
  215. if this == nil {
  216. return nil
  217. }
  218. return this.remote
  219. }
  220. // SetDeadline sets the deadline associated with the listener. A zero time value disables the deadline.
  221. func (this *Connection) SetDeadline(t time.Time) error {
  222. if err := this.SetReadDeadline(t); err != nil {
  223. return err
  224. }
  225. if err := this.SetWriteDeadline(t); err != nil {
  226. return err
  227. }
  228. return nil
  229. }
  230. // SetReadDeadline implements the Conn SetReadDeadline method.
  231. func (this *Connection) SetReadDeadline(t time.Time) error {
  232. if this == nil || this.State() != StateActive {
  233. return errClosedConnection
  234. }
  235. this.receivingWorker.SetReadDeadline(t)
  236. return nil
  237. }
  238. // SetWriteDeadline implements the Conn SetWriteDeadline method.
  239. func (this *Connection) SetWriteDeadline(t time.Time) error {
  240. if this == nil || this.State() != StateActive {
  241. return errClosedConnection
  242. }
  243. this.wd = t
  244. return nil
  245. }
  246. // kcp update, input loop
  247. func (this *Connection) updateTask() {
  248. for this.State() != StateTerminated {
  249. this.flush()
  250. interval := time.Duration(effectiveConfig.Tti) * time.Millisecond
  251. if this.State() == StateTerminating {
  252. interval = time.Second
  253. }
  254. time.Sleep(interval)
  255. }
  256. this.Terminate()
  257. }
  258. func (this *Connection) FetchInputFrom(conn net.Conn) {
  259. go func() {
  260. for {
  261. payload := alloc.NewBuffer()
  262. nBytes, err := conn.Read(payload.Value)
  263. if err != nil {
  264. payload.Release()
  265. return
  266. }
  267. payload.Slice(0, nBytes)
  268. if this.block.Open(payload) {
  269. this.Input(payload.Value)
  270. } else {
  271. log.Info("KCP|Connection: Invalid response from ", conn.RemoteAddr())
  272. }
  273. payload.Release()
  274. }
  275. }()
  276. }
  277. func (this *Connection) Reusable() bool {
  278. return false
  279. }
  280. func (this *Connection) SetReusable(b bool) {}
  281. func (this *Connection) Terminate() {
  282. if this == nil || this.writer == nil {
  283. return
  284. }
  285. log.Info("Terminating connection to ", this.RemoteAddr())
  286. this.writer.Close()
  287. }
  288. func (this *Connection) HandleOption(opt SegmentOption) {
  289. if (opt & SegmentOptionClose) == SegmentOptionClose {
  290. this.OnPeerClosed()
  291. }
  292. }
  293. func (this *Connection) OnPeerClosed() {
  294. state := this.State()
  295. if state == StateReadyToClose {
  296. this.SetState(StateTerminating)
  297. }
  298. if state == StateActive {
  299. this.SetState(StatePeerClosed)
  300. }
  301. }
  302. // Input when you received a low level packet (eg. UDP packet), call it
  303. func (this *Connection) Input(data []byte) int {
  304. current := this.Elapsed()
  305. atomic.StoreUint32(&this.lastIncomingTime, current)
  306. var seg Segment
  307. for {
  308. seg, data = ReadSegment(data)
  309. if seg == nil {
  310. break
  311. }
  312. switch seg := seg.(type) {
  313. case *DataSegment:
  314. this.HandleOption(seg.Opt)
  315. this.receivingWorker.ProcessSegment(seg)
  316. atomic.StoreUint32(&this.lastPayloadTime, current)
  317. case *AckSegment:
  318. this.HandleOption(seg.Opt)
  319. this.sendingWorker.ProcessSegment(current, seg)
  320. atomic.StoreUint32(&this.lastPayloadTime, current)
  321. case *CmdOnlySegment:
  322. this.HandleOption(seg.Opt)
  323. if seg.Cmd == SegmentCommandTerminated {
  324. state := this.State()
  325. if state == StateActive ||
  326. state == StateReadyToClose ||
  327. state == StatePeerClosed {
  328. this.SetState(StateTerminating)
  329. } else if state == StateTerminating {
  330. this.SetState(StateTerminated)
  331. }
  332. }
  333. this.sendingWorker.ProcessReceivingNext(seg.ReceivinNext)
  334. this.receivingWorker.ProcessSendingNext(seg.SendingNext)
  335. default:
  336. }
  337. }
  338. return 0
  339. }
  340. func (this *Connection) flush() {
  341. current := this.Elapsed()
  342. if this.State() == StateTerminated {
  343. return
  344. }
  345. if this.State() == StateActive && current-this.lastPayloadTime >= 30000 {
  346. this.Close()
  347. }
  348. if this.State() == StateTerminating {
  349. this.output.Write(&CmdOnlySegment{
  350. Conv: this.conv,
  351. Cmd: SegmentCommandTerminated,
  352. })
  353. this.output.Flush()
  354. if current-this.stateBeginTime > 8000 {
  355. this.SetState(StateTerminated)
  356. }
  357. return
  358. }
  359. if this.State() == StateReadyToClose && current-this.stateBeginTime > 15000 {
  360. this.SetState(StateTerminating)
  361. }
  362. // flush acknowledges
  363. this.receivingWorker.Flush(current)
  364. this.sendingWorker.Flush(current)
  365. if this.sendingWorker.PingNecessary() || this.receivingWorker.PingNecessary() || current-this.lastPingTime >= 5000 {
  366. seg := NewCmdOnlySegment()
  367. seg.Conv = this.conv
  368. seg.Cmd = SegmentCommandPing
  369. seg.ReceivinNext = this.receivingWorker.nextNumber
  370. seg.SendingNext = this.sendingWorker.firstUnacknowledged
  371. if this.State() == StateReadyToClose {
  372. seg.Opt = SegmentOptionClose
  373. }
  374. this.output.Write(seg)
  375. this.lastPingTime = current
  376. this.sendingUpdated = false
  377. seg.Release()
  378. }
  379. // flash remain segments
  380. this.output.Flush()
  381. }
  382. func (this *Connection) State() State {
  383. return State(atomic.LoadInt32((*int32)(&this.state)))
  384. }