| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382 | 
							- package kcp
 
- import (
 
- 	"sync"
 
- 	"v2ray.com/core/common"
 
- 	"v2ray.com/core/common/buf"
 
- )
 
- type SendingWindow struct {
 
- 	start uint32
 
- 	cap   uint32
 
- 	len   uint32
 
- 	last  uint32
 
- 	data  []DataSegment
 
- 	inuse []bool
 
- 	prev  []uint32
 
- 	next  []uint32
 
- 	totalInFlightSize uint32
 
- 	writer            SegmentWriter
 
- 	onPacketLoss      func(uint32)
 
- }
 
- func NewSendingWindow(size uint32, writer SegmentWriter, onPacketLoss func(uint32)) *SendingWindow {
 
- 	window := &SendingWindow{
 
- 		start:        0,
 
- 		cap:          size,
 
- 		len:          0,
 
- 		last:         0,
 
- 		data:         make([]DataSegment, size),
 
- 		prev:         make([]uint32, size),
 
- 		next:         make([]uint32, size),
 
- 		inuse:        make([]bool, size),
 
- 		writer:       writer,
 
- 		onPacketLoss: onPacketLoss,
 
- 	}
 
- 	return window
 
- }
 
- func (sw *SendingWindow) Release() {
 
- 	if sw == nil {
 
- 		return
 
- 	}
 
- 	sw.len = 0
 
- 	for _, seg := range sw.data {
 
- 		seg.Release()
 
- 	}
 
- }
 
- func (sw *SendingWindow) Len() int {
 
- 	return int(sw.len)
 
- }
 
- func (sw *SendingWindow) IsEmpty() bool {
 
- 	return sw.len == 0
 
- }
 
- func (sw *SendingWindow) Size() uint32 {
 
- 	return sw.cap
 
- }
 
- func (sw *SendingWindow) IsFull() bool {
 
- 	return sw.len == sw.cap
 
- }
 
- func (sw *SendingWindow) Push(number uint32) *buf.Buffer {
 
- 	pos := (sw.start + sw.len) % sw.cap
 
- 	sw.data[pos].Number = number
 
- 	sw.data[pos].timeout = 0
 
- 	sw.data[pos].transmit = 0
 
- 	sw.inuse[pos] = true
 
- 	if sw.len > 0 {
 
- 		sw.next[sw.last] = pos
 
- 		sw.prev[pos] = sw.last
 
- 	}
 
- 	sw.last = pos
 
- 	sw.len++
 
- 	return sw.data[pos].Data()
 
- }
 
- func (sw *SendingWindow) FirstNumber() uint32 {
 
- 	return sw.data[sw.start].Number
 
- }
 
- func (sw *SendingWindow) Clear(una uint32) {
 
- 	for !sw.IsEmpty() && sw.data[sw.start].Number < una {
 
- 		sw.Remove(0)
 
- 	}
 
- }
 
- func (sw *SendingWindow) Remove(idx uint32) bool {
 
- 	if sw.IsEmpty() {
 
- 		return false
 
- 	}
 
- 	pos := (sw.start + idx) % sw.cap
 
- 	if !sw.inuse[pos] {
 
- 		return false
 
- 	}
 
- 	sw.inuse[pos] = false
 
- 	sw.totalInFlightSize--
 
- 	if pos == sw.start && pos == sw.last {
 
- 		sw.len = 0
 
- 		sw.start = 0
 
- 		sw.last = 0
 
- 	} else if pos == sw.start {
 
- 		delta := sw.next[pos] - sw.start
 
- 		if sw.next[pos] < sw.start {
 
- 			delta = sw.next[pos] + sw.cap - sw.start
 
- 		}
 
- 		sw.start = sw.next[pos]
 
- 		sw.len -= delta
 
- 	} else if pos == sw.last {
 
- 		sw.last = sw.prev[pos]
 
- 	} else {
 
- 		sw.next[sw.prev[pos]] = sw.next[pos]
 
- 		sw.prev[sw.next[pos]] = sw.prev[pos]
 
- 	}
 
- 	return true
 
- }
 
- func (sw *SendingWindow) HandleFastAck(number uint32, rto uint32) {
 
- 	if sw.IsEmpty() {
 
- 		return
 
- 	}
 
- 	sw.Visit(func(seg *DataSegment) bool {
 
- 		if number == seg.Number || number-seg.Number > 0x7FFFFFFF {
 
- 			return false
 
- 		}
 
- 		if seg.transmit > 0 && seg.timeout > rto/3 {
 
- 			seg.timeout -= rto / 3
 
- 		}
 
- 		return true
 
- 	})
 
- }
 
- func (sw *SendingWindow) Visit(visitor func(seg *DataSegment) bool) {
 
- 	if sw.IsEmpty() {
 
- 		return
 
- 	}
 
- 	for i := sw.start; ; i = sw.next[i] {
 
- 		if !visitor(&sw.data[i]) || i == sw.last {
 
- 			break
 
- 		}
 
- 	}
 
- }
 
- func (sw *SendingWindow) Flush(current uint32, rto uint32, maxInFlightSize uint32) {
 
- 	if sw.IsEmpty() {
 
- 		return
 
- 	}
 
- 	var lost uint32
 
- 	var inFlightSize uint32
 
- 	sw.Visit(func(segment *DataSegment) bool {
 
- 		if current-segment.timeout >= 0x7FFFFFFF {
 
- 			return true
 
- 		}
 
- 		if segment.transmit == 0 {
 
- 			// First time
 
- 			sw.totalInFlightSize++
 
- 		} else {
 
- 			lost++
 
- 		}
 
- 		segment.timeout = current + rto
 
- 		segment.Timestamp = current
 
- 		segment.transmit++
 
- 		sw.writer.Write(segment)
 
- 		inFlightSize++
 
- 		if inFlightSize >= maxInFlightSize {
 
- 			return false
 
- 		}
 
- 		return true
 
- 	})
 
- 	if sw.onPacketLoss != nil && inFlightSize > 0 && sw.totalInFlightSize != 0 {
 
- 		rate := lost * 100 / sw.totalInFlightSize
 
- 		sw.onPacketLoss(rate)
 
- 	}
 
- }
 
- type SendingWorker struct {
 
- 	sync.RWMutex
 
- 	conn                       *Connection
 
- 	window                     *SendingWindow
 
- 	firstUnacknowledged        uint32
 
- 	firstUnacknowledgedUpdated bool
 
- 	nextNumber                 uint32
 
- 	remoteNextNumber           uint32
 
- 	controlWindow              uint32
 
- 	fastResend                 uint32
 
- }
 
- func NewSendingWorker(kcp *Connection) *SendingWorker {
 
- 	worker := &SendingWorker{
 
- 		conn:             kcp,
 
- 		fastResend:       2,
 
- 		remoteNextNumber: 32,
 
- 		controlWindow:    kcp.Config.GetSendingInFlightSize(),
 
- 	}
 
- 	worker.window = NewSendingWindow(kcp.Config.GetSendingBufferSize(), worker, worker.OnPacketLoss)
 
- 	return worker
 
- }
 
- func (w *SendingWorker) Release() {
 
- 	w.Lock()
 
- 	w.window.Release()
 
- 	w.Unlock()
 
- }
 
- func (w *SendingWorker) ProcessReceivingNext(nextNumber uint32) {
 
- 	w.Lock()
 
- 	defer w.Unlock()
 
- 	w.ProcessReceivingNextWithoutLock(nextNumber)
 
- }
 
- func (w *SendingWorker) ProcessReceivingNextWithoutLock(nextNumber uint32) {
 
- 	w.window.Clear(nextNumber)
 
- 	w.FindFirstUnacknowledged()
 
- }
 
- func (w *SendingWorker) FindFirstUnacknowledged() {
 
- 	first := w.firstUnacknowledged
 
- 	if !w.window.IsEmpty() {
 
- 		w.firstUnacknowledged = w.window.FirstNumber()
 
- 	} else {
 
- 		w.firstUnacknowledged = w.nextNumber
 
- 	}
 
- 	if first != w.firstUnacknowledged {
 
- 		w.firstUnacknowledgedUpdated = true
 
- 	}
 
- }
 
- func (w *SendingWorker) processAck(number uint32) bool {
 
- 	// number < v.firstUnacknowledged || number >= v.nextNumber
 
- 	if number-w.firstUnacknowledged > 0x7FFFFFFF || number-w.nextNumber < 0x7FFFFFFF {
 
- 		return false
 
- 	}
 
- 	removed := w.window.Remove(number - w.firstUnacknowledged)
 
- 	if removed {
 
- 		w.FindFirstUnacknowledged()
 
- 	}
 
- 	return removed
 
- }
 
- func (w *SendingWorker) ProcessSegment(current uint32, seg *AckSegment, rto uint32) {
 
- 	defer seg.Release()
 
- 	w.Lock()
 
- 	defer w.Unlock()
 
- 	if w.remoteNextNumber < seg.ReceivingWindow {
 
- 		w.remoteNextNumber = seg.ReceivingWindow
 
- 	}
 
- 	w.ProcessReceivingNextWithoutLock(seg.ReceivingNext)
 
- 	if seg.IsEmpty() {
 
- 		return
 
- 	}
 
- 	var maxack uint32
 
- 	var maxackRemoved bool
 
- 	for _, number := range seg.NumberList {
 
- 		removed := w.processAck(number)
 
- 		if maxack < number {
 
- 			maxack = number
 
- 			maxackRemoved = removed
 
- 		}
 
- 	}
 
- 	if maxackRemoved {
 
- 		w.window.HandleFastAck(maxack, rto)
 
- 		if current-seg.Timestamp < 10000 {
 
- 			w.conn.roundTrip.Update(current-seg.Timestamp, current)
 
- 		}
 
- 	}
 
- }
 
- func (w *SendingWorker) Push(f buf.Supplier) bool {
 
- 	w.Lock()
 
- 	defer w.Unlock()
 
- 	if w.window.IsFull() {
 
- 		return false
 
- 	}
 
- 	b := w.window.Push(w.nextNumber)
 
- 	w.nextNumber++
 
- 	common.Must(b.Reset(f))
 
- 	return true
 
- }
 
- func (w *SendingWorker) Write(seg Segment) error {
 
- 	dataSeg := seg.(*DataSegment)
 
- 	dataSeg.Conv = w.conn.meta.Conversation
 
- 	dataSeg.SendingNext = w.firstUnacknowledged
 
- 	dataSeg.Option = 0
 
- 	if w.conn.State() == StateReadyToClose {
 
- 		dataSeg.Option = SegmentOptionClose
 
- 	}
 
- 	return w.conn.output.Write(dataSeg)
 
- }
 
- func (w *SendingWorker) OnPacketLoss(lossRate uint32) {
 
- 	if !w.conn.Config.Congestion || w.conn.roundTrip.Timeout() == 0 {
 
- 		return
 
- 	}
 
- 	if lossRate >= 15 {
 
- 		w.controlWindow = 3 * w.controlWindow / 4
 
- 	} else if lossRate <= 5 {
 
- 		w.controlWindow += w.controlWindow / 4
 
- 	}
 
- 	if w.controlWindow < 16 {
 
- 		w.controlWindow = 16
 
- 	}
 
- 	if w.controlWindow > 2*w.conn.Config.GetSendingInFlightSize() {
 
- 		w.controlWindow = 2 * w.conn.Config.GetSendingInFlightSize()
 
- 	}
 
- }
 
- func (w *SendingWorker) Flush(current uint32) {
 
- 	w.Lock()
 
- 	cwnd := w.firstUnacknowledged + w.conn.Config.GetSendingInFlightSize()
 
- 	if cwnd > w.remoteNextNumber {
 
- 		cwnd = w.remoteNextNumber
 
- 	}
 
- 	if w.conn.Config.Congestion && cwnd > w.firstUnacknowledged+w.controlWindow {
 
- 		cwnd = w.firstUnacknowledged + w.controlWindow
 
- 	}
 
- 	if !w.window.IsEmpty() {
 
- 		w.window.Flush(current, w.conn.roundTrip.Timeout(), cwnd)
 
- 		w.firstUnacknowledgedUpdated = false
 
- 	}
 
- 	updated := w.firstUnacknowledgedUpdated
 
- 	w.firstUnacknowledgedUpdated = false
 
- 	w.Unlock()
 
- 	if updated {
 
- 		w.conn.Ping(current, CommandPing)
 
- 	}
 
- }
 
- func (w *SendingWorker) CloseWrite() {
 
- 	w.Lock()
 
- 	defer w.Unlock()
 
- 	w.window.Clear(0xFFFFFFFF)
 
- }
 
- func (w *SendingWorker) IsEmpty() bool {
 
- 	w.RLock()
 
- 	defer w.RUnlock()
 
- 	return w.window.IsEmpty()
 
- }
 
- func (w *SendingWorker) UpdateNecessary() bool {
 
- 	return !w.IsEmpty()
 
- }
 
- func (w *SendingWorker) FirstUnacknowledged() uint32 {
 
- 	w.RLock()
 
- 	defer w.RUnlock()
 
- 	return w.firstUnacknowledged
 
- }
 
 
  |