healthping.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. package router
  2. import (
  3. "fmt"
  4. "strings"
  5. sync "sync"
  6. "time"
  7. "github.com/v2fly/v2ray-core/v4/common/dice"
  8. "github.com/v2fly/v2ray-core/v4/features/routing"
  9. )
  // HealthPingSettings is the normalized configuration a HealthPing works with.
  type HealthPingSettings struct {
  	// Destination is the URL pinged through each outbound to measure its delay.
  	Destination string `json:"destination"`
  	// Connectivity is an optional URL pinged directly when an outbound ping
  	// fails; when it is empty the network is assumed to be up.
  	Connectivity string `json:"connectivity"`
  	// Interval is multiplied by SamplingCount to get the length of one
  	// scheduled check round.
  	Interval time.Duration `json:"interval"`
  	// SamplingCount is the number of pings sent per outbound in one scheduled
  	// round, and the sample capacity requested for each outbound's results.
  	SamplingCount int `json:"sampling"`
  	// Timeout is handed to every ping client created by the checker.
  	Timeout time.Duration `json:"timeout"`
  }
  18. // HealthPing is the health checker for balancers
  19. type HealthPing struct {
  20. access sync.Mutex
  21. ticker *time.Ticker
  22. dispatcher routing.Dispatcher
  23. Settings *HealthPingSettings
  24. Results map[string]*HealthPingRTTS
  25. }
  26. // NewHealthPing creates a new HealthPing with settings
  27. func NewHealthPing(config *HealthPingConfig, dispatcher routing.Dispatcher) *HealthPing {
  28. settings := &HealthPingSettings{}
  29. if config != nil {
  30. settings = &HealthPingSettings{
  31. Connectivity: strings.TrimSpace(config.Connectivity),
  32. Destination: strings.TrimSpace(config.Destination),
  33. Interval: time.Duration(config.Interval),
  34. SamplingCount: int(config.SamplingCount),
  35. Timeout: time.Duration(config.Timeout),
  36. }
  37. }
  38. if settings.Destination == "" {
  39. settings.Destination = "http://www.google.com/gen_204"
  40. }
  41. if settings.Interval == 0 {
  42. settings.Interval = time.Duration(1) * time.Minute
  43. } else if settings.Interval < 10 {
  44. newError("health check interval is too small, 10s is applied").AtWarning().WriteToLog()
  45. settings.Interval = time.Duration(10) * time.Second
  46. }
  47. if settings.SamplingCount <= 0 {
  48. settings.SamplingCount = 10
  49. }
  50. if settings.Timeout <= 0 {
  51. // results are saved after all health pings finish,
  52. // a larger timeout could possibly makes checks run longer
  53. settings.Timeout = time.Duration(5) * time.Second
  54. }
  55. return &HealthPing{
  56. dispatcher: dispatcher,
  57. Settings: settings,
  58. Results: nil,
  59. }
  60. }
  61. // StartScheduler implements the HealthChecker
  62. func (h *HealthPing) StartScheduler(selector func() ([]string, error)) {
  63. if h.ticker != nil {
  64. return
  65. }
  66. interval := h.Settings.Interval * time.Duration(h.Settings.SamplingCount)
  67. ticker := time.NewTicker(interval)
  68. h.ticker = ticker
  69. go func() {
  70. for {
  71. go func() {
  72. tags, err := selector()
  73. if err != nil {
  74. newError("error select outbounds for scheduled health check: ", err).AtWarning().WriteToLog()
  75. return
  76. }
  77. h.doCheck(tags, interval, h.Settings.SamplingCount)
  78. h.Cleanup(tags)
  79. }()
  80. _, ok := <-ticker.C
  81. if !ok {
  82. break
  83. }
  84. }
  85. }()
  86. }
  87. // StopScheduler implements the HealthChecker
  88. func (h *HealthPing) StopScheduler() {
  89. h.ticker.Stop()
  90. h.ticker = nil
  91. }
  92. // Check implements the HealthChecker
  93. func (h *HealthPing) Check(tags []string) error {
  94. if len(tags) == 0 {
  95. return nil
  96. }
  97. newError("perform one-time health check for tags ", tags).AtInfo().WriteToLog()
  98. h.doCheck(tags, 0, 1)
  99. return nil
  100. }
  // rtt is a single ping outcome passed from a ping callback to the collector
  // in doCheck.
  type rtt struct {
  	// handler is the outbound tag the ping was sent through.
  	handler string
  	// value is the outcome of the ping; doCheck only stores values above zero.
  	value time.Duration
  }
  105. // doCheck performs the 'rounds' amount checks in given 'duration'. You should make
  106. // sure all tags are valid for current balancer
  107. func (h *HealthPing) doCheck(tags []string, duration time.Duration, rounds int) {
  108. count := len(tags) * rounds
  109. if count == 0 {
  110. return
  111. }
  112. ch := make(chan *rtt, count)
  113. // rtts := make(map[string][]time.Duration)
  114. for _, tag := range tags {
  115. handler := tag
  116. client := newPingClient(
  117. h.Settings.Destination,
  118. h.Settings.Timeout,
  119. handler,
  120. h.dispatcher,
  121. )
  122. for i := 0; i < rounds; i++ {
  123. delay := time.Duration(0)
  124. if duration > 0 {
  125. delay = time.Duration(dice.Roll(int(duration)))
  126. }
  127. time.AfterFunc(delay, func() {
  128. newError("checking ", handler).AtDebug().WriteToLog()
  129. delay, err := client.MeasureDelay()
  130. if err == nil {
  131. ch <- &rtt{
  132. handler: handler,
  133. value: delay,
  134. }
  135. return
  136. }
  137. if !h.checkConnectivity() {
  138. newError("network is down").AtWarning().WriteToLog()
  139. ch <- &rtt{
  140. handler: handler,
  141. value: 0,
  142. }
  143. return
  144. }
  145. newError(fmt.Sprintf(
  146. "error ping %s with %s: %s",
  147. h.Settings.Destination,
  148. handler,
  149. err,
  150. )).AtWarning().WriteToLog()
  151. ch <- &rtt{
  152. handler: handler,
  153. value: rttFailed,
  154. }
  155. })
  156. }
  157. }
  158. for i := 0; i < count; i++ {
  159. rtt := <-ch
  160. if rtt.value > 0 {
  161. // should not put results when network is down
  162. h.PutResult(rtt.handler, rtt.value)
  163. }
  164. }
  165. }
  166. // PutResult put a ping rtt to results
  167. func (h *HealthPing) PutResult(tag string, rtt time.Duration) {
  168. h.access.Lock()
  169. defer h.access.Unlock()
  170. if h.Results == nil {
  171. h.Results = make(map[string]*HealthPingRTTS)
  172. }
  173. r, ok := h.Results[tag]
  174. if !ok {
  175. // validity is 2 times to sampling period, since the check are
  176. // distributed in the time line randomly, in extreme cases,
  177. // previous checks are distributed on the left, and latters
  178. // on the right
  179. validity := h.Settings.Interval * time.Duration(h.Settings.SamplingCount) * 2
  180. r = NewHealthPingResult(h.Settings.SamplingCount, validity)
  181. h.Results[tag] = r
  182. }
  183. r.Put(rtt)
  184. }
  185. // Cleanup removes results of removed handlers,
  186. // tags should be all valid tags of the Balancer now
  187. func (h *HealthPing) Cleanup(tags []string) {
  188. h.access.Lock()
  189. defer h.access.Unlock()
  190. for tag := range h.Results {
  191. found := false
  192. for _, v := range tags {
  193. if tag == v {
  194. found = true
  195. break
  196. }
  197. }
  198. if !found {
  199. delete(h.Results, tag)
  200. }
  201. }
  202. }
  203. // checkConnectivity checks the network connectivity, it returns
  204. // true if network is good or "connectivity check url" not set
  205. func (h *HealthPing) checkConnectivity() bool {
  206. if h.Settings.Connectivity == "" {
  207. return true
  208. }
  209. tester := newDirectPingClient(
  210. h.Settings.Connectivity,
  211. h.Settings.Timeout,
  212. )
  213. if _, err := tester.MeasureDelay(); err != nil {
  214. return false
  215. }
  216. return true
  217. }