healthping.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. package burst
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. sync "sync"
  7. "time"
  8. "github.com/v2fly/v2ray-core/v5/common/dice"
  9. )
// HealthPingSettings holds the settings for the health checker.
type HealthPingSettings struct {
	// Destination is the URL every handler is pinged against. NewHealthPing
	// substitutes a default when it is empty.
	Destination string `json:"destination"`

	// Connectivity is the URL used by checkConnectivity to probe the network
	// directly after a failed ping. An empty value disables that probe.
	Connectivity string `json:"connectivity"`

	// Interval multiplied by SamplingCount gives the period of the scheduled
	// checks started by StartScheduler.
	Interval time.Duration `json:"interval"`

	// SamplingCount is the number of pings performed per handler in each
	// scheduled period, and the sample capacity handed to NewHealthPingResult.
	SamplingCount int `json:"sampling"`

	// Timeout is handed to every ping client that is created.
	Timeout time.Duration `json:"timeout"`
}
  18. // HealthPing is the health checker for balancers
  19. type HealthPing struct {
  20. ctx context.Context
  21. access sync.Mutex
  22. ticker *time.Ticker
  23. Settings *HealthPingSettings
  24. Results map[string]*HealthPingRTTS
  25. }
  26. // NewHealthPing creates a new HealthPing with settings
  27. func NewHealthPing(ctx context.Context, config *HealthPingConfig) *HealthPing {
  28. settings := &HealthPingSettings{}
  29. if config != nil {
  30. settings = &HealthPingSettings{
  31. Connectivity: strings.TrimSpace(config.Connectivity),
  32. Destination: strings.TrimSpace(config.Destination),
  33. Interval: time.Duration(config.Interval),
  34. SamplingCount: int(config.SamplingCount),
  35. Timeout: time.Duration(config.Timeout),
  36. }
  37. }
  38. if settings.Destination == "" {
  39. settings.Destination = "http://www.google.com/gen_204"
  40. }
  41. if settings.Interval == 0 {
  42. settings.Interval = time.Duration(1) * time.Minute
  43. } else if settings.Interval < 10 {
  44. newError("health check interval is too small, 10s is applied").AtWarning().WriteToLog()
  45. settings.Interval = time.Duration(10) * time.Second
  46. }
  47. if settings.SamplingCount <= 0 {
  48. settings.SamplingCount = 10
  49. }
  50. if settings.Timeout <= 0 {
  51. // results are saved after all health pings finish,
  52. // a larger timeout could possibly makes checks run longer
  53. settings.Timeout = time.Duration(5) * time.Second
  54. }
  55. return &HealthPing{
  56. ctx: ctx,
  57. Settings: settings,
  58. Results: nil,
  59. }
  60. }
  61. // StartScheduler implements the HealthChecker
  62. func (h *HealthPing) StartScheduler(selector func() ([]string, error)) {
  63. if h.ticker != nil {
  64. return
  65. }
  66. interval := h.Settings.Interval * time.Duration(h.Settings.SamplingCount)
  67. ticker := time.NewTicker(interval)
  68. h.ticker = ticker
  69. go func() {
  70. for {
  71. go func() {
  72. tags, err := selector()
  73. if err != nil {
  74. newError("error select outbounds for scheduled health check: ", err).AtWarning().WriteToLog()
  75. return
  76. }
  77. h.doCheck(tags, interval, h.Settings.SamplingCount)
  78. h.Cleanup(tags)
  79. }()
  80. _, ok := <-ticker.C
  81. if !ok {
  82. break
  83. }
  84. }
  85. }()
  86. }
  87. // StopScheduler implements the HealthChecker
  88. func (h *HealthPing) StopScheduler() {
  89. h.ticker.Stop()
  90. h.ticker = nil
  91. }
  92. // Check implements the HealthChecker
  93. func (h *HealthPing) Check(tags []string) error {
  94. if len(tags) == 0 {
  95. return nil
  96. }
  97. newError("perform one-time health check for tags ", tags).AtInfo().WriteToLog()
  98. h.doCheck(tags, 0, 1)
  99. return nil
  100. }
