| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272 | 
							- package kcp
 
- import (
 
- 	"sync"
 
- 	"v2ray.com/core/common/buf"
 
- )
 
- type ReceivingWindow struct {
 
- 	start uint32
 
- 	size  uint32
 
- 	list  []*DataSegment
 
- }
 
- func NewReceivingWindow(size uint32) *ReceivingWindow {
 
- 	return &ReceivingWindow{
 
- 		start: 0,
 
- 		size:  size,
 
- 		list:  make([]*DataSegment, size),
 
- 	}
 
- }
 
- func (w *ReceivingWindow) Size() uint32 {
 
- 	return w.size
 
- }
 
- func (w *ReceivingWindow) Position(idx uint32) uint32 {
 
- 	return (idx + w.start) % w.size
 
- }
 
- func (w *ReceivingWindow) Set(idx uint32, value *DataSegment) bool {
 
- 	pos := w.Position(idx)
 
- 	if w.list[pos] != nil {
 
- 		return false
 
- 	}
 
- 	w.list[pos] = value
 
- 	return true
 
- }
 
- func (w *ReceivingWindow) Remove(idx uint32) *DataSegment {
 
- 	pos := w.Position(idx)
 
- 	e := w.list[pos]
 
- 	w.list[pos] = nil
 
- 	return e
 
- }
 
- func (w *ReceivingWindow) RemoveFirst() *DataSegment {
 
- 	return w.Remove(0)
 
- }
 
- func (w *ReceivingWindow) HasFirst() bool {
 
- 	return w.list[w.Position(0)] != nil
 
- }
 
- func (w *ReceivingWindow) Advance() {
 
- 	w.start++
 
- 	if w.start == w.size {
 
- 		w.start = 0
 
- 	}
 
- }
 
- type AckList struct {
 
- 	writer     SegmentWriter
 
- 	timestamps []uint32
 
- 	numbers    []uint32
 
- 	nextFlush  []uint32
 
- 	flushCandidates []uint32
 
- 	dirty           bool
 
- }
 
- func NewAckList(writer SegmentWriter) *AckList {
 
- 	return &AckList{
 
- 		writer:          writer,
 
- 		timestamps:      make([]uint32, 0, 128),
 
- 		numbers:         make([]uint32, 0, 128),
 
- 		nextFlush:       make([]uint32, 0, 128),
 
- 		flushCandidates: make([]uint32, 0, 128),
 
- 	}
 
- }
 
- func (l *AckList) Add(number uint32, timestamp uint32) {
 
- 	l.timestamps = append(l.timestamps, timestamp)
 
- 	l.numbers = append(l.numbers, number)
 
- 	l.nextFlush = append(l.nextFlush, 0)
 
- 	l.dirty = true
 
- }
 
- func (l *AckList) Clear(una uint32) {
 
- 	count := 0
 
- 	for i := 0; i < len(l.numbers); i++ {
 
- 		if l.numbers[i] < una {
 
- 			continue
 
- 		}
 
- 		if i != count {
 
- 			l.numbers[count] = l.numbers[i]
 
- 			l.timestamps[count] = l.timestamps[i]
 
- 			l.nextFlush[count] = l.nextFlush[i]
 
- 		}
 
- 		count++
 
- 	}
 
- 	if count < len(l.numbers) {
 
- 		l.numbers = l.numbers[:count]
 
- 		l.timestamps = l.timestamps[:count]
 
- 		l.nextFlush = l.nextFlush[:count]
 
- 		l.dirty = true
 
- 	}
 
- }
 
- func (l *AckList) Flush(current uint32, rto uint32) {
 
- 	l.flushCandidates = l.flushCandidates[:0]
 
- 	seg := NewAckSegment()
 
- 	for i := 0; i < len(l.numbers); i++ {
 
- 		if l.nextFlush[i] > current {
 
- 			if len(l.flushCandidates) < cap(l.flushCandidates) {
 
- 				l.flushCandidates = append(l.flushCandidates, l.numbers[i])
 
- 			}
 
- 			continue
 
- 		}
 
- 		seg.PutNumber(l.numbers[i])
 
- 		seg.PutTimestamp(l.timestamps[i])
 
- 		timeout := rto / 2
 
- 		if timeout < 20 {
 
- 			timeout = 20
 
- 		}
 
- 		l.nextFlush[i] = current + timeout
 
- 		if seg.IsFull() {
 
- 			l.writer.Write(seg)
 
- 			seg.Release()
 
- 			seg = NewAckSegment()
 
- 			l.dirty = false
 
- 		}
 
- 	}
 
- 	if l.dirty || !seg.IsEmpty() {
 
- 		for _, number := range l.flushCandidates {
 
- 			if seg.IsFull() {
 
- 				break
 
- 			}
 
- 			seg.PutNumber(number)
 
- 		}
 
- 		l.writer.Write(seg)
 
- 		seg.Release()
 
- 		l.dirty = false
 
- 	}
 
- }
 
