| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254 | package kcpimport (	"sync"	"v2ray.com/core/common/alloc")type ReceivingWindow struct {	start uint32	size  uint32	list  []*DataSegment}func NewReceivingWindow(size uint32) *ReceivingWindow {	return &ReceivingWindow{		start: 0,		size:  size,		list:  make([]*DataSegment, size),	}}func (v *ReceivingWindow) Size() uint32 {	return v.size}func (v *ReceivingWindow) Position(idx uint32) uint32 {	return (idx + v.start) % v.size}func (v *ReceivingWindow) Set(idx uint32, value *DataSegment) bool {	pos := v.Position(idx)	if v.list[pos] != nil {		return false	}	v.list[pos] = value	return true}func (v *ReceivingWindow) Remove(idx uint32) *DataSegment {	pos := v.Position(idx)	e := v.list[pos]	v.list[pos] = nil	return e}func (v *ReceivingWindow) RemoveFirst() *DataSegment {	return v.Remove(0)}func (v *ReceivingWindow) Advance() {	v.start++	if v.start == v.size {		v.start = 0	}}type AckList struct {	writer     SegmentWriter	timestamps []uint32	numbers    []uint32	nextFlush  []uint32	flushCandidates []uint32	dirty           bool}func NewAckList(writer SegmentWriter) *AckList {	return &AckList{		writer:          writer,		timestamps:      make([]uint32, 0, 128),		numbers:         make([]uint32, 0, 128),		nextFlush:       make([]uint32, 0, 128),		flushCandidates: make([]uint32, 0, 128),	}}func (v *AckList) Add(number uint32, timestamp uint32) {	v.timestamps = append(v.timestamps, timestamp)	v.numbers = append(v.numbers, number)	v.nextFlush = append(v.nextFlush, 0)	v.dirty = true}func (v *AckList) Clear(una uint32) {	count := 0	for i := 0; i < len(v.numbers); i++ {		if v.numbers[i] < una {			continue		}		if i != count {			v.numbers[count] = v.numbers[i]			v.timestamps[count] = v.timestamps[i]			v.nextFlush[count] = v.nextFlush[i]		}		count++	}	if count < len(v.numbers) {		v.numbers = v.numbers[:count]		v.timestamps = v.timestamps[:count]		v.nextFlush = v.nextFlush[:count]		v.dirty = true	}}func (v *AckList) Flush(current uint32, rto uint32) {	v.flushCandidates = v.flushCandidates[:0]	seg := NewAckSegment()	for i := 0; i < len(v.numbers); i++ {		if v.nextFlush[i] > current {			if len(v.flushCandidates) < cap(v.flushCandidates) {				v.flushCandidates = append(v.flushCandidates, v.numbers[i])			}			continue		}		seg.PutNumber(v.numbers[i])		seg.PutTimestamp(v.timestamps[i])		timeout := rto / 2		if timeout < 20 {			timeout = 20		}		v.nextFlush[i] = current + timeout		if seg.IsFull() {			v.writer.Write(seg)			seg.Release()			seg = NewAckSegment()			v.dirty = false		}	}	if v.dirty || seg.Count > 0 {		for _, number := range v.flushCandidates {			if seg.IsFull() {				break			}			seg.PutNumber(number)		}		v.writer.Write(seg)		seg.Release()		v.dirty = false	}}type ReceivingWorker struct {	sync.RWMutex	conn       *Connection	leftOver   *alloc.Buffer	window     *ReceivingWindow	acklist    *AckList	nextNumber uint32	windowSize uint32}func NewReceivingWorker(kcp *Connection) *ReceivingWorker {	worker := &ReceivingWorker{		conn:       kcp,		window:     NewReceivingWindow(kcp.Config.GetReceivingBufferSize()),		windowSize: kcp.Config.GetReceivingInFlightSize(),	}	worker.acklist = NewAckList(worker)	return worker}func (v *ReceivingWorker) Release() {	v.leftOver.Release()}func (v *ReceivingWorker) ProcessSendingNext(number uint32) {	v.Lock()	defer v.Unlock()	v.acklist.Clear(number)}func (v *ReceivingWorker) ProcessSegment(seg *DataSegment) {	v.Lock()	defer v.Unlock()	number := seg.Number	idx := number - v.nextNumber	if idx >= v.windowSize {		return	}	v.acklist.Clear(seg.SendingNext)	v.acklist.Add(number, seg.Timestamp)	if !v.window.Set(idx, seg) {		seg.Release()	}}func (v *ReceivingWorker) Read(b []byte) int {	v.Lock()	defer v.Unlock()	total := 0	if v.leftOver != nil {		nBytes := copy(b, v.leftOver.Value)		if nBytes < v.leftOver.Len() {			v.leftOver.SliceFrom(nBytes)			return nBytes		}		v.leftOver.Release()		v.leftOver = nil		total += nBytes	}	for total < len(b) {		seg := v.window.RemoveFirst()		if seg == nil {			break		}		v.window.Advance()		v.nextNumber++		nBytes := copy(b[total:], seg.Data.Value)		total += nBytes		if nBytes < seg.Data.Len() {			seg.Data.SliceFrom(nBytes)			v.leftOver = seg.Data			seg.Data = nil			seg.Release()			break		}		seg.Release()	}	return total}func (v *ReceivingWorker) Flush(current uint32) {	v.Lock()	defer v.Unlock()	v.acklist.Flush(current, v.conn.roundTrip.Timeout())}func (v *ReceivingWorker) Write(seg Segment) {	ackSeg := seg.(*AckSegment)	ackSeg.Conv = v.conn.conv	ackSeg.ReceivingNext = v.nextNumber	ackSeg.ReceivingWindow = v.nextNumber + v.windowSize	if v.conn.state == StateReadyToClose {		ackSeg.Option = SegmentOptionClose	}	v.conn.output.Write(ackSeg)}func (v *ReceivingWorker) CloseRead() {}func (v *ReceivingWorker) UpdateNecessary() bool {	return len(v.acklist.numbers) > 0}
 |