| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231 |
- package router
- import (
- "fmt"
- "strings"
- sync "sync"
- "time"
- "github.com/v2fly/v2ray-core/v4/common/dice"
- "github.com/v2fly/v2ray-core/v4/features/routing"
- )
// HealthPingSettings is the normalized configuration a HealthPing runs with.
// NewHealthPing fills in defaults for any value left unset.
type HealthPingSettings struct {
	// Destination is the URL handed to the ping client of every checked outbound.
	Destination string `json:"destination"`
	// Connectivity is an optional URL probed directly when a ping fails; when
	// it is empty the network is always assumed to be up.
	Connectivity string `json:"connectivity"`
	// Interval multiplied by SamplingCount gives the scheduler period.
	Interval time.Duration `json:"interval"`
	// SamplingCount is the number of pings issued per outbound in one
	// scheduler period, and the sample size passed to NewHealthPingResult.
	SamplingCount int `json:"sampling"`
	// Timeout is passed to every ping client, including the connectivity probe.
	Timeout time.Duration `json:"timeout"`
}
- // HealthPing is the health checker for balancers
- type HealthPing struct {
- access sync.Mutex
- ticker *time.Ticker
- dispatcher routing.Dispatcher
- Settings *HealthPingSettings
- Results map[string]*HealthPingRTTS
- }
- // NewHealthPing creates a new HealthPing with settings
- func NewHealthPing(config *HealthPingConfig, dispatcher routing.Dispatcher) *HealthPing {
- settings := &HealthPingSettings{}
- if config != nil {
- settings = &HealthPingSettings{
- Connectivity: strings.TrimSpace(config.Connectivity),
- Destination: strings.TrimSpace(config.Destination),
- Interval: time.Duration(config.Interval),
- SamplingCount: int(config.SamplingCount),
- Timeout: time.Duration(config.Timeout),
- }
- }
- if settings.Destination == "" {
- settings.Destination = "http://www.google.com/gen_204"
- }
- if settings.Interval == 0 {
- settings.Interval = time.Duration(1) * time.Minute
- } else if settings.Interval < 10 {
- newError("health check interval is too small, 10s is applied").AtWarning().WriteToLog()
- settings.Interval = time.Duration(10) * time.Second
- }
- if settings.SamplingCount <= 0 {
- settings.SamplingCount = 10
- }
- if settings.Timeout <= 0 {
- // results are saved after all health pings finish,
- // a larger timeout could possibly makes checks run longer
- settings.Timeout = time.Duration(5) * time.Second
- }
- return &HealthPing{
- dispatcher: dispatcher,
- Settings: settings,
- Results: nil,
- }
- }
- // StartScheduler implements the HealthChecker
- func (h *HealthPing) StartScheduler(selector func() ([]string, error)) {
- if h.ticker != nil {
- return
- }
- interval := h.Settings.Interval * time.Duration(h.Settings.SamplingCount)
- ticker := time.NewTicker(interval)
- h.ticker = ticker
- go func() {
- for {
- go func() {
- tags, err := selector()
- if err != nil {
- newError("error select outbounds for scheduled health check: ", err).AtWarning().WriteToLog()
- return
- }
- h.doCheck(tags, interval, h.Settings.SamplingCount)
- h.Cleanup(tags)
- }()
- _, ok := <-ticker.C
- if !ok {
- break
- }
- }
- }()
- }
- // StopScheduler implements the HealthChecker
- func (h *HealthPing) StopScheduler() {
- h.ticker.Stop()
- h.ticker = nil
- }
- // Check implements the HealthChecker
- func (h *HealthPing) Check(tags []string) error {
- if len(tags) == 0 {
- return nil
- }
- newError("perform one-time health check for tags ", tags).AtInfo().WriteToLog()
- h.doCheck(tags, 0, 1)
- return nil
- }
// rtt is one ping outcome passed from a ping callback back to doCheck.
type rtt struct {
	// handler is the outbound tag the ping was sent through.
	handler string
	// value is the measured delay on success, 0 when the network was found
	// to be down, or rttFailed when the ping itself failed.
	value time.Duration
}
- // doCheck performs the 'rounds' amount checks in given 'duration'. You should make
- // sure all tags are valid for current balancer
- func (h *HealthPing) doCheck(tags []string, duration time.Duration, rounds int) {
- count := len(tags) * rounds
- if count == 0 {
- return
- }
- ch := make(chan *rtt, count)
- // rtts := make(map[string][]time.Duration)
- for _, tag := range tags {
- handler := tag
- client := newPingClient(
- h.Settings.Destination,
- h.Settings.Timeout,
- handler,
- h.dispatcher,
- )
- for i := 0; i < rounds; i++ {
- delay := time.Duration(0)
- if duration > 0 {
- delay = time.Duration(dice.Roll(int(duration)))
- }
- time.AfterFunc(delay, func() {
- newError("checking ", handler).AtDebug().WriteToLog()
- delay, err := client.MeasureDelay()
- if err == nil {
- ch <- &rtt{
- handler: handler,
- value: delay,
- }
- return
- }
- if !h.checkConnectivity() {
- newError("network is down").AtWarning().WriteToLog()
- ch <- &rtt{
- handler: handler,
- value: 0,
- }
- return
- }
- newError(fmt.Sprintf(
- "error ping %s with %s: %s",
- h.Settings.Destination,
- handler,
- err,
- )).AtWarning().WriteToLog()
- ch <- &rtt{
- handler: handler,
- value: rttFailed,
- }
- })
- }
- }
- for i := 0; i < count; i++ {
- rtt := <-ch
- if rtt.value > 0 {
- // should not put results when network is down
- h.PutResult(rtt.handler, rtt.value)
- }
- }
- }
- // PutResult put a ping rtt to results
- func (h *HealthPing) PutResult(tag string, rtt time.Duration) {
- h.access.Lock()
- defer h.access.Unlock()
- if h.Results == nil {
- h.Results = make(map[string]*HealthPingRTTS)
- }
- r, ok := h.Results[tag]
- if !ok {
- // validity is 2 times to sampling period, since the check are
- // distributed in the time line randomly, in extreme cases,
- // previous checks are distributed on the left, and latters
- // on the right
- validity := h.Settings.Interval * time.Duration(h.Settings.SamplingCount) * 2
- r = NewHealthPingResult(h.Settings.SamplingCount, validity)
- h.Results[tag] = r
- }
- r.Put(rtt)
- }
- // Cleanup removes results of removed handlers,
- // tags should be all valid tags of the Balancer now
- func (h *HealthPing) Cleanup(tags []string) {
- h.access.Lock()
- defer h.access.Unlock()
- for tag := range h.Results {
- found := false
- for _, v := range tags {
- if tag == v {
- found = true
- break
- }
- }
- if !found {
- delete(h.Results, tag)
- }
- }
- }
- // checkConnectivity checks the network connectivity, it returns
- // true if network is good or "connectivity check url" not set
- func (h *HealthPing) checkConnectivity() bool {
- if h.Settings.Connectivity == "" {
- return true
- }
- tester := newDirectPingClient(
- h.Settings.Connectivity,
- h.Settings.Timeout,
- )
- if _, err := tester.MeasureDelay(); err != nil {
- return false
- }
- return true
- }
|