burstobserver.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. package burst
  2. import (
  3. "context"
  4. "sync"
  5. "github.com/golang/protobuf/proto"
  6. core "github.com/v2fly/v2ray-core/v5"
  7. "github.com/v2fly/v2ray-core/v5/app/observatory"
  8. "github.com/v2fly/v2ray-core/v5/common"
  9. "github.com/v2fly/v2ray-core/v5/common/signal/done"
  10. "github.com/v2fly/v2ray-core/v5/features/extension"
  11. "github.com/v2fly/v2ray-core/v5/features/outbound"
  12. )
  13. type Observer struct {
  14. config *Config
  15. ctx context.Context
  16. statusLock sync.Mutex // nolint: structcheck
  17. hp *HealthPing
  18. finished *done.Instance
  19. ohm outbound.Manager
  20. }
  21. func (o *Observer) GetObservation(ctx context.Context) (proto.Message, error) {
  22. return &observatory.ObservationResult{Status: o.createResult()}, nil
  23. }
  24. func (o *Observer) createResult() []*observatory.OutboundStatus {
  25. var result []*observatory.OutboundStatus
  26. o.hp.access.Lock()
  27. defer o.hp.access.Unlock()
  28. for name, value := range o.hp.Results {
  29. status := observatory.OutboundStatus{
  30. Alive: value.getStatistics().All != value.getStatistics().Fail,
  31. Delay: value.getStatistics().Average.Milliseconds(),
  32. LastErrorReason: "",
  33. OutboundTag: name,
  34. LastSeenTime: 0,
  35. LastTryTime: 0,
  36. HealthPing: &observatory.HealthPingMeasurementResult{
  37. All: int64(value.getStatistics().All),
  38. Fail: int64(value.getStatistics().Fail),
  39. Deviation: int64(value.getStatistics().Deviation),
  40. Average: int64(value.getStatistics().Average),
  41. Max: int64(value.getStatistics().Max),
  42. Min: int64(value.getStatistics().Min),
  43. },
  44. }
  45. result = append(result, &status)
  46. }
  47. return result
  48. }
  49. func (o *Observer) Type() interface{} {
  50. return extension.ObservatoryType()
  51. }
  52. func (o *Observer) Start() error {
  53. if o.config != nil && len(o.config.SubjectSelector) != 0 {
  54. o.finished = done.New()
  55. o.hp.StartScheduler(func() ([]string, error) {
  56. hs, ok := o.ohm.(outbound.HandlerSelector)
  57. if !ok {
  58. return nil, newError("outbound.Manager is not a HandlerSelector")
  59. }
  60. outbounds := hs.Select(o.config.SubjectSelector)
  61. return outbounds, nil
  62. })
  63. }
  64. return nil
  65. }
  66. func (o *Observer) Close() error {
  67. if o.finished != nil {
  68. o.hp.StopScheduler()
  69. return o.finished.Close()
  70. }
  71. return nil
  72. }
  73. func New(ctx context.Context, config *Config) (*Observer, error) {
  74. var outboundManager outbound.Manager
  75. err := core.RequireFeatures(ctx, func(om outbound.Manager) {
  76. outboundManager = om
  77. })
  78. if err != nil {
  79. return nil, newError("Cannot get depended features").Base(err)
  80. }
  81. hp := NewHealthPing(ctx, config.PingConfig)
  82. return &Observer{
  83. config: config,
  84. ctx: ctx,
  85. ohm: outboundManager,
  86. hp: hp,
  87. }, nil
  88. }
  89. func init() {
  90. common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
  91. return New(ctx, config.(*Config))
  92. }))
  93. }