connection.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539
  1. package kcp
  2. import (
  3. "errors"
  4. "io"
  5. "net"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. "github.com/v2ray/v2ray-core/common/alloc"
  10. "github.com/v2ray/v2ray-core/common/log"
  11. "github.com/v2ray/v2ray-core/transport/internet"
  12. )
// Sentinel errors shared by the KCP connection code.
var (
	// errTimeout is returned by Connection.Read and Connection.Write once the
	// corresponding read or write deadline has passed.
	errTimeout = errors.New("i/o timeout")
	// errBrokenPipe is not referenced in the visible part of this file;
	// presumably used elsewhere in the package.
	errBrokenPipe = errors.New("broken pipe")
	// errClosedListener is not referenced in the visible part of this file;
	// presumably used by the listener implementation.
	errClosedListener = errors.New("Listener closed.")
	// errClosedConnection is returned by Close on a nil or already closing
	// connection, and by the deadline setters when the connection is nil or
	// not active.
	errClosedConnection = errors.New("Connection closed.")
)
  19. type State int32
  20. func (this State) Is(states ...State) bool {
  21. for _, state := range states {
  22. if this == state {
  23. return true
  24. }
  25. }
  26. return false
  27. }
  28. const (
  29. StateActive State = 0
  30. StateReadyToClose State = 1
  31. StatePeerClosed State = 2
  32. StateTerminating State = 3
  33. StatePeerTerminating State = 4
  34. StateTerminated State = 5
  35. )
  36. const (
  37. headerSize uint32 = 2
  38. )
  39. func nowMillisec() int64 {
  40. now := time.Now()
  41. return now.Unix()*1000 + int64(now.Nanosecond()/1000000)
  42. }
  43. type RoundTripInfo struct {
  44. sync.RWMutex
  45. variation uint32
  46. srtt uint32
  47. rto uint32
  48. minRtt uint32
  49. updatedTimestamp uint32
  50. }
  51. func (this *RoundTripInfo) UpdatePeerRTO(rto uint32, current uint32) {
  52. this.Lock()
  53. defer this.Unlock()
  54. if current-this.updatedTimestamp < 3000 {
  55. return
  56. }
  57. this.updatedTimestamp = current
  58. this.rto = rto
  59. }
  60. func (this *RoundTripInfo) Update(rtt uint32, current uint32) {
  61. if rtt > 0x7FFFFFFF {
  62. return
  63. }
  64. this.Lock()
  65. defer this.Unlock()
  66. // https://tools.ietf.org/html/rfc6298
  67. if this.srtt == 0 {
  68. this.srtt = rtt
  69. this.variation = rtt / 2
  70. } else {
  71. delta := rtt - this.srtt
  72. if this.srtt > rtt {
  73. delta = this.srtt - rtt
  74. }
  75. this.variation = (3*this.variation + delta) / 4
  76. this.srtt = (7*this.srtt + rtt) / 8
  77. if this.srtt < this.minRtt {
  78. this.srtt = this.minRtt
  79. }
  80. }
  81. var rto uint32
  82. if this.minRtt < 4*this.variation {
  83. rto = this.srtt + 4*this.variation
  84. } else {
  85. rto = this.srtt + this.variation
  86. }
  87. if rto > 10000 {
  88. rto = 10000
  89. }
  90. this.rto = rto * 3 / 2
  91. this.updatedTimestamp = current
  92. }
  93. func (this *RoundTripInfo) Timeout() uint32 {
  94. this.RLock()
  95. defer this.RUnlock()
  96. return this.rto
  97. }
  98. func (this *RoundTripInfo) SmoothedTime() uint32 {
  99. this.RLock()
  100. defer this.RUnlock()
  101. return this.srtt
  102. }
