| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322 | package kcpimport (	"io"	"sync"	"time"	"github.com/v2ray/v2ray-core/common/alloc")type ReceivingWindow struct {	start uint32	size  uint32	list  []*DataSegment}func NewReceivingWindow(size uint32) *ReceivingWindow {	return &ReceivingWindow{		start: 0,		size:  size,		list:  make([]*DataSegment, size),	}}func (this *ReceivingWindow) Size() uint32 {	return this.size}func (this *ReceivingWindow) Position(idx uint32) uint32 {	return (idx + this.start) % this.size}func (this *ReceivingWindow) Set(idx uint32, value *DataSegment) bool {	pos := this.Position(idx)	if this.list[pos] != nil {		return false	}	this.list[pos] = value	return true}func (this *ReceivingWindow) Remove(idx uint32) *DataSegment {	pos := this.Position(idx)	e := this.list[pos]	this.list[pos] = nil	return e}func (this *ReceivingWindow) RemoveFirst() *DataSegment {	return this.Remove(0)}func (this *ReceivingWindow) Advance() {	this.start++	if this.start == this.size {		this.start = 0	}}type ReceivingQueue struct {	sync.Mutex	closed  bool	cache   *alloc.Buffer	queue   chan *alloc.Buffer	timeout time.Time}func NewReceivingQueue(size uint32) *ReceivingQueue {	return &ReceivingQueue{		queue: make(chan *alloc.Buffer, size),	}}func (this *ReceivingQueue) Read(buf []byte) (int, error) {	if this.closed {		return 0, io.EOF	}	if this.cache.Len() > 0 {		nBytes, err := this.cache.Read(buf)		if this.cache.IsEmpty() {			this.cache.Release()			this.cache = nil		}		return nBytes, err	}	var totalBytes intL:	for totalBytes < len(buf) {		timeToSleep := time.Millisecond		select {		case payload, open := <-this.queue:			if !open {				return totalBytes, io.EOF			}			nBytes, err := payload.Read(buf)			totalBytes += nBytes			if err != nil {				return totalBytes, err			}			if !payload.IsEmpty() {				this.cache = payload			}			buf = buf[nBytes:]		case <-time.After(timeToSleep):			if totalBytes > 0 {				break L			}			if !this.timeout.IsZero() && this.timeout.Before(time.Now()) {				return totalBytes, errTimeout			}			timeToSleep += 500 * time.Millisecond		}	}	return totalBytes, nil}func (this *ReceivingQueue) Put(payload *alloc.Buffer) bool {	this.Lock()	defer this.Unlock()	if this.closed {		payload.Release()		return false	}	select {	case this.queue <- payload:		return true	default:		return false	}}func (this *ReceivingQueue) SetReadDeadline(t time.Time) error {	this.timeout = t	return nil}func (this *ReceivingQueue) Close() {	this.Lock()	defer this.Unlock()	if this.closed {		return	}	this.closed = true	close(this.queue)}type AckList struct {	sync.Mutex	writer     SegmentWriter	timestamps []uint32	numbers    []uint32	nextFlush  []uint32}func NewAckList(writer SegmentWriter) *AckList {	return &AckList{		writer:     writer,		timestamps: make([]uint32, 0, 32),		numbers:    make([]uint32, 0, 32),		nextFlush:  make([]uint32, 0, 32),	}}func (this *AckList) Add(number uint32, timestamp uint32) {	this.Lock()	defer this.Unlock()	this.timestamps = append(this.timestamps, timestamp)	this.numbers = append(this.numbers, number)	this.nextFlush = append(this.nextFlush, 0)}func (this *AckList) Clear(una uint32) {	this.Lock()	defer this.Unlock()	count := 0	for i := 0; i < len(this.numbers); i++ {		if this.numbers[i] >= una {			if i != count {				this.numbers[count] = this.numbers[i]				this.timestamps[count] = this.timestamps[i]				this.nextFlush[count] = this.nextFlush[i]			}			count++		}	}	if count < len(this.numbers) {		this.numbers = this.numbers[:count]		this.timestamps = this.timestamps[:count]		this.nextFlush = this.nextFlush[:count]	}}func (this *AckList) Flush(current uint32, rto uint32) {	seg := new(AckSegment)	this.Lock()	for i := 0; i < len(this.numbers); i++ {		if this.nextFlush[i] <= current {			seg.Count++			seg.NumberList = append(seg.NumberList, this.numbers[i])			seg.TimestampList = append(seg.TimestampList, this.timestamps[i])			this.nextFlush[i] = current + rto/2			if seg.Count == 128 {				break			}		}	}	this.Unlock()	if seg.Count > 0 {		this.writer.Write(seg)	}}type ReceivingWorker struct {	kcp         *KCP	queue       *ReceivingQueue	window      *ReceivingWindow	windowMutex sync.Mutex	acklist     *AckList	updated     bool	nextNumber  uint32	windowSize  uint32}func NewReceivingWorker(kcp *KCP) *ReceivingWorker {	windowSize := effectiveConfig.GetReceivingWindowSize()	worker := &ReceivingWorker{		kcp:        kcp,		queue:      NewReceivingQueue(effectiveConfig.GetReceivingQueueSize()),		window:     NewReceivingWindow(windowSize),		windowSize: windowSize,	}	worker.acklist = NewAckList(worker)	return worker}func (this *ReceivingWorker) ProcessSendingNext(number uint32) {	this.acklist.Clear(number)}func (this *ReceivingWorker) ProcessSegment(seg *DataSegment) {	number := seg.Number	if _itimediff(number, this.nextNumber+this.windowSize) >= 0 || _itimediff(number, this.nextNumber) < 0 {		return	}	this.ProcessSendingNext(seg.SendingNext)	this.acklist.Add(number, seg.Timestamp)	this.windowMutex.Lock()	idx := number - this.nextNumber	if !this.window.Set(idx, seg) {		seg.Release()	}	this.windowMutex.Unlock()	this.DumpWindow()}// @Privatefunc (this *ReceivingWorker) DumpWindow() {	this.windowMutex.Lock()	defer this.windowMutex.Unlock()	for {		seg := this.window.RemoveFirst()		if seg == nil {			break		}		if !this.queue.Put(seg.Data) {			this.window.Set(0, seg)			break		}		seg.Data = nil		this.window.Advance()		this.nextNumber++		this.updated = true	}}func (this *ReceivingWorker) Read(b []byte) (int, error) {	return this.queue.Read(b)}func (this *ReceivingWorker) SetReadDeadline(t time.Time) {	this.queue.SetReadDeadline(t)}func (this *ReceivingWorker) Flush() {	this.acklist.Flush(this.kcp.current, this.kcp.rx_rto)}func (this *ReceivingWorker) Write(seg ISegment) {	ackSeg := seg.(*AckSegment)	ackSeg.Conv = this.kcp.conv	ackSeg.ReceivingNext = this.nextNumber	ackSeg.ReceivingWindow = this.nextNumber + this.windowSize	if this.kcp.state == StateReadyToClose {		ackSeg.Opt = SegmentOptionClose	}	this.kcp.output.Write(ackSeg)	this.updated = false}func (this *ReceivingWorker) CloseRead() {	this.queue.Close()}func (this *ReceivingWorker) PingNecessary() bool {	return this.updated}
 |