| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628 |
- package kcp
- import (
- "io"
- "net"
- "sync"
- "sync/atomic"
- "time"
- "v2ray.com/core/common"
- "v2ray.com/core/common/buf"
- "v2ray.com/core/common/signal"
- "v2ray.com/core/common/signal/semaphore"
- )
- var (
- ErrIOTimeout = newError("Read/Write timeout")
- ErrClosedListener = newError("Listener closed.")
- ErrClosedConnection = newError("Connection closed.")
- )
- // State of the connection
- type State int32
- // Is returns true if current State is one of the candidates.
- func (s State) Is(states ...State) bool {
- for _, state := range states {
- if s == state {
- return true
- }
- }
- return false
- }
- const (
- StateActive State = 0 // Connection is active
- StateReadyToClose State = 1 // Connection is closed locally
- StatePeerClosed State = 2 // Connection is closed on remote
- StateTerminating State = 3 // Connection is ready to be destroyed locally
- StatePeerTerminating State = 4 // Connection is ready to be destroyed on remote
- StateTerminated State = 5 // Connection is destroyed.
- )
- func nowMillisec() int64 {
- now := time.Now()
- return now.Unix()*1000 + int64(now.Nanosecond()/1000000)
- }
- type RoundTripInfo struct {
- sync.RWMutex
- variation uint32
- srtt uint32
- rto uint32
- minRtt uint32
- updatedTimestamp uint32
- }
- func (info *RoundTripInfo) UpdatePeerRTO(rto uint32, current uint32) {
- info.Lock()
- defer info.Unlock()
- if current-info.updatedTimestamp < 3000 {
- return
- }
- info.updatedTimestamp = current
- info.rto = rto
- }
- func (info *RoundTripInfo) Update(rtt uint32, current uint32) {
- if rtt > 0x7FFFFFFF {
- return
- }
- info.Lock()
- defer info.Unlock()
- // https://tools.ietf.org/html/rfc6298
- if info.srtt == 0 {
- info.srtt = rtt
- info.variation = rtt / 2
- } else {
- delta := rtt - info.srtt
- if info.srtt > rtt {
- delta = info.srtt - rtt
- }
- info.variation = (3*info.variation + delta) / 4
- info.srtt = (7*info.srtt + rtt) / 8
- if info.srtt < info.minRtt {
- info.srtt = info.minRtt
- }
- }
- var rto uint32
- if info.minRtt < 4*info.variation {
- rto = info.srtt + 4*info.variation
- } else {
- rto = info.srtt + info.variation
- }
- if rto > 10000 {
- rto = 10000
- }
- info.rto = rto * 5 / 4
- info.updatedTimestamp = current
- }
- func (info *RoundTripInfo) Timeout() uint32 {
- info.RLock()
- defer info.RUnlock()
- return info.rto
- }
- func (info *RoundTripInfo) SmoothedTime() uint32 {
- info.RLock()
- defer info.RUnlock()
- return info.srtt
- }
- type Updater struct {
- interval int64
- shouldContinue func() bool
- shouldTerminate func() bool
- updateFunc func()
- notifier *semaphore.Instance
- }
- func NewUpdater(interval uint32, shouldContinue func() bool, shouldTerminate func() bool, updateFunc func()) *Updater {
- u := &Updater{
- interval: int64(time.Duration(interval) * time.Millisecond),
- shouldContinue: shouldContinue,
- shouldTerminate: shouldTerminate,
- updateFunc: updateFunc,
- notifier: semaphore.New(1),
- }
- return u
- }
- func (u *Updater) WakeUp() {
- select {
- case <-u.notifier.Wait():
- go u.run()
- default:
- }
- }
- func (u *Updater) run() {
- defer u.notifier.Signal()
- if u.shouldTerminate() {
- return
- }
- ticker := time.NewTicker(u.Interval())
- for u.shouldContinue() {
- u.updateFunc()
- <-ticker.C
- }
- ticker.Stop()
- }
- func (u *Updater) Interval() time.Duration {
- return time.Duration(atomic.LoadInt64(&u.interval))
- }
- func (u *Updater) SetInterval(d time.Duration) {
- atomic.StoreInt64(&u.interval, int64(d))
- }
- type ConnMetadata struct {
- LocalAddr net.Addr
- RemoteAddr net.Addr
- Conversation uint16
- }
- // Connection is a KCP connection over UDP.
- type Connection struct {
- meta ConnMetadata
- closer io.Closer
- rd time.Time
- wd time.Time // write deadline
- since int64
- dataInput *signal.Notifier
- dataOutput *signal.Notifier
- Config *Config
- state State
- stateBeginTime uint32
- lastIncomingTime uint32
- lastPingTime uint32
- mss uint32
- roundTrip *RoundTripInfo
- receivingWorker *ReceivingWorker
- sendingWorker *SendingWorker
- output SegmentWriter
- dataUpdater *Updater
- pingUpdater *Updater
- }
- // NewConnection create a new KCP connection between local and remote.
- func NewConnection(meta ConnMetadata, writer PacketWriter, closer io.Closer, config *Config) *Connection {
- newError("#", meta.Conversation, " creating connection to ", meta.RemoteAddr).WriteToLog()
- conn := &Connection{
- meta: meta,
- closer: closer,
- since: nowMillisec(),
- dataInput: signal.NewNotifier(),
- dataOutput: signal.NewNotifier(),
- Config: config,
- output: NewRetryableWriter(NewSegmentWriter(writer)),
- mss: config.GetMTUValue() - uint32(writer.Overhead()) - DataSegmentOverhead,
- roundTrip: &RoundTripInfo{
- rto: 100,
- minRtt: config.GetTTIValue(),
- },
- }
- conn.receivingWorker = NewReceivingWorker(conn)
- conn.sendingWorker = NewSendingWorker(conn)
- isTerminating := func() bool {
- return conn.State().Is(StateTerminating, StateTerminated)
- }
- isTerminated := func() bool {
- return conn.State() == StateTerminated
- }
- conn.dataUpdater = NewUpdater(
- config.GetTTIValue(),
- func() bool {
- return !isTerminating() && (conn.sendingWorker.UpdateNecessary() || conn.receivingWorker.UpdateNecessary())
- },
- isTerminating,
- conn.updateTask)
- conn.pingUpdater = NewUpdater(
- 5000, // 5 seconds
- func() bool { return !isTerminated() },
- isTerminated,
- conn.updateTask)
- conn.pingUpdater.WakeUp()
- return conn
- }
- func (c *Connection) Elapsed() uint32 {
- return uint32(nowMillisec() - c.since)
- }
- // ReadMultiBuffer implements buf.Reader.
- func (c *Connection) ReadMultiBuffer() (buf.MultiBuffer, error) {
- if c == nil {
- return nil, io.EOF
- }
- for {
- if c.State().Is(StateReadyToClose, StateTerminating, StateTerminated) {
- return nil, io.EOF
- }
- mb := c.receivingWorker.ReadMultiBuffer()
- if !mb.IsEmpty() {
- c.dataUpdater.WakeUp()
- return mb, nil
- }
- if c.State() == StatePeerTerminating {
- return nil, io.EOF
- }
- if err := c.waitForDataInput(); err != nil {
- return nil, err
- }
- }
- }
- func (c *Connection) waitForDataInput() error {
- if c.State() == StatePeerTerminating {
- return io.EOF
- }
- duration := time.Minute
- if !c.rd.IsZero() {
- duration = time.Until(c.rd)
- if duration < 0 {
- return ErrIOTimeout
- }
- }
- select {
- case <-c.dataInput.Wait():
- case <-time.After(duration):
- if !c.rd.IsZero() && c.rd.Before(time.Now()) {
- return ErrIOTimeout
- }
- }
- return nil
- }
- // Read implements the Conn Read method.
- func (c *Connection) Read(b []byte) (int, error) {
- if c == nil {
- return 0, io.EOF
- }
- for {
- if c.State().Is(StateReadyToClose, StateTerminating, StateTerminated) {
- return 0, io.EOF
- }
- nBytes := c.receivingWorker.Read(b)
- if nBytes > 0 {
- c.dataUpdater.WakeUp()
- return nBytes, nil
- }
- if err := c.waitForDataInput(); err != nil {
- return 0, err
- }
- }
- }
- func (c *Connection) waitForDataOutput() error {
- duration := time.Minute
- if !c.wd.IsZero() {
- duration = time.Until(c.wd)
- if duration < 0 {
- return ErrIOTimeout
- }
- }
- select {
- case <-c.dataOutput.Wait():
- case <-time.After(duration):
- if !c.wd.IsZero() && c.wd.Before(time.Now()) {
- return ErrIOTimeout
- }
- }
- return nil
- }
- // Write implements io.Writer.
- func (c *Connection) Write(b []byte) (int, error) {
- // This involves multiple copies of the buffer. But we don't expect this method to be used often.
- // Only wrapped connections such as TLS and WebSocket will call into this.
- // TODO: improve effeciency.
- var mb buf.MultiBuffer
- common.Must2(mb.Write(b))
- if err := c.WriteMultiBuffer(mb); err != nil {
- return 0, err
- }
- return len(b), nil
- }
- // WriteMultiBuffer implements buf.Writer.
- func (c *Connection) WriteMultiBuffer(mb buf.MultiBuffer) error {
- defer mb.Release()
- updatePending := false
- defer func() {
- if updatePending {
- c.dataUpdater.WakeUp()
- }
- }()
- for {
- for {
- if c == nil || c.State() != StateActive {
- return io.ErrClosedPipe
- }
- if !c.sendingWorker.Push(&mb) {
- break
- }
- updatePending = true
- if mb.IsEmpty() {
- return nil
- }
- }
- if updatePending {
- c.dataUpdater.WakeUp()
- updatePending = false
- }
- if err := c.waitForDataOutput(); err != nil {
- return err
- }
- }
- }
- func (c *Connection) SetState(state State) {
- current := c.Elapsed()
- atomic.StoreInt32((*int32)(&c.state), int32(state))
- atomic.StoreUint32(&c.stateBeginTime, current)
- newError("#", c.meta.Conversation, " entering state ", state, " at ", current).AtDebug().WriteToLog()
- switch state {
- case StateReadyToClose:
- c.receivingWorker.CloseRead()
- case StatePeerClosed:
- c.sendingWorker.CloseWrite()
- case StateTerminating:
- c.receivingWorker.CloseRead()
- c.sendingWorker.CloseWrite()
- c.pingUpdater.SetInterval(time.Second)
- case StatePeerTerminating:
- c.sendingWorker.CloseWrite()
- c.pingUpdater.SetInterval(time.Second)
- case StateTerminated:
- c.receivingWorker.CloseRead()
- c.sendingWorker.CloseWrite()
- c.pingUpdater.SetInterval(time.Second)
- c.dataUpdater.WakeUp()
- c.pingUpdater.WakeUp()
- go c.Terminate()
- }
- }
- // Close closes the connection.
- func (c *Connection) Close() error {
- if c == nil {
- return ErrClosedConnection
- }
- c.dataInput.Signal()
- c.dataOutput.Signal()
- switch c.State() {
- case StateReadyToClose, StateTerminating, StateTerminated:
- return ErrClosedConnection
- case StateActive:
- c.SetState(StateReadyToClose)
- case StatePeerClosed:
- c.SetState(StateTerminating)
- case StatePeerTerminating:
- c.SetState(StateTerminated)
- }
- newError("#", c.meta.Conversation, " closing connection to ", c.meta.RemoteAddr).WriteToLog()
- return nil
- }
- // LocalAddr returns the local network address. The Addr returned is shared by all invocations of LocalAddr, so do not modify it.
- func (c *Connection) LocalAddr() net.Addr {
- if c == nil {
- return nil
- }
- return c.meta.LocalAddr
- }
- // RemoteAddr returns the remote network address. The Addr returned is shared by all invocations of RemoteAddr, so do not modify it.
- func (c *Connection) RemoteAddr() net.Addr {
- if c == nil {
- return nil
- }
- return c.meta.RemoteAddr
- }
- // SetDeadline sets the deadline associated with the listener. A zero time value disables the deadline.
- func (c *Connection) SetDeadline(t time.Time) error {
- if err := c.SetReadDeadline(t); err != nil {
- return err
- }
- return c.SetWriteDeadline(t)
- }
- // SetReadDeadline implements the Conn SetReadDeadline method.
- func (c *Connection) SetReadDeadline(t time.Time) error {
- if c == nil || c.State() != StateActive {
- return ErrClosedConnection
- }
- c.rd = t
- return nil
- }
- // SetWriteDeadline implements the Conn SetWriteDeadline method.
- func (c *Connection) SetWriteDeadline(t time.Time) error {
- if c == nil || c.State() != StateActive {
- return ErrClosedConnection
- }
- c.wd = t
- return nil
- }
- // kcp update, input loop
- func (c *Connection) updateTask() {
- c.flush()
- }
- func (c *Connection) Terminate() {
- if c == nil {
- return
- }
- newError("#", c.meta.Conversation, " terminating connection to ", c.RemoteAddr()).WriteToLog()
- //v.SetState(StateTerminated)
- c.dataInput.Signal()
- c.dataOutput.Signal()
- c.closer.Close()
- c.sendingWorker.Release()
- c.receivingWorker.Release()
- }
- func (c *Connection) HandleOption(opt SegmentOption) {
- if (opt & SegmentOptionClose) == SegmentOptionClose {
- c.OnPeerClosed()
- }
- }
- func (c *Connection) OnPeerClosed() {
- switch c.State() {
- case StateReadyToClose:
- c.SetState(StateTerminating)
- case StateActive:
- c.SetState(StatePeerClosed)
- }
- }
- // Input when you received a low level packet (eg. UDP packet), call it
- func (c *Connection) Input(segments []Segment) {
- current := c.Elapsed()
- atomic.StoreUint32(&c.lastIncomingTime, current)
- for _, seg := range segments {
- if seg.Conversation() != c.meta.Conversation {
- break
- }
- switch seg := seg.(type) {
- case *DataSegment:
- c.HandleOption(seg.Option)
- c.receivingWorker.ProcessSegment(seg)
- if c.receivingWorker.IsDataAvailable() {
- c.dataInput.Signal()
- }
- c.dataUpdater.WakeUp()
- case *AckSegment:
- c.HandleOption(seg.Option)
- c.sendingWorker.ProcessSegment(current, seg, c.roundTrip.Timeout())
- c.dataOutput.Signal()
- c.dataUpdater.WakeUp()
- case *CmdOnlySegment:
- c.HandleOption(seg.Option)
- if seg.Command() == CommandTerminate {
- switch c.State() {
- case StateActive, StatePeerClosed:
- c.SetState(StatePeerTerminating)
- case StateReadyToClose:
- c.SetState(StateTerminating)
- case StateTerminating:
- c.SetState(StateTerminated)
- }
- }
- if seg.Option == SegmentOptionClose || seg.Command() == CommandTerminate {
- c.dataInput.Signal()
- c.dataOutput.Signal()
- }
- c.sendingWorker.ProcessReceivingNext(seg.ReceivingNext)
- c.receivingWorker.ProcessSendingNext(seg.SendingNext)
- c.roundTrip.UpdatePeerRTO(seg.PeerRTO, current)
- seg.Release()
- default:
- }
- }
- }
- func (c *Connection) flush() {
- current := c.Elapsed()
- if c.State() == StateTerminated {
- return
- }
- if c.State() == StateActive && current-atomic.LoadUint32(&c.lastIncomingTime) >= 30000 {
- c.Close()
- }
- if c.State() == StateReadyToClose && c.sendingWorker.IsEmpty() {
- c.SetState(StateTerminating)
- }
- if c.State() == StateTerminating {
- newError("#", c.meta.Conversation, " sending terminating cmd.").AtDebug().WriteToLog()
- c.Ping(current, CommandTerminate)
- if current-atomic.LoadUint32(&c.stateBeginTime) > 8000 {
- c.SetState(StateTerminated)
- }
- return
- }
- if c.State() == StatePeerTerminating && current-atomic.LoadUint32(&c.stateBeginTime) > 4000 {
- c.SetState(StateTerminating)
- }
- if c.State() == StateReadyToClose && current-atomic.LoadUint32(&c.stateBeginTime) > 15000 {
- c.SetState(StateTerminating)
- }
- // flush acknowledges
- c.receivingWorker.Flush(current)
- c.sendingWorker.Flush(current)
- if current-atomic.LoadUint32(&c.lastPingTime) >= 3000 {
- c.Ping(current, CommandPing)
- }
- }
- func (c *Connection) State() State {
- return State(atomic.LoadInt32((*int32)(&c.state)))
- }
- func (c *Connection) Ping(current uint32, cmd Command) {
- seg := NewCmdOnlySegment()
- seg.Conv = c.meta.Conversation
- seg.Cmd = cmd
- seg.ReceivingNext = c.receivingWorker.NextNumber()
- seg.SendingNext = c.sendingWorker.FirstUnacknowledged()
- seg.PeerRTO = c.roundTrip.Timeout()
- if c.State() == StateReadyToClose {
- seg.Option = SegmentOptionClose
- }
- c.output.Write(seg)
- atomic.StoreUint32(&c.lastPingTime, current)
- seg.Release()
- }
|