// Connection is a KCP connection over UDP.
type Connection struct {
	// block authenticates incoming payloads (see FetchInputFrom) and is also
	// handed to the AuthenticationWriter used for output.
	block         internet.Authenticator
	local, remote net.Addr
	rd            time.Time // read deadline; zero means no deadline
	wd            time.Time // write deadline
	// writer is the underlying packet writer; it is closed by Terminate.
	writer io.WriteCloser
	// since is the creation time in milliseconds; Elapsed is measured from it.
	since int64
	// dataInputCond is signalled when a data segment arrives and broadcast on
	// Close/Terminate; Read waits on it.
	dataInputCond *sync.Cond
	// dataOutputCond is signalled when an ack segment arrives and broadcast on
	// Close/Terminate; Write waits on it.
	dataOutputCond *sync.Cond

	conv uint16
	// state is accessed through State/SetState using atomic operations.
	state State
	// stateBeginTime is the Elapsed value at the last SetState call.
	stateBeginTime uint32
	// lastIncomingTime is the Elapsed value at the last Input call.
	lastIncomingTime uint32
	// lastPingTime is the Elapsed value when flush last sent a ping.
	lastPingTime uint32

	// mss is the writer's MTU minus DataSegmentOverhead.
	mss       uint32
	roundTrip *RoundTripInfo
	// interval is initialised from effectiveConfig.Tti.
	interval uint32

	receivingWorker *ReceivingWorker
	sendingWorker   *SendingWorker

	// fastresend is set to 2 by NewConnection.
	fastresend uint32
	// congestionControl is initialised from effectiveConfig.Congestion.
	congestionControl bool
	// output buffers outgoing segments until Flush.
	output *BufferedSegmentWriter
}
  127. // NewConnection create a new KCP connection between local and remote.
  128. func NewConnection(conv uint16, writerCloser io.WriteCloser, local *net.UDPAddr, remote *net.UDPAddr, block internet.Authenticator) *Connection {
  129. log.Info("KCP|Connection: creating connection ", conv)
  130. conn := new(Connection)
  131. conn.local = local
  132. conn.remote = remote
  133. conn.block = block
  134. conn.writer = writerCloser
  135. conn.since = nowMillisec()
  136. conn.dataInputCond = sync.NewCond(new(sync.Mutex))
  137. conn.dataOutputCond = sync.NewCond(new(sync.Mutex))
  138. authWriter := &AuthenticationWriter{
  139. Authenticator: block,
  140. Writer: writerCloser,
  141. }
  142. conn.conv = conv
  143. conn.output = NewSegmentWriter(authWriter)
  144. conn.mss = authWriter.Mtu() - DataSegmentOverhead
  145. conn.roundTrip = &RoundTripInfo{
  146. rto: 100,
  147. minRtt: effectiveConfig.Tti,
  148. }
  149. conn.interval = effectiveConfig.Tti
  150. conn.receivingWorker = NewReceivingWorker(conn)
  151. conn.fastresend = 2
  152. conn.congestionControl = effectiveConfig.Congestion
  153. conn.sendingWorker = NewSendingWorker(conn)
  154. go conn.updateTask()
  155. return conn
  156. }
  157. func (this *Connection) Elapsed() uint32 {
  158. return uint32(nowMillisec() - this.since)
  159. }
  160. // Read implements the Conn Read method.
  161. func (this *Connection) Read(b []byte) (int, error) {
  162. if this == nil {
  163. return 0, io.EOF
  164. }
  165. for {
  166. if this.State().Is(StateReadyToClose, StateTerminating, StateTerminated) {
  167. return 0, io.EOF
  168. }
  169. nBytes := this.receivingWorker.Read(b)
  170. if nBytes > 0 {
  171. return nBytes, nil
  172. }
  173. if this.State() == StatePeerTerminating {
  174. return 0, io.EOF
  175. }
  176. var timer *time.Timer
  177. if !this.rd.IsZero() {
  178. duration := this.rd.Sub(time.Now())
  179. if duration <= 0 {
  180. return 0, errTimeout
  181. }
  182. timer = time.AfterFunc(duration, this.dataInputCond.Signal)
  183. }
  184. this.dataInputCond.L.Lock()
  185. this.dataInputCond.Wait()
  186. this.dataInputCond.L.Unlock()
  187. if timer != nil {
  188. timer.Stop()
  189. }
  190. if !this.rd.IsZero() && this.rd.Before(time.Now()) {
  191. return 0, errTimeout
  192. }
  193. }
  194. }
  195. // Write implements the Conn Write method.
  196. func (this *Connection) Write(b []byte) (int, error) {
  197. totalWritten := 0
  198. for {
  199. if this == nil || this.State() != StateActive {
  200. return totalWritten, io.ErrClosedPipe
  201. }
  202. nBytes := this.sendingWorker.Push(b[totalWritten:])
  203. if nBytes > 0 {
  204. totalWritten += nBytes
  205. if totalWritten == len(b) {
  206. return totalWritten, nil
  207. }
  208. }
  209. var timer *time.Timer
  210. if !this.wd.IsZero() {
  211. duration := this.wd.Sub(time.Now())
  212. if duration <= 0 {
  213. return totalWritten, errTimeout
  214. }
  215. timer = time.AfterFunc(duration, this.dataOutputCond.Signal)
  216. }
  217. this.dataOutputCond.L.Lock()
  218. this.dataOutputCond.Wait()
  219. this.dataOutputCond.L.Unlock()
  220. if timer != nil {
  221. timer.Stop()
  222. }
  223. if !this.wd.IsZero() && this.wd.Before(time.Now()) {
  224. return totalWritten, errTimeout
  225. }
  226. }
  227. }
  228. func (this *Connection) SetState(state State) {
  229. current := this.Elapsed()
  230. atomic.StoreInt32((*int32)(&this.state), int32(state))
  231. atomic.StoreUint32(&this.stateBeginTime, current)
  232. log.Debug("KCP|Connection: #", this.conv, " entering state ", state, " at ", current)
  233. switch state {
  234. case StateReadyToClose:
  235. this.receivingWorker.CloseRead()
  236. case StatePeerClosed:
  237. this.sendingWorker.CloseWrite()
  238. case StateTerminating:
  239. this.receivingWorker.CloseRead()
  240. this.sendingWorker.CloseWrite()
  241. case StatePeerTerminating:
  242. this.sendingWorker.CloseWrite()
  243. case StateTerminated:
  244. this.receivingWorker.CloseRead()
  245. this.sendingWorker.CloseWrite()
  246. }
  247. }
  248. // Close closes the connection.
  249. func (this *Connection) Close() error {
  250. if this == nil {
  251. return errClosedConnection
  252. }
  253. this.dataInputCond.Broadcast()
  254. this.dataOutputCond.Broadcast()
  255. state := this.State()
  256. if state.Is(StateReadyToClose, StateTerminating, StateTerminated) {
  257. return errClosedConnection
  258. }
  259. log.Info("KCP|Connection: Closing connection to ", this.remote)
  260. if state == StateActive {
  261. this.SetState(StateReadyToClose)
  262. }
  263. if state == StatePeerClosed {
  264. this.SetState(StateTerminating)
  265. }
  266. if state == StatePeerTerminating {
  267. this.SetState(StateTerminated)
  268. }
  269. return nil
  270. }
  271. // LocalAddr returns the local network address. The Addr returned is shared by all invocations of LocalAddr, so do not modify it.
  272. func (this *Connection) LocalAddr() net.Addr {
  273. if this == nil {
  274. return nil
  275. }
  276. return this.local
  277. }
  278. // RemoteAddr returns the remote network address. The Addr returned is shared by all invocations of RemoteAddr, so do not modify it.
  279. func (this *Connection) RemoteAddr() net.Addr {
  280. if this == nil {
  281. return nil
  282. }
  283. return this.remote
  284. }
  285. // SetDeadline sets the deadline associated with the listener. A zero time value disables the deadline.
  286. func (this *Connection) SetDeadline(t time.Time) error {
  287. if err := this.SetReadDeadline(t); err != nil {
  288. return err
  289. }
  290. if err := this.SetWriteDeadline(t); err != nil {
  291. return err
  292. }
  293. return nil
  294. }
  295. // SetReadDeadline implements the Conn SetReadDeadline method.
  296. func (this *Connection) SetReadDeadline(t time.Time) error {
  297. if this == nil || this.State() != StateActive {
  298. return errClosedConnection
  299. }
  300. this.rd = t
  301. return nil
  302. }
  303. // SetWriteDeadline implements the Conn SetWriteDeadline method.
  304. func (this *Connection) SetWriteDeadline(t time.Time) error {
  305. if this == nil || this.State() != StateActive {
  306. return errClosedConnection
  307. }
  308. this.wd = t
  309. return nil
  310. }
  311. // kcp update, input loop
  312. func (this *Connection) updateTask() {
  313. for this.State() != StateTerminated {
  314. this.flush()
  315. interval := time.Duration(effectiveConfig.Tti) * time.Millisecond
  316. if this.State() == StateTerminating {
  317. interval = time.Second
  318. }
  319. time.Sleep(interval)
  320. }
  321. this.Terminate()
  322. }
  323. func (this *Connection) FetchInputFrom(conn io.Reader) {
  324. go func() {
  325. payload := alloc.NewLocalBuffer(2048)
  326. defer payload.Release()
  327. for this.State() != StateTerminated {
  328. payload.Reset()
  329. nBytes, err := conn.Read(payload.Value)
  330. if err != nil {
  331. return
  332. }
  333. payload.Slice(0, nBytes)
  334. if this.block.Open(payload) {
  335. this.Input(payload.Value)
  336. }
  337. }
  338. }()
  339. }
// Reusable always reports false.
func (this *Connection) Reusable() bool {
	return false
}
// SetReusable is a no-op; the argument is ignored.
func (this *Connection) SetReusable(b bool) {}
  344. func (this *Connection) Terminate() {
  345. if this == nil || this.writer == nil {
  346. return
  347. }
  348. log.Info("KCP|Connection: Terminating connection to ", this.RemoteAddr())
  349. this.SetState(StateTerminated)
  350. this.dataInputCond.Broadcast()
  351. this.dataOutputCond.Broadcast()
  352. this.writer.Close()
  353. }
  354. func (this *Connection) HandleOption(opt SegmentOption) {
  355. if (opt & SegmentOptionClose) == SegmentOptionClose {
  356. this.OnPeerClosed()
  357. }
  358. }
  359. func (this *Connection) OnPeerClosed() {
  360. state := this.State()
  361. if state == StateReadyToClose {
  362. this.SetState(StateTerminating)
  363. }
  364. if state == StateActive {
  365. this.SetState(StatePeerClosed)
  366. }
  367. }
  368. // Input when you received a low level packet (eg. UDP packet), call it
  369. func (this *Connection) Input(data []byte) int {
  370. current := this.Elapsed()
  371. atomic.StoreUint32(&this.lastIncomingTime, current)
  372. var seg Segment
  373. for {
  374. seg, data = ReadSegment(data)
  375. if seg == nil {
  376. break
  377. }
  378. switch seg := seg.(type) {
  379. case *DataSegment:
  380. this.HandleOption(seg.Option)
  381. this.receivingWorker.ProcessSegment(seg)
  382. this.dataInputCond.Signal()
  383. case *AckSegment:
  384. this.HandleOption(seg.Option)
  385. this.sendingWorker.ProcessSegment(current, seg)
  386. this.dataOutputCond.Signal()
  387. case *CmdOnlySegment:
  388. this.HandleOption(seg.Option)
  389. if seg.Command == CommandTerminate {
  390. state := this.State()
  391. if state == StateActive ||
  392. state == StatePeerClosed {
  393. this.SetState(StatePeerTerminating)
  394. } else if state == StateReadyToClose {
  395. this.SetState(StateTerminating)
  396. } else if state == StateTerminating {
  397. this.SetState(StateTerminated)
  398. }
  399. }
  400. this.sendingWorker.ProcessReceivingNext(seg.ReceivinNext)
  401. this.receivingWorker.ProcessSendingNext(seg.SendingNext)
  402. this.roundTrip.UpdatePeerRTO(seg.PeerRTO, current)
  403. seg.Release()
  404. default:
  405. }
  406. }
  407. return 0
  408. }
  409. func (this *Connection) flush() {
  410. current := this.Elapsed()
  411. if this.State() == StateTerminated {
  412. return
  413. }
  414. if this.State() == StateActive && current-atomic.LoadUint32(&this.lastIncomingTime) >= 30000 {
  415. this.Close()
  416. }
  417. if this.State() == StateReadyToClose && this.sendingWorker.IsEmpty() {
  418. this.SetState(StateTerminating)
  419. }
  420. if this.State() == StateTerminating {
  421. log.Debug("KCP|Connection: #", this.conv, " sending terminating cmd.")
  422. seg := NewCmdOnlySegment()
  423. defer seg.Release()
  424. seg.Conv = this.conv
  425. seg.Command = CommandTerminate
  426. this.output.Write(seg)
  427. this.output.Flush()
  428. if current-atomic.LoadUint32(&this.stateBeginTime) > 8000 {
  429. this.SetState(StateTerminated)
  430. }
  431. return
  432. }
  433. if this.State() == StatePeerTerminating && current-atomic.LoadUint32(&this.stateBeginTime) > 4000 {
  434. this.SetState(StateTerminating)
  435. }
  436. if this.State() == StateReadyToClose && current-atomic.LoadUint32(&this.stateBeginTime) > 15000 {
  437. this.SetState(StateTerminating)
  438. }
  439. // flush acknowledges
  440. this.receivingWorker.Flush(current)
  441. this.sendingWorker.Flush(current)
  442. if this.sendingWorker.PingNecessary() || this.receivingWorker.PingNecessary() || current-atomic.LoadUint32(&this.lastPingTime) >= 5000 {
  443. seg := NewCmdOnlySegment()
  444. seg.Conv = this.conv
  445. seg.Command = CommandPing
  446. seg.ReceivinNext = this.receivingWorker.nextNumber
  447. seg.SendingNext = this.sendingWorker.firstUnacknowledged
  448. seg.PeerRTO = this.roundTrip.Timeout()
  449. if this.State() == StateReadyToClose {
  450. seg.Option = SegmentOptionClose
  451. }
  452. this.output.Write(seg)
  453. this.lastPingTime = current
  454. this.sendingWorker.MarkPingNecessary(false)
  455. this.receivingWorker.MarkPingNecessary(false)
  456. seg.Release()
  457. }
  458. // flash remain segments
  459. this.output.Flush()
  460. }
  461. func (this *Connection) State() State {
  462. return State(atomic.LoadInt32((*int32)(&this.state)))
  463. }