| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649 | package kcpimport (	"io"	"net"	"runtime"	"sync"	"sync/atomic"	"time"	"v2ray.com/core/common"	"v2ray.com/core/common/buf"	"v2ray.com/core/common/signal"	"v2ray.com/core/common/signal/semaphore")var (	ErrIOTimeout        = newError("Read/Write timeout")	ErrClosedListener   = newError("Listener closed.")	ErrClosedConnection = newError("Connection closed."))// State of the connectiontype State int32// Is returns true if current State is one of the candidates.func (s State) Is(states ...State) bool {	for _, state := range states {		if s == state {			return true		}	}	return false}const (	StateActive          State = 0 // Connection is active	StateReadyToClose    State = 1 // Connection is closed locally	StatePeerClosed      State = 2 // Connection is closed on remote	StateTerminating     State = 3 // Connection is ready to be destroyed locally	StatePeerTerminating State = 4 // Connection is ready to be destroyed on remote	StateTerminated      State = 5 // Connection is destroyed.)func nowMillisec() int64 {	now := time.Now()	return now.Unix()*1000 + int64(now.Nanosecond()/1000000)}type RoundTripInfo struct {	sync.RWMutex	variation        uint32	srtt             uint32	rto              uint32	minRtt           uint32	updatedTimestamp uint32}func (info *RoundTripInfo) UpdatePeerRTO(rto uint32, current uint32) {	info.Lock()	defer info.Unlock()	if current-info.updatedTimestamp < 3000 {		return	}	info.updatedTimestamp = current	info.rto = rto}func (info *RoundTripInfo) Update(rtt uint32, current uint32) {	if rtt > 0x7FFFFFFF {		return	}	info.Lock()	defer info.Unlock()	// https://tools.ietf.org/html/rfc6298	if info.srtt == 0 {		info.srtt = rtt		info.variation = rtt / 2	} else {		delta := rtt - info.srtt		if info.srtt > rtt {			delta = info.srtt - rtt		}		info.variation = (3*info.variation + delta) / 4		info.srtt = (7*info.srtt + rtt) / 8		if info.srtt < info.minRtt {			info.srtt = info.minRtt		}	}	var rto uint32	if info.minRtt < 4*info.variation {		rto = info.srtt + 4*info.variation	} else {		rto = info.srtt + info.variation	}	if rto > 10000 {		rto = 10000	}	info.rto = rto * 5 / 4	info.updatedTimestamp = current}func (info *RoundTripInfo) Timeout() uint32 {	info.RLock()	defer info.RUnlock()	return info.rto}func (info *RoundTripInfo) SmoothedTime() uint32 {	info.RLock()	defer info.RUnlock()	return info.srtt}type Updater struct {	interval        int64	shouldContinue  func() bool	shouldTerminate func() bool	updateFunc      func()	notifier        *semaphore.Instance}func NewUpdater(interval uint32, shouldContinue func() bool, shouldTerminate func() bool, updateFunc func()) *Updater {	u := &Updater{		interval:        int64(time.Duration(interval) * time.Millisecond),		shouldContinue:  shouldContinue,		shouldTerminate: shouldTerminate,		updateFunc:      updateFunc,		notifier:        semaphore.New(1),	}	return u}func (u *Updater) WakeUp() {	select {	case <-u.notifier.Wait():		go u.run()	default:	}}func (u *Updater) run() {	defer u.notifier.Signal()	if u.shouldTerminate() {		return	}	ticker := time.NewTicker(u.Interval())	for u.shouldContinue() {		u.updateFunc()		<-ticker.C	}	ticker.Stop()}func (u *Updater) Interval() time.Duration {	return time.Duration(atomic.LoadInt64(&u.interval))}func (u *Updater) SetInterval(d time.Duration) {	atomic.StoreInt64(&u.interval, int64(d))}type ConnMetadata struct {	LocalAddr    net.Addr	RemoteAddr   net.Addr	Conversation uint16}// Connection is a KCP connection over UDP.type Connection struct {	meta       ConnMetadata	closer     io.Closer	rd         time.Time	wd         time.Time // write deadline	since      int64	dataInput  *signal.Notifier	dataOutput *signal.Notifier	Config     *Config	state            State	stateBeginTime   uint32	lastIncomingTime uint32	lastPingTime     uint32	mss       uint32	roundTrip *RoundTripInfo	receivingWorker *ReceivingWorker	sendingWorker   *SendingWorker	output SegmentWriter	dataUpdater *Updater	pingUpdater *Updater}// NewConnection create a new KCP connection between local and remote.func NewConnection(meta ConnMetadata, writer PacketWriter, closer io.Closer, config *Config) *Connection {	newError("#", meta.Conversation, " creating connection to ", meta.RemoteAddr).WriteToLog()	conn := &Connection{		meta:       meta,		closer:     closer,		since:      nowMillisec(),		dataInput:  signal.NewNotifier(),		dataOutput: signal.NewNotifier(),		Config:     config,		output:     NewRetryableWriter(NewSegmentWriter(writer)),		mss:        config.GetMTUValue() - uint32(writer.Overhead()) - DataSegmentOverhead,		roundTrip: &RoundTripInfo{			rto:    100,			minRtt: config.GetTTIValue(),		},	}	conn.receivingWorker = NewReceivingWorker(conn)	conn.sendingWorker = NewSendingWorker(conn)	isTerminating := func() bool {		return conn.State().Is(StateTerminating, StateTerminated)	}	isTerminated := func() bool {		return conn.State() == StateTerminated	}	conn.dataUpdater = NewUpdater(		config.GetTTIValue(),		func() bool {			return !isTerminating() && (conn.sendingWorker.UpdateNecessary() || conn.receivingWorker.UpdateNecessary())		},		isTerminating,		conn.updateTask)	conn.pingUpdater = NewUpdater(		5000, // 5 seconds		func() bool { return !isTerminated() },		isTerminated,		conn.updateTask)	conn.pingUpdater.WakeUp()	return conn}func (c *Connection) Elapsed() uint32 {	return uint32(nowMillisec() - c.since)}// ReadMultiBuffer implements buf.Reader.func (c *Connection) ReadMultiBuffer() (buf.MultiBuffer, error) {	if c == nil {		return nil, io.EOF	}	for {		if c.State().Is(StateReadyToClose, StateTerminating, StateTerminated) {			return nil, io.EOF		}		mb := c.receivingWorker.ReadMultiBuffer()		if !mb.IsEmpty() {			c.dataUpdater.WakeUp()			return mb, nil		}		if c.State() == StatePeerTerminating {			return nil, io.EOF		}		if err := c.waitForDataInput(); err != nil {			return nil, err		}	}}func (c *Connection) waitForDataInput() error {	for i := 0; i < 16; i++ {		select {		case <-c.dataInput.Wait():			return nil		default:			runtime.Gosched()		}	}	duration := time.Second * 16	if !c.rd.IsZero() {		duration = time.Until(c.rd)		if duration < 0 {			return ErrIOTimeout		}	}	timeout := time.NewTimer(duration)	defer timeout.Stop()	select {	case <-c.dataInput.Wait():	case <-timeout.C:		if !c.rd.IsZero() && c.rd.Before(time.Now()) {			return ErrIOTimeout		}	}	return nil}// Read implements the Conn Read method.func (c *Connection) Read(b []byte) (int, error) {	if c == nil {		return 0, io.EOF	}	for {		if c.State().Is(StateReadyToClose, StateTerminating, StateTerminated) {			return 0, io.EOF		}		nBytes := c.receivingWorker.Read(b)		if nBytes > 0 {			c.dataUpdater.WakeUp()			return nBytes, nil		}		if err := c.waitForDataInput(); err != nil {			return 0, err		}	}}func (c *Connection) waitForDataOutput() error {	for i := 0; i < 16; i++ {		select {		case <-c.dataOutput.Wait():			return nil		default:			runtime.Gosched()		}	}	duration := time.Second * 16	if !c.wd.IsZero() {		duration = time.Until(c.wd)		if duration < 0 {			return ErrIOTimeout		}	}	timeout := time.NewTimer(duration)	defer timeout.Stop()	select {	case <-c.dataOutput.Wait():	case <-timeout.C:		if !c.wd.IsZero() && c.wd.Before(time.Now()) {			return ErrIOTimeout		}	}	return nil}// Write implements io.Writer.func (c *Connection) Write(b []byte) (int, error) {	// This involves multiple copies of the buffer. But we don't expect this method to be used often.	// Only wrapped connections such as TLS and WebSocket will call into this.	// TODO: improve effeciency.	var mb buf.MultiBuffer	common.Must2(mb.Write(b))	if err := c.WriteMultiBuffer(mb); err != nil {		return 0, err	}	return len(b), nil}// WriteMultiBuffer implements buf.Writer.func (c *Connection) WriteMultiBuffer(mb buf.MultiBuffer) error {	defer mb.Release()	updatePending := false	defer func() {		if updatePending {			c.dataUpdater.WakeUp()		}	}()	for {		for {			if c == nil || c.State() != StateActive {				return io.ErrClosedPipe			}			if !c.sendingWorker.Push(&mb) {				break			}			updatePending = true			if mb.IsEmpty() {				return nil			}		}		if updatePending {			c.dataUpdater.WakeUp()			updatePending = false		}		if err := c.waitForDataOutput(); err != nil {			return err		}	}}func (c *Connection) SetState(state State) {	current := c.Elapsed()	atomic.StoreInt32((*int32)(&c.state), int32(state))	atomic.StoreUint32(&c.stateBeginTime, current)	newError("#", c.meta.Conversation, " entering state ", state, " at ", current).AtDebug().WriteToLog()	switch state {	case StateReadyToClose:		c.receivingWorker.CloseRead()	case StatePeerClosed:		c.sendingWorker.CloseWrite()	case StateTerminating:		c.receivingWorker.CloseRead()		c.sendingWorker.CloseWrite()		c.pingUpdater.SetInterval(time.Second)	case StatePeerTerminating:		c.sendingWorker.CloseWrite()		c.pingUpdater.SetInterval(time.Second)	case StateTerminated:		c.receivingWorker.CloseRead()		c.sendingWorker.CloseWrite()		c.pingUpdater.SetInterval(time.Second)		c.dataUpdater.WakeUp()		c.pingUpdater.WakeUp()		go c.Terminate()	}}// Close closes the connection.func (c *Connection) Close() error {	if c == nil {		return ErrClosedConnection	}	c.dataInput.Signal()	c.dataOutput.Signal()	switch c.State() {	case StateReadyToClose, StateTerminating, StateTerminated:		return ErrClosedConnection	case StateActive:		c.SetState(StateReadyToClose)	case StatePeerClosed:		c.SetState(StateTerminating)	case StatePeerTerminating:		c.SetState(StateTerminated)	}	newError("#", c.meta.Conversation, " closing connection to ", c.meta.RemoteAddr).WriteToLog()	return nil}// LocalAddr returns the local network address. The Addr returned is shared by all invocations of LocalAddr, so do not modify it.func (c *Connection) LocalAddr() net.Addr {	if c == nil {		return nil	}	return c.meta.LocalAddr}// RemoteAddr returns the remote network address. The Addr returned is shared by all invocations of RemoteAddr, so do not modify it.func (c *Connection) RemoteAddr() net.Addr {	if c == nil {		return nil	}	return c.meta.RemoteAddr}// SetDeadline sets the deadline associated with the listener. A zero time value disables the deadline.func (c *Connection) SetDeadline(t time.Time) error {	if err := c.SetReadDeadline(t); err != nil {		return err	}	return c.SetWriteDeadline(t)}// SetReadDeadline implements the Conn SetReadDeadline method.func (c *Connection) SetReadDeadline(t time.Time) error {	if c == nil || c.State() != StateActive {		return ErrClosedConnection	}	c.rd = t	return nil}// SetWriteDeadline implements the Conn SetWriteDeadline method.func (c *Connection) SetWriteDeadline(t time.Time) error {	if c == nil || c.State() != StateActive {		return ErrClosedConnection	}	c.wd = t	return nil}// kcp update, input loopfunc (c *Connection) updateTask() {	c.flush()}func (c *Connection) Terminate() {	if c == nil {		return	}	newError("#", c.meta.Conversation, " terminating connection to ", c.RemoteAddr()).WriteToLog()	//v.SetState(StateTerminated)	c.dataInput.Signal()	c.dataOutput.Signal()	c.closer.Close()	c.sendingWorker.Release()	c.receivingWorker.Release()}func (c *Connection) HandleOption(opt SegmentOption) {	if (opt & SegmentOptionClose) == SegmentOptionClose {		c.OnPeerClosed()	}}func (c *Connection) OnPeerClosed() {	switch c.State() {	case StateReadyToClose:		c.SetState(StateTerminating)	case StateActive:		c.SetState(StatePeerClosed)	}}// Input when you received a low level packet (eg. UDP packet), call itfunc (c *Connection) Input(segments []Segment) {	current := c.Elapsed()	atomic.StoreUint32(&c.lastIncomingTime, current)	for _, seg := range segments {		if seg.Conversation() != c.meta.Conversation {			break		}		switch seg := seg.(type) {		case *DataSegment:			c.HandleOption(seg.Option)			c.receivingWorker.ProcessSegment(seg)			if c.receivingWorker.IsDataAvailable() {				c.dataInput.Signal()			}			c.dataUpdater.WakeUp()		case *AckSegment:			c.HandleOption(seg.Option)			c.sendingWorker.ProcessSegment(current, seg, c.roundTrip.Timeout())			c.dataOutput.Signal()			c.dataUpdater.WakeUp()		case *CmdOnlySegment:			c.HandleOption(seg.Option)			if seg.Command() == CommandTerminate {				switch c.State() {				case StateActive, StatePeerClosed:					c.SetState(StatePeerTerminating)				case StateReadyToClose:					c.SetState(StateTerminating)				case StateTerminating:					c.SetState(StateTerminated)				}			}			if seg.Option == SegmentOptionClose || seg.Command() == CommandTerminate {				c.dataInput.Signal()				c.dataOutput.Signal()			}			c.sendingWorker.ProcessReceivingNext(seg.ReceivingNext)			c.receivingWorker.ProcessSendingNext(seg.SendingNext)			c.roundTrip.UpdatePeerRTO(seg.PeerRTO, current)			seg.Release()		default:		}	}}func (c *Connection) flush() {	current := c.Elapsed()	if c.State() == StateTerminated {		return	}	if c.State() == StateActive && current-atomic.LoadUint32(&c.lastIncomingTime) >= 30000 {		c.Close()	}	if c.State() == StateReadyToClose && c.sendingWorker.IsEmpty() {		c.SetState(StateTerminating)	}	if c.State() == StateTerminating {		newError("#", c.meta.Conversation, " sending terminating cmd.").AtDebug().WriteToLog()		c.Ping(current, CommandTerminate)		if current-atomic.LoadUint32(&c.stateBeginTime) > 8000 {			c.SetState(StateTerminated)		}		return	}	if c.State() == StatePeerTerminating && current-atomic.LoadUint32(&c.stateBeginTime) > 4000 {		c.SetState(StateTerminating)	}	if c.State() == StateReadyToClose && current-atomic.LoadUint32(&c.stateBeginTime) > 15000 {		c.SetState(StateTerminating)	}	// flush acknowledges	c.receivingWorker.Flush(current)	c.sendingWorker.Flush(current)	if current-atomic.LoadUint32(&c.lastPingTime) >= 3000 {		c.Ping(current, CommandPing)	}}func (c *Connection) State() State {	return State(atomic.LoadInt32((*int32)(&c.state)))}func (c *Connection) Ping(current uint32, cmd Command) {	seg := NewCmdOnlySegment()	seg.Conv = c.meta.Conversation	seg.Cmd = cmd	seg.ReceivingNext = c.receivingWorker.NextNumber()	seg.SendingNext = c.sendingWorker.FirstUnacknowledged()	seg.PeerRTO = c.roundTrip.Timeout()	if c.State() == StateReadyToClose {		seg.Option = SegmentOptionClose	}	c.output.Write(seg)	atomic.StoreUint32(&c.lastPingTime, current)	seg.Release()}
 |