// rtt is one ping outcome, sent from a ping callback back to doCheck.
type rtt struct {
	// handler is the outbound tag that was pinged.
	handler string

	// value is the measured delay on success. doCheck's callbacks set it to 0
	// when the connectivity probe fails and to rttFailed when only the ping
	// itself failed.
	value time.Duration
}
  105. // doCheck performs the 'rounds' amount checks in given 'duration'. You should make
  106. // sure all tags are valid for current balancer
  107. func (h *HealthPing) doCheck(tags []string, duration time.Duration, rounds int) {
  108. count := len(tags) * rounds
  109. if count == 0 {
  110. return
  111. }
  112. ch := make(chan *rtt, count)
  113. for _, tag := range tags {
  114. handler := tag
  115. client := newPingClient(
  116. h.ctx,
  117. h.Settings.Destination,
  118. h.Settings.Timeout,
  119. handler,
  120. )
  121. for i := 0; i < rounds; i++ {
  122. delay := time.Duration(0)
  123. if duration > 0 {
  124. delay = time.Duration(dice.Roll(int(duration)))
  125. }
  126. time.AfterFunc(delay, func() {
  127. newError("checking ", handler).AtDebug().WriteToLog()
  128. delay, err := client.MeasureDelay()
  129. if err == nil {
  130. ch <- &rtt{
  131. handler: handler,
  132. value: delay,
  133. }
  134. return
  135. }
  136. if !h.checkConnectivity() {
  137. newError("network is down").AtWarning().WriteToLog()
  138. ch <- &rtt{
  139. handler: handler,
  140. value: 0,
  141. }
  142. return
  143. }
  144. newError(fmt.Sprintf(
  145. "error ping %s with %s: %s",
  146. h.Settings.Destination,
  147. handler,
  148. err,
  149. )).AtWarning().WriteToLog()
  150. ch <- &rtt{
  151. handler: handler,
  152. value: rttFailed,
  153. }
  154. })
  155. }
  156. }
  157. for i := 0; i < count; i++ {
  158. rtt := <-ch
  159. if rtt.value > 0 {
  160. // should not put results when network is down
  161. h.PutResult(rtt.handler, rtt.value)
  162. }
  163. }
  164. }
  165. // PutResult puts a ping rtt to results
  166. func (h *HealthPing) PutResult(tag string, rtt time.Duration) {
  167. h.access.Lock()
  168. defer h.access.Unlock()
  169. if h.Results == nil {
  170. h.Results = make(map[string]*HealthPingRTTS)
  171. }
  172. r, ok := h.Results[tag]
  173. if !ok {
  174. // validity is 2 times to sampling period, since the check are
  175. // distributed in the time line randomly, in extreme cases,
  176. // previous checks are distributed on the left, and latters
  177. // on the right
  178. validity := h.Settings.Interval * time.Duration(h.Settings.SamplingCount) * 2
  179. r = NewHealthPingResult(h.Settings.SamplingCount, validity)
  180. h.Results[tag] = r
  181. }
  182. r.Put(rtt)
  183. }
  184. // Cleanup removes results of removed handlers,
  185. // tags should be all valid tags of the Balancer now
  186. func (h *HealthPing) Cleanup(tags []string) {
  187. h.access.Lock()
  188. defer h.access.Unlock()
  189. for tag := range h.Results {
  190. found := false
  191. for _, v := range tags {
  192. if tag == v {
  193. found = true
  194. break
  195. }
  196. }
  197. if !found {
  198. delete(h.Results, tag)
  199. }
  200. }
  201. }
  202. // checkConnectivity checks the network connectivity, it returns
  203. // true if network is good or "connectivity check url" not set
  204. func (h *HealthPing) checkConnectivity() bool {
  205. if h.Settings.Connectivity == "" {
  206. return true
  207. }
  208. tester := newDirectPingClient(
  209. h.Settings.Connectivity,
  210. h.Settings.Timeout,
  211. )
  212. if _, err := tester.MeasureDelay(); err != nil {
  213. return false
  214. }
  215. return true
  216. }