- type ReceivingWorker struct {
 
- 	sync.RWMutex
 
- 	conn       *Connection
 
- 	leftOver   buf.MultiBuffer
 
- 	window     *ReceivingWindow
 
- 	acklist    *AckList
 
- 	nextNumber uint32
 
- 	windowSize uint32
 
- }
 
- func NewReceivingWorker(kcp *Connection) *ReceivingWorker {
 
- 	worker := &ReceivingWorker{
 
- 		conn:       kcp,
 
- 		window:     NewReceivingWindow(kcp.Config.GetReceivingBufferSize()),
 
- 		windowSize: kcp.Config.GetReceivingInFlightSize(),
 
- 	}
 
- 	worker.acklist = NewAckList(worker)
 
- 	return worker
 
- }
 
- func (w *ReceivingWorker) Release() {
 
- 	w.Lock()
 
- 	w.leftOver.Release()
 
- 	w.Unlock()
 
- }
 
- func (w *ReceivingWorker) ProcessSendingNext(number uint32) {
 
- 	w.Lock()
 
- 	defer w.Unlock()
 
- 	w.acklist.Clear(number)
 
- }
 
- func (w *ReceivingWorker) ProcessSegment(seg *DataSegment) {
 
- 	w.Lock()
 
- 	defer w.Unlock()
 
- 	number := seg.Number
 
- 	idx := number - w.nextNumber
 
- 	if idx >= w.windowSize {
 
- 		return
 
- 	}
 
- 	w.acklist.Clear(seg.SendingNext)
 
- 	w.acklist.Add(number, seg.Timestamp)
 
- 	if !w.window.Set(idx, seg) {
 
- 		seg.Release()
 
- 	}
 
- }
 
- func (w *ReceivingWorker) ReadMultiBuffer() buf.MultiBuffer {
 
- 	if w.leftOver != nil {
 
- 		mb := w.leftOver
 
- 		w.leftOver = nil
 
- 		return mb
 
- 	}
 
- 	mb := buf.NewMultiBufferCap(32)
 
- 	w.Lock()
 
- 	defer w.Unlock()
 
- 	for {
 
- 		seg := w.window.RemoveFirst()
 
- 		if seg == nil {
 
- 			break
 
- 		}
 
- 		w.window.Advance()
 
- 		w.nextNumber++
 
- 		mb.Append(seg.Detach())
 
- 		seg.Release()
 
- 	}
 
- 	return mb
 
- }
 
- func (w *ReceivingWorker) Read(b []byte) int {
 
- 	mb := w.ReadMultiBuffer()
 
- 	nBytes, _ := mb.Read(b)
 
- 	if !mb.IsEmpty() {
 
- 		w.leftOver = mb
 
- 	}
 
- 	return nBytes
 
- }
 
- func (w *ReceivingWorker) IsDataAvailable() bool {
 
- 	w.RLock()
 
- 	defer w.RUnlock()
 
- 	return w.window.HasFirst()
 
- }
 
- func (w *ReceivingWorker) NextNumber() uint32 {
 
- 	w.RLock()
 
- 	defer w.RUnlock()
 
- 	return w.nextNumber
 
- }
 
- func (w *ReceivingWorker) Flush(current uint32) {
 
- 	w.Lock()
 
- 	defer w.Unlock()
 
- 	w.acklist.Flush(current, w.conn.roundTrip.Timeout())
 
- }
 
- func (w *ReceivingWorker) Write(seg Segment) error {
 
- 	ackSeg := seg.(*AckSegment)
 
- 	ackSeg.Conv = w.conn.meta.Conversation
 
- 	ackSeg.ReceivingNext = w.nextNumber
 
- 	ackSeg.ReceivingWindow = w.nextNumber + w.windowSize
 
- 	if w.conn.State() == StateReadyToClose {
 
- 		ackSeg.Option = SegmentOptionClose
 
- 	}
 
- 	return w.conn.output.Write(ackSeg)
 
- }
 
- func (*ReceivingWorker) CloseRead() {
 
- }
 
- func (w *ReceivingWorker) UpdateNecessary() bool {
 
- 	w.RLock()
 
- 	defer w.RUnlock()
 
- 	return len(w.acklist.numbers) > 0
 
- }
 
 
  |