| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276 |
- package kcp
- import (
- "sync"
- "v2ray.com/core/common/buf"
- )
- type ReceivingWindow struct {
- start uint32
- size uint32
- list []*DataSegment
- }
- func NewReceivingWindow(size uint32) *ReceivingWindow {
- return &ReceivingWindow{
- start: 0,
- size: size,
- list: make([]*DataSegment, size),
- }
- }
- func (v *ReceivingWindow) Size() uint32 {
- return v.size
- }
- func (v *ReceivingWindow) Position(idx uint32) uint32 {
- return (idx + v.start) % v.size
- }
- func (v *ReceivingWindow) Set(idx uint32, value *DataSegment) bool {
- pos := v.Position(idx)
- if v.list[pos] != nil {
- return false
- }
- v.list[pos] = value
- return true
- }
- func (v *ReceivingWindow) Remove(idx uint32) *DataSegment {
- pos := v.Position(idx)
- e := v.list[pos]
- v.list[pos] = nil
- return e
- }
- func (v *ReceivingWindow) RemoveFirst() *DataSegment {
- return v.Remove(0)
- }
- func (w *ReceivingWindow) HasFirst() bool {
- return w.list[w.Position(0)] != nil
- }
- func (v *ReceivingWindow) Advance() {
- v.start++
- if v.start == v.size {
- v.start = 0
- }
- }
- type AckList struct {
- writer SegmentWriter
- timestamps []uint32
- numbers []uint32
- nextFlush []uint32
- flushCandidates []uint32
- dirty bool
- }
- func NewAckList(writer SegmentWriter) *AckList {
- return &AckList{
- writer: writer,
- timestamps: make([]uint32, 0, 128),
- numbers: make([]uint32, 0, 128),
- nextFlush: make([]uint32, 0, 128),
- flushCandidates: make([]uint32, 0, 128),
- }
- }
- func (v *AckList) Add(number uint32, timestamp uint32) {
- v.timestamps = append(v.timestamps, timestamp)
- v.numbers = append(v.numbers, number)
- v.nextFlush = append(v.nextFlush, 0)
- v.dirty = true
- }
- func (v *AckList) Clear(una uint32) {
- count := 0
- for i := 0; i < len(v.numbers); i++ {
- if v.numbers[i] < una {
- continue
- }
- if i != count {
- v.numbers[count] = v.numbers[i]
- v.timestamps[count] = v.timestamps[i]
- v.nextFlush[count] = v.nextFlush[i]
- }
- count++
- }
- if count < len(v.numbers) {
- v.numbers = v.numbers[:count]
- v.timestamps = v.timestamps[:count]
- v.nextFlush = v.nextFlush[:count]
- v.dirty = true
- }
- }
- func (v *AckList) Flush(current uint32, rto uint32) {
- v.flushCandidates = v.flushCandidates[:0]
- seg := NewAckSegment()
- for i := 0; i < len(v.numbers); i++ {
- if v.nextFlush[i] > current {
- if len(v.flushCandidates) < cap(v.flushCandidates) {
- v.flushCandidates = append(v.flushCandidates, v.numbers[i])
- }
- continue
- }
- seg.PutNumber(v.numbers[i])
- seg.PutTimestamp(v.timestamps[i])
- timeout := rto / 2
- if timeout < 20 {
- timeout = 20
- }
- v.nextFlush[i] = current + timeout
- if seg.IsFull() {
- v.writer.Write(seg)
- seg.Release()
- seg = NewAckSegment()
- v.dirty = false
- }
- }
- if v.dirty || !seg.IsEmpty() {
- for _, number := range v.flushCandidates {
- if seg.IsFull() {
- break
- }
- seg.PutNumber(number)
- }
- v.writer.Write(seg)
- seg.Release()
- v.dirty = false
- }
- }
- type ReceivingWorker struct {
- sync.RWMutex
- conn *Connection
- leftOver *buf.Buffer
- window *ReceivingWindow
- acklist *AckList
- nextNumber uint32
- windowSize uint32
- }
- func NewReceivingWorker(kcp *Connection) *ReceivingWorker {
- worker := &ReceivingWorker{
- conn: kcp,
- window: NewReceivingWindow(kcp.Config.GetReceivingBufferSize()),
- windowSize: kcp.Config.GetReceivingInFlightSize(),
- }
- worker.acklist = NewAckList(worker)
- return worker
- }
- func (v *ReceivingWorker) Release() {
- v.Lock()
- v.leftOver.Release()
- v.Unlock()
- }
- func (v *ReceivingWorker) ProcessSendingNext(number uint32) {
- v.Lock()
- defer v.Unlock()
- v.acklist.Clear(number)
- }
- func (v *ReceivingWorker) ProcessSegment(seg *DataSegment) {
- v.Lock()
- defer v.Unlock()
- number := seg.Number
- idx := number - v.nextNumber
- if idx >= v.windowSize {
- return
- }
- v.acklist.Clear(seg.SendingNext)
- v.acklist.Add(number, seg.Timestamp)
- if !v.window.Set(idx, seg) {
- seg.Release()
- }
- }
- func (v *ReceivingWorker) Read(b []byte) int {
- v.Lock()
- defer v.Unlock()
- total := 0
- if v.leftOver != nil {
- nBytes := copy(b, v.leftOver.Bytes())
- if nBytes < v.leftOver.Len() {
- v.leftOver.SliceFrom(nBytes)
- return nBytes
- }
- v.leftOver.Release()
- v.leftOver = nil
- total += nBytes
- }
- for total < len(b) {
- seg := v.window.RemoveFirst()
- if seg == nil {
- break
- }
- v.window.Advance()
- v.nextNumber++
- nBytes := copy(b[total:], seg.Data.Bytes())
- total += nBytes
- if nBytes < seg.Data.Len() {
- seg.Data.SliceFrom(nBytes)
- v.leftOver = seg.Data
- seg.Data = nil
- seg.Release()
- break
- }
- seg.Release()
- }
- return total
- }
- func (w *ReceivingWorker) IsDataAvailable() bool {
- w.RLock()
- defer w.RUnlock()
- return w.window.HasFirst()
- }
- func (w *ReceivingWorker) NextNumber() uint32 {
- w.RLock()
- defer w.RUnlock()
- return w.nextNumber
- }
- func (v *ReceivingWorker) Flush(current uint32) {
- v.Lock()
- defer v.Unlock()
- v.acklist.Flush(current, v.conn.roundTrip.Timeout())
- }
- func (v *ReceivingWorker) Write(seg Segment) error {
- ackSeg := seg.(*AckSegment)
- ackSeg.Conv = v.conn.conv
- ackSeg.ReceivingNext = v.nextNumber
- ackSeg.ReceivingWindow = v.nextNumber + v.windowSize
- if v.conn.state == StateReadyToClose {
- ackSeg.Option = SegmentOptionClose
- }
- return v.conn.output.Write(ackSeg)
- }
- func (v *ReceivingWorker) CloseRead() {
- }
- func (v *ReceivingWorker) UpdateNecessary() bool {
- v.RLock()
- defer v.RUnlock()
- return len(v.acklist.numbers) > 0
- }
|