Selaa lähdekoodia

releasble user validator

v2ray 9 vuotta sitten
vanhempi
commit
7db14dad9b

+ 9 - 0
common/protocol/raw/server.go

@@ -30,6 +30,15 @@ func NewServerSession(validator protocol.UserValidator) *ServerSession {
 	}
 }
 
+func (this *ServerSession) Release() {
+	this.userValidator = nil
+	this.requestBodyIV = nil
+	this.requestBodyKey = nil
+	this.responseBodyIV = nil
+	this.responseBodyKey = nil
+	this.responseWriter = nil
+}
+
 func (this *ServerSession) DecodeRequestHeader(reader io.Reader) (*protocol.RequestHeader, error) {
 	buffer := alloc.NewSmallBuffer()
 	defer buffer.Release()

+ 31 - 6
common/protocol/user_validator.go

@@ -3,6 +3,9 @@ package protocol
 import (
 	"sync"
 	"time"
+
+	"github.com/v2ray/v2ray-core/common"
+	"github.com/v2ray/v2ray-core/common/signal"
 )
 
 const (
@@ -18,6 +21,8 @@ type idEntry struct {
 }
 
 type UserValidator interface {
+	common.Releasable
+
 	Add(user *User) error
 	Get(timeHash []byte) (*User, Timestamp, bool)
 }
@@ -28,6 +33,7 @@ type TimedUserValidator struct {
 	ids        []*idEntry
 	access     sync.RWMutex
 	hasher     IDHash
+	cancel     *signal.CancelSignal
 }
 
 type indexTimePair struct {
@@ -42,11 +48,23 @@ func NewTimedUserValidator(hasher IDHash) UserValidator {
 		access:     sync.RWMutex{},
 		ids:        make([]*idEntry, 0, 512),
 		hasher:     hasher,
+		cancel:     signal.NewCloseSignal(),
 	}
-	go tus.updateUserHash(time.Tick(updateIntervalSec * time.Second))
+	go tus.updateUserHash(time.Tick(updateIntervalSec*time.Second), tus.cancel)
 	return tus
 }
 
+func (this *TimedUserValidator) Release() {
+	this.cancel.Cancel()
+	this.cancel.WaitForDone()
+
+	this.validUsers = nil
+	this.userHash = nil
+	this.ids = nil
+	this.hasher = nil
+	this.cancel = nil
+}
+
 func (this *TimedUserValidator) generateNewHashes(nowSec Timestamp, idx int, entry *idEntry) {
 	var hashValue [16]byte
 	var hashValueRemoval [16]byte
@@ -70,13 +88,20 @@ func (this *TimedUserValidator) generateNewHashes(nowSec Timestamp, idx int, ent
 	}
 }
 
-func (this *TimedUserValidator) updateUserHash(tick <-chan time.Time) {
-	for now := range tick {
-		nowSec := Timestamp(now.Unix() + cacheDurationSec)
-		for _, entry := range this.ids {
-			this.generateNewHashes(nowSec, entry.userIdx, entry)
+func (this *TimedUserValidator) updateUserHash(tick <-chan time.Time, cancel *signal.CancelSignal) {
+L:
+	for {
+		select {
+		case now := <-tick:
+			nowSec := Timestamp(now.Unix() + cacheDurationSec)
+			for _, entry := range this.ids {
+				this.generateNewHashes(nowSec, entry.userIdx, entry)
+			}
+		case <-cancel.WaitForCancel():
+			break L
 		}
 	}
+	cancel.Done()
 }
 
 func (this *TimedUserValidator) Add(user *User) error {

+ 29 - 0
common/signal/close.go

@@ -0,0 +1,29 @@
+package signal
+
+type CancelSignal struct {
+	cancel chan struct{}
+	done   chan struct{}
+}
+
+func NewCloseSignal() *CancelSignal {
+	return &CancelSignal{
+		cancel: make(chan struct{}),
+		done:   make(chan struct{}),
+	}
+}
+
+func (this *CancelSignal) Cancel() {
+	close(this.cancel)
+}
+
+func (this *CancelSignal) WaitForCancel() <-chan struct{} {
+	return this.cancel
+}
+
+func (this *CancelSignal) Done() {
+	close(this.done)
+}
+
+func (this *CancelSignal) WaitForDone() <-chan struct{} {
+	return this.done
+}

+ 1 - 0
proxy/vmess/inbound/inbound.go

@@ -124,6 +124,7 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.TCPConn) {
 	defer reader.Release()
 
 	session := raw.NewServerSession(this.clients)
+	defer session.Release()
 
 	request, err := session.DecodeRequestHeader(reader)
 	if err != nil {