connection.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430
  1. package kcp
  2. import (
  3. "errors"
  4. "io"
  5. "net"
  6. "sync"
  7. "time"
  8. "github.com/v2ray/v2ray-core/common/alloc"
  9. "github.com/v2ray/v2ray-core/common/log"
  10. )
  11. var (
  12. errTimeout = errors.New("i/o timeout")
  13. errBrokenPipe = errors.New("broken pipe")
  14. errClosedListener = errors.New("Listener closed.")
  15. errClosedConnection = errors.New("Connection closed.")
  16. )
  17. type State int
  18. const (
  19. StateActive State = 0
  20. StateReadyToClose State = 1
  21. StatePeerClosed State = 2
  22. StateTerminating State = 3
  23. StateTerminated State = 4
  24. )
  25. const (
  26. headerSize uint32 = 2
  27. )
  28. func nowMillisec() int64 {
  29. now := time.Now()
  30. return now.Unix()*1000 + int64(now.Nanosecond()/1000000)
  31. }
  32. // Connection is a KCP connection over UDP.
  33. type Connection struct {
  34. sync.RWMutex
  35. block Authenticator
  36. local, remote net.Addr
  37. wd time.Time // write deadline
  38. writer io.WriteCloser
  39. since int64
  40. conv uint16
  41. state State
  42. stateBeginTime uint32
  43. lastIncomingTime uint32
  44. lastPayloadTime uint32
  45. sendingUpdated bool
  46. lastPingTime uint32
  47. mss uint32
  48. rx_rttvar, rx_srtt, rx_rto uint32
  49. interval uint32
  50. receivingWorker *ReceivingWorker
  51. sendingWorker *SendingWorker
  52. fastresend uint32
  53. congestionControl bool
  54. output *BufferedSegmentWriter
  55. }
  56. // NewConnection create a new KCP connection between local and remote.
  57. func NewConnection(conv uint16, writerCloser io.WriteCloser, local *net.UDPAddr, remote *net.UDPAddr, block Authenticator) *Connection {
  58. log.Debug("KCP|Connection: creating connection ", conv)
  59. conn := new(Connection)
  60. conn.local = local
  61. conn.remote = remote
  62. conn.block = block
  63. conn.writer = writerCloser
  64. conn.since = nowMillisec()
  65. authWriter := &AuthenticationWriter{
  66. Authenticator: block,
  67. Writer: writerCloser,
  68. }
  69. conn.conv = conv
  70. conn.output = NewSegmentWriter(authWriter)
  71. conn.mss = authWriter.Mtu() - DataSegmentOverhead
  72. conn.rx_rto = 100
  73. conn.interval = effectiveConfig.Tti
  74. conn.receivingWorker = NewReceivingWorker(conn)
  75. conn.fastresend = 2
  76. conn.congestionControl = effectiveConfig.Congestion
  77. conn.sendingWorker = NewSendingWorker(conn)
  78. go conn.updateTask()
  79. return conn
  80. }
  81. func (this *Connection) Elapsed() uint32 {
  82. return uint32(nowMillisec() - this.since)
  83. }
  84. // Read implements the Conn Read method.
  85. func (this *Connection) Read(b []byte) (int, error) {
  86. if this == nil {
  87. return 0, io.EOF
  88. }
  89. state := this.State()
  90. if state == StateTerminating || state == StateTerminated {
  91. return 0, io.EOF
  92. }
  93. return this.receivingWorker.Read(b)
  94. }
  95. // Write implements the Conn Write method.
  96. func (this *Connection) Write(b []byte) (int, error) {
  97. if this == nil || this.State() != StateActive {
  98. return 0, io.ErrClosedPipe
  99. }
  100. totalWritten := 0
  101. for {
  102. if this == nil || this.State() != StateActive {
  103. return totalWritten, io.ErrClosedPipe
  104. }
  105. nBytes := this.sendingWorker.Push(b[totalWritten:])
  106. if nBytes > 0 {
  107. totalWritten += nBytes
  108. if totalWritten == len(b) {
  109. return totalWritten, nil
  110. }
  111. }
  112. if !this.wd.IsZero() && this.wd.Before(time.Now()) {
  113. return totalWritten, errTimeout
  114. }
  115. // Sending windows is 1024 for the moment. This amount is not gonna sent in 1 sec.
  116. time.Sleep(time.Second)
  117. }
  118. }
  119. func (this *Connection) SetState(state State) {
  120. this.Lock()
  121. this.state = state
  122. this.stateBeginTime = this.Elapsed()
  123. this.Unlock()
  124. switch state {
  125. case StateReadyToClose:
  126. this.receivingWorker.CloseRead()
  127. case StatePeerClosed:
  128. this.sendingWorker.CloseWrite()
  129. case StateTerminating:
  130. this.receivingWorker.CloseRead()
  131. this.sendingWorker.CloseWrite()
  132. case StateTerminated:
  133. this.receivingWorker.CloseRead()
  134. this.sendingWorker.CloseWrite()
  135. }
  136. }
  137. // Close closes the connection.
  138. func (this *Connection) Close() error {
  139. if this == nil {
  140. return errClosedConnection
  141. }
  142. state := this.State()
  143. if state == StateReadyToClose ||
  144. state == StateTerminating ||
  145. state == StateTerminated {
  146. return errClosedConnection
  147. }
  148. log.Debug("KCP|Connection: Closing connection to ", this.remote)
  149. if state == StateActive {
  150. this.SetState(StateReadyToClose)
  151. }
  152. if state == StatePeerClosed {
  153. this.SetState(StateTerminating)
  154. }
  155. return nil
  156. }
  157. // LocalAddr returns the local network address. The Addr returned is shared by all invocations of LocalAddr, so do not modify it.
  158. func (this *Connection) LocalAddr() net.Addr {
  159. if this == nil {
  160. return nil
  161. }
  162. return this.local
  163. }
  164. // RemoteAddr returns the remote network address. The Addr returned is shared by all invocations of RemoteAddr, so do not modify it.
  165. func (this *Connection) RemoteAddr() net.Addr {
  166. if this == nil {
  167. return nil
  168. }
  169. return this.remote
  170. }
  171. // SetDeadline sets the deadline associated with the listener. A zero time value disables the deadline.
  172. func (this *Connection) SetDeadline(t time.Time) error {
  173. if err := this.SetReadDeadline(t); err != nil {
  174. return err
  175. }
  176. if err := this.SetWriteDeadline(t); err != nil {
  177. return err
  178. }
  179. return nil
  180. }
  181. // SetReadDeadline implements the Conn SetReadDeadline method.
  182. func (this *Connection) SetReadDeadline(t time.Time) error {
  183. if this == nil || this.State() != StateActive {
  184. return errClosedConnection
  185. }
  186. this.receivingWorker.SetReadDeadline(t)
  187. return nil
  188. }
  189. // SetWriteDeadline implements the Conn SetWriteDeadline method.
  190. func (this *Connection) SetWriteDeadline(t time.Time) error {
  191. if this == nil || this.State() != StateActive {
  192. return errClosedConnection
  193. }
  194. this.wd = t
  195. return nil
  196. }
  197. // kcp update, input loop
  198. func (this *Connection) updateTask() {
  199. for this.State() != StateTerminated {
  200. this.flush()
  201. interval := time.Duration(effectiveConfig.Tti) * time.Millisecond
  202. if this.State() == StateTerminating {
  203. interval = time.Second
  204. }
  205. time.Sleep(interval)
  206. }
  207. this.Terminate()
  208. }
  209. func (this *Connection) FetchInputFrom(conn net.Conn) {
  210. go func() {
  211. for {
  212. payload := alloc.NewBuffer()
  213. nBytes, err := conn.Read(payload.Value)
  214. if err != nil {
  215. payload.Release()
  216. return
  217. }
  218. payload.Slice(0, nBytes)
  219. if this.block.Open(payload) {
  220. this.Input(payload.Value)
  221. } else {
  222. log.Info("KCP|Connection: Invalid response from ", conn.RemoteAddr())
  223. }
  224. payload.Release()
  225. }
  226. }()
  227. }
  228. func (this *Connection) Reusable() bool {
  229. return false
  230. }
  231. func (this *Connection) SetReusable(b bool) {}
  232. func (this *Connection) Terminate() {
  233. if this == nil || this.writer == nil {
  234. return
  235. }
  236. log.Info("Terminating connection to ", this.RemoteAddr())
  237. this.writer.Close()
  238. }
  239. func (this *Connection) HandleOption(opt SegmentOption) {
  240. if (opt & SegmentOptionClose) == SegmentOptionClose {
  241. this.OnPeerClosed()
  242. }
  243. }
  244. func (this *Connection) OnPeerClosed() {
  245. state := this.State()
  246. if state == StateReadyToClose {
  247. this.SetState(StateTerminating)
  248. }
  249. if state == StateActive {
  250. this.SetState(StatePeerClosed)
  251. }
  252. }
  253. // https://tools.ietf.org/html/rfc6298
  254. func (this *Connection) update_ack(rtt int32) {
  255. this.Lock()
  256. defer this.Unlock()
  257. if this.rx_srtt == 0 {
  258. this.rx_srtt = uint32(rtt)
  259. this.rx_rttvar = uint32(rtt) / 2
  260. } else {
  261. delta := rtt - int32(this.rx_srtt)
  262. if delta < 0 {
  263. delta = -delta
  264. }
  265. this.rx_rttvar = (3*this.rx_rttvar + uint32(delta)) / 4
  266. this.rx_srtt = (7*this.rx_srtt + uint32(rtt)) / 8
  267. if this.rx_srtt < this.interval {
  268. this.rx_srtt = this.interval
  269. }
  270. }
  271. var rto uint32
  272. if this.interval < 4*this.rx_rttvar {
  273. rto = this.rx_srtt + 4*this.rx_rttvar
  274. } else {
  275. rto = this.rx_srtt + this.interval
  276. }
  277. if rto > 10000 {
  278. rto = 10000
  279. }
  280. this.rx_rto = rto * 3 / 2
  281. }
  282. // Input when you received a low level packet (eg. UDP packet), call it
  283. func (kcp *Connection) Input(data []byte) int {
  284. current := kcp.Elapsed()
  285. kcp.lastIncomingTime = current
  286. var seg Segment
  287. for {
  288. seg, data = ReadSegment(data)
  289. if seg == nil {
  290. break
  291. }
  292. switch seg := seg.(type) {
  293. case *DataSegment:
  294. kcp.HandleOption(seg.Opt)
  295. kcp.receivingWorker.ProcessSegment(seg)
  296. kcp.lastPayloadTime = current
  297. case *AckSegment:
  298. kcp.HandleOption(seg.Opt)
  299. kcp.sendingWorker.ProcessSegment(current, seg)
  300. kcp.lastPayloadTime = current
  301. case *CmdOnlySegment:
  302. kcp.HandleOption(seg.Opt)
  303. if seg.Cmd == SegmentCommandTerminated {
  304. if kcp.state == StateActive ||
  305. kcp.state == StateReadyToClose ||
  306. kcp.state == StatePeerClosed {
  307. kcp.SetState(StateTerminating)
  308. } else if kcp.state == StateTerminating {
  309. kcp.SetState(StateTerminated)
  310. }
  311. }
  312. kcp.sendingWorker.ProcessReceivingNext(seg.ReceivinNext)
  313. kcp.receivingWorker.ProcessSendingNext(seg.SendingNext)
  314. default:
  315. }
  316. }
  317. return 0
  318. }
  319. func (this *Connection) flush() {
  320. current := this.Elapsed()
  321. state := this.State()
  322. if state == StateTerminated {
  323. return
  324. }
  325. if state == StateActive && current-this.lastPayloadTime >= 30000 {
  326. this.Close()
  327. }
  328. if state == StateTerminating {
  329. this.output.Write(&CmdOnlySegment{
  330. Conv: this.conv,
  331. Cmd: SegmentCommandTerminated,
  332. })
  333. this.output.Flush()
  334. if current-this.stateBeginTime > 8000 {
  335. this.SetState(StateTerminated)
  336. }
  337. return
  338. }
  339. if state == StateReadyToClose && current-this.stateBeginTime > 15000 {
  340. this.SetState(StateTerminating)
  341. }
  342. // flush acknowledges
  343. this.receivingWorker.Flush(current)
  344. this.sendingWorker.Flush(current)
  345. if this.sendingWorker.PingNecessary() || this.receivingWorker.PingNecessary() || current-this.lastPingTime >= 5000 {
  346. seg := NewCmdOnlySegment()
  347. seg.Conv = this.conv
  348. seg.Cmd = SegmentCommandPing
  349. seg.ReceivinNext = this.receivingWorker.nextNumber
  350. seg.SendingNext = this.sendingWorker.firstUnacknowledged
  351. if state == StateReadyToClose {
  352. seg.Opt = SegmentOptionClose
  353. }
  354. this.output.Write(seg)
  355. this.lastPingTime = current
  356. this.sendingUpdated = false
  357. seg.Release()
  358. }
  359. // flash remain segments
  360. this.output.Flush()
  361. }
  362. func (this *Connection) State() State {
  363. this.RLock()
  364. defer this.RUnlock()
  365. return this.state
  366. }