Browse Source

Fix ticker usage

ticker.Close does not close ticker.C
世界 3 years ago
parent
commit
ebee459f1f
2 changed files with 25 additions and 21 deletions
  1. 8 14
      app/log/command/command.go
  2. 17 7
      app/observatory/burst/healthping.go

+ 8 - 14
app/log/command/command.go

@@ -4,14 +4,11 @@ package command
 
 import (
 	"context"
-	"time"
-
-	grpc "google.golang.org/grpc"
-
 	core "github.com/v2fly/v2ray-core/v5"
 	"github.com/v2fly/v2ray-core/v5/app/log"
 	"github.com/v2fly/v2ray-core/v5/common"
 	cmlog "github.com/v2fly/v2ray-core/v5/common/log"
+	grpc "google.golang.org/grpc"
 )
 
 // LoggerServer is the implemention of LoggerService
@@ -44,22 +41,19 @@ func (s *LoggerServer) FollowLog(_ *FollowLogRequest, stream LoggerService_Follo
 	if !ok {
 		return newError("logger not support following")
 	}
-	var err error
+	done := make(chan struct{})
 	f := func(msg cmlog.Message) {
-		err = stream.Send(&FollowLogResponse{
+		err := stream.Send(&FollowLogResponse{
 			Message: msg.String(),
 		})
-	}
-	follower.AddFollower(f)
-	defer follower.RemoveFollower(f)
-	ticker := time.NewTicker(time.Second)
-	for {
-		<-ticker.C
 		if err != nil {
-			ticker.Stop()
-			return nil
+			close(done)
 		}
 	}
+	follower.AddFollower(f)
+	defer follower.RemoveFollower(f)
+	<-done
+	return nil
 }
 
 func (s *LoggerServer) mustEmbedUnimplementedLoggerServiceServer() {}

+ 17 - 7
app/observatory/burst/healthping.go

@@ -4,7 +4,7 @@ import (
 	"context"
 	"fmt"
 	"strings"
-	sync "sync"
+	"sync"
 	"time"
 
 	"github.com/v2fly/v2ray-core/v5/common/dice"
@@ -21,9 +21,10 @@ type HealthPingSettings struct {
 
 // HealthPing is the health checker for balancers
 type HealthPing struct {
-	ctx    context.Context
-	access sync.Mutex
-	ticker *time.Ticker
+	ctx         context.Context
+	access      sync.Mutex
+	ticker      *time.Ticker
+	tickerClose chan struct{}
 
 	Settings *HealthPingSettings
 	Results  map[string]*HealthPingRTTS
@@ -72,7 +73,9 @@ func (h *HealthPing) StartScheduler(selector func() ([]string, error)) {
 	}
 	interval := h.Settings.Interval * time.Duration(h.Settings.SamplingCount)
 	ticker := time.NewTicker(interval)
+	tickerClose := make(chan struct{})
 	h.ticker = ticker
+	h.tickerClose = tickerClose
 	go func() {
 		for {
 			go func() {
@@ -84,9 +87,11 @@ func (h *HealthPing) StartScheduler(selector func() ([]string, error)) {
 				h.doCheck(tags, interval, h.Settings.SamplingCount)
 				h.Cleanup(tags)
 			}()
-			_, ok := <-ticker.C
-			if !ok {
-				break
+			select {
+			case <-ticker.C:
+				continue
+			case <-tickerClose:
+				return
 			}
 		}
 	}()
@@ -94,8 +99,13 @@ func (h *HealthPing) StartScheduler(selector func() ([]string, error)) {
 
 // StopScheduler implements the HealthChecker
 func (h *HealthPing) StopScheduler() {
+	if h.ticker == nil {
+		return
+	}
 	h.ticker.Stop()
 	h.ticker = nil
+	close(h.tickerClose)
+	h.tickerClose = nil
 }
 
 // Check implements the HealthChecker