Darien Raymond 8 лет назад
Родитель
Сommit
e7aa920c27
1 измененных файлов с 37 добавлено и 40 удалено
  1. 37 40
      app/proxyman/inbound/dynamic.go

+ 37 - 40
app/proxyman/inbound/dynamic.go

@@ -12,27 +12,16 @@ import (
 	"v2ray.com/core/proxy"
 )
 
-type workerWithContext struct {
-	ctx    context.Context
-	cancel context.CancelFunc
-	worker worker
-}
-
-func (w *workerWithContext) Close() {
-	w.cancel()
-	w.worker.Close()
-}
-
 type DynamicInboundHandler struct {
-	sync.Mutex
 	tag            string
 	ctx            context.Context
 	cancel         context.CancelFunc
 	proxyConfig    interface{}
 	receiverConfig *proxyman.ReceiverConfig
+	portMutex      sync.Mutex
 	portsInUse     map[v2net.Port]bool
-	worker         []*workerWithContext
-	worker2Recycle []*workerWithContext
+	workerMutex    sync.RWMutex
+	worker         []worker
 	lastRefresh    time.Time
 }
 
@@ -53,8 +42,9 @@ func NewDynamicInboundHandler(ctx context.Context, tag string, receiverConfig *p
 func (h *DynamicInboundHandler) allocatePort() v2net.Port {
 	from := int(h.receiverConfig.PortRange.From)
 	delta := int(h.receiverConfig.PortRange.To) - from + 1
-	h.Lock()
-	defer h.Unlock()
+
+	h.portMutex.Lock()
+	defer h.portMutex.Unlock()
 
 	for {
 		r := dice.Roll(delta)
@@ -67,30 +57,34 @@ func (h *DynamicInboundHandler) allocatePort() v2net.Port {
 	}
 }
 
-func (h *DynamicInboundHandler) refresh() error {
-	h.lastRefresh = time.Now()
-
-	ports2Del := make([]v2net.Port, 0, 16)
-	for _, worker := range h.worker2Recycle {
+func (h *DynamicInboundHandler) waitAnyCloseWorkers(ctx context.Context, cancel context.CancelFunc, workers []worker, duration time.Duration) {
+	time.Sleep(duration)
+	cancel()
+	ports2Del := make([]v2net.Port, len(workers))
+	for idx, worker := range workers {
+		ports2Del[idx] = worker.Port()
 		worker.Close()
-		ports2Del = append(ports2Del, worker.worker.Port())
 	}
 
-	h.Lock()
+	h.portMutex.Lock()
 	for _, port := range ports2Del {
 		delete(h.portsInUse, port)
 	}
-	h.Unlock()
+	h.portMutex.Unlock()
+}
+
+func (h *DynamicInboundHandler) refresh() error {
+	h.lastRefresh = time.Now()
 
-	h.worker2Recycle, h.worker = h.worker, h.worker2Recycle[:0]
+	timeout := time.Minute * time.Duration(h.receiverConfig.AllocationStrategy.GetRefreshValue())
+	ctx, cancel := context.WithTimeout(h.ctx, timeout)
+	workers := make([]worker, 0, h.receiverConfig.AllocationStrategy.GetConcurrencyValue())
 
 	address := h.receiverConfig.Listen.AsAddress()
 	if address == nil {
 		address = v2net.AnyIP
 	}
 	for i := uint32(0); i < h.receiverConfig.AllocationStrategy.GetConcurrencyValue(); i++ {
-		ctx, cancel := context.WithCancel(h.ctx)
-
 		port := h.allocatePort()
 		p, err := proxy.CreateInboundHandler(ctx, h.proxyConfig)
 		if err != nil {
@@ -109,13 +103,10 @@ func (h *DynamicInboundHandler) refresh() error {
 				allowPassiveConn: h.receiverConfig.AllowPassiveConnection,
 			}
 			if err := worker.Start(); err != nil {
-				return err
+				log.Warning("Proxyman:InboundHandler: Failed to create TCP worker: ", err)
+				continue
 			}
-			h.worker = append(h.worker, &workerWithContext{
-				ctx:    ctx,
-				cancel: cancel,
-				worker: worker,
-			})
+			workers = append(workers, worker)
 		}
 
 		if nl.HasNetwork(v2net.Network_UDP) {
@@ -127,16 +118,19 @@ func (h *DynamicInboundHandler) refresh() error {
 				recvOrigDest: h.receiverConfig.ReceiveOriginalDestination,
 			}
 			if err := worker.Start(); err != nil {
-				return err
+				log.Warning("Proxyman:InboundHandler: Failed to create UDP worker: ", err)
+				continue
 			}
-			h.worker = append(h.worker, &workerWithContext{
-				ctx:    ctx,
-				cancel: cancel,
-				worker: worker,
-			})
+			workers = append(workers, worker)
 		}
 	}
 
+	h.workerMutex.Lock()
+	h.worker = workers
+	h.workerMutex.Unlock()
+
+	go h.waitAnyCloseWorkers(ctx, cancel, workers, timeout)
+
 	return nil
 }
 
@@ -162,7 +156,10 @@ func (h *DynamicInboundHandler) Close() {
 }
 
 func (h *DynamicInboundHandler) GetRandomInboundProxy() (proxy.Inbound, v2net.Port, int) {
+	h.workerMutex.RLock()
+	defer h.workerMutex.RUnlock()
+
 	w := h.worker[dice.Roll(len(h.worker))]
 	expire := h.receiverConfig.AllocationStrategy.GetRefreshValue() - uint32(time.Since(h.lastRefresh)/time.Minute)
-	return w.worker.Proxy(), w.worker.Port(), int(expire)
+	return w.Proxy(), w.Port(), int(expire)
 }