Ver código fonte

cleanup kcp connection

Darien Raymond 7 anos atrás
pai
commit
2acef1cc07

+ 10 - 11
transport/internet/kcp/connection.go

@@ -166,13 +166,14 @@ func (u *Updater) SetInterval(d time.Duration) {
 }
 
 type ConnMetadata struct {
-	LocalAddr  net.Addr
-	RemoteAddr net.Addr
+	LocalAddr    net.Addr
+	RemoteAddr   net.Addr
+	Conversation uint16
 }
 
 // Connection is a KCP connection over UDP.
 type Connection struct {
-	meta       *ConnMetadata
+	meta       ConnMetadata
 	closer     io.Closer
 	rd         time.Time
 	wd         time.Time // write deadline
@@ -181,7 +182,6 @@ type Connection struct {
 	dataOutput chan bool
 	Config     *Config
 
-	conv             uint16
 	state            State
 	stateBeginTime   uint32
 	lastIncomingTime uint32
@@ -200,11 +200,10 @@ type Connection struct {
 }
 
 // NewConnection create a new KCP connection between local and remote.
-func NewConnection(conv uint16, meta *ConnMetadata, writer PacketWriter, closer io.Closer, config *Config) *Connection {
-	log.Trace(newError("creating connection ", conv))
+func NewConnection(meta ConnMetadata, writer PacketWriter, closer io.Closer, config *Config) *Connection {
+	log.Trace(newError("creating connection ", meta.Conversation))
 
 	conn := &Connection{
-		conv:       conv,
 		meta:       meta,
 		closer:     closer,
 		since:      nowMillisec(),
@@ -414,7 +413,7 @@ func (v *Connection) SetState(state State) {
 	current := v.Elapsed()
 	atomic.StoreInt32((*int32)(&v.state), int32(state))
 	atomic.StoreUint32(&v.stateBeginTime, current)
-	log.Trace(newError("#", v.conv, " entering state ", state, " at ", current).AtDebug())
+	log.Trace(newError("#", v.meta.Conversation, " entering state ", state, " at ", current).AtDebug())
 
 	switch state {
 	case StateReadyToClose:
@@ -553,7 +552,7 @@ func (v *Connection) Input(segments []Segment) {
 	atomic.StoreUint32(&v.lastIncomingTime, current)
 
 	for _, seg := range segments {
-		if seg.Conversation() != v.conv {
+		if seg.Conversation() != v.meta.Conversation {
 			break
 		}
 
@@ -610,7 +609,7 @@ func (v *Connection) flush() {
 	}
 
 	if v.State() == StateTerminating {
-		log.Trace(newError("#", v.conv, " sending terminating cmd.").AtDebug())
+		log.Trace(newError("#", v.meta.Conversation, " sending terminating cmd.").AtDebug())
 		v.Ping(current, CommandTerminate)
 
 		if current-atomic.LoadUint32(&v.stateBeginTime) > 8000 {
@@ -641,7 +640,7 @@ func (v *Connection) State() State {
 
 func (v *Connection) Ping(current uint32, cmd Command) {
 	seg := NewCmdOnlySegment()
-	seg.Conv = v.conv
+	seg.Conv = v.meta.Conversation
 	seg.Cmd = cmd
 	seg.ReceivinNext = v.receivingWorker.NextNumber()
 	seg.SendingNext = v.sendingWorker.FirstUnacknowledged()

+ 1 - 1
transport/internet/kcp/connection_test.go

@@ -19,7 +19,7 @@ func (NoOpCloser) Close() error {
 func TestConnectionReadTimeout(t *testing.T) {
 	assert := With(t)
 
-	conn := NewConnection(1, &ConnMetadata{}, &KCPPacketWriter{
+	conn := NewConnection(ConnMetadata{Conversation: 1}, &KCPPacketWriter{
 		Writer: buf.DiscardBytes,
 	}, NoOpCloser(0), &Config{})
 	conn.SetReadDeadline(time.Now().Add(time.Second))

+ 4 - 3
transport/internet/kcp/dialer.go

@@ -67,9 +67,10 @@ func DialKCP(ctx context.Context, dest net.Destination) (internet.Connection, er
 	}
 
 	conv := uint16(atomic.AddUint32(&globalConv, 1))
-	session := NewConnection(conv, &ConnMetadata{
-		LocalAddr:  rawConn.LocalAddr(),
-		RemoteAddr: rawConn.RemoteAddr(),
+	session := NewConnection(ConnMetadata{
+		LocalAddr:    rawConn.LocalAddr(),
+		RemoteAddr:   rawConn.RemoteAddr(),
+		Conversation: conv,
 	}, writer, rawConn, kcpSettings)
 
 	go fetchInput(ctx, rawConn, reader, session)

+ 4 - 3
transport/internet/kcp/listener.go

@@ -124,9 +124,10 @@ func (v *Listener) OnReceive(payload *buf.Buffer, src net.Destination, originalD
 			Port: int(src.Port),
 		}
 		localAddr := v.hub.Addr()
-		conn = NewConnection(conv, &ConnMetadata{
-			LocalAddr:  localAddr,
-			RemoteAddr: remoteAddr,
+		conn = NewConnection(ConnMetadata{
+			LocalAddr:    localAddr,
+			RemoteAddr:   remoteAddr,
+			Conversation: conv,
 		}, &KCPPacketWriter{
 			Header:   v.header,
 			Security: v.security,

+ 1 - 1
transport/internet/kcp/receiving.go

@@ -252,7 +252,7 @@ func (w *ReceivingWorker) Flush(current uint32) {
 
 func (w *ReceivingWorker) Write(seg Segment) error {
 	ackSeg := seg.(*AckSegment)
-	ackSeg.Conv = w.conn.conv
+	ackSeg.Conv = w.conn.meta.Conversation
 	ackSeg.ReceivingNext = w.nextNumber
 	ackSeg.ReceivingWindow = w.nextNumber + w.windowSize
 	if w.conn.State() == StateReadyToClose {

+ 1 - 1
transport/internet/kcp/sending.go

@@ -300,7 +300,7 @@ func (v *SendingWorker) Push() *buf.Buffer {
 func (v *SendingWorker) Write(seg Segment) error {
 	dataSeg := seg.(*DataSegment)
 
-	dataSeg.Conv = v.conn.conv
+	dataSeg.Conv = v.conn.meta.Conversation
 	dataSeg.SendingNext = v.firstUnacknowledged
 	dataSeg.Option = 0
 	if v.conn.State() == StateReadyToClose {