manager.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. package subscriptionmanager
  2. import (
  3. "archive/zip"
  4. "bytes"
  5. "context"
  6. "sync"
  7. "time"
  8. core "github.com/v2fly/v2ray-core/v5"
  9. "github.com/v2fly/v2ray-core/v5/app/subscription"
  10. "github.com/v2fly/v2ray-core/v5/app/subscription/entries"
  11. "github.com/v2fly/v2ray-core/v5/app/subscription/entries/nonnative/nonnativeifce"
  12. "github.com/v2fly/v2ray-core/v5/app/subscription/specs"
  13. "github.com/v2fly/v2ray-core/v5/common"
  14. "github.com/v2fly/v2ray-core/v5/common/task"
  15. )
  16. //go:generate go run github.com/v2fly/v2ray-core/v5/common/errors/errorgen
  17. type SubscriptionManagerImpl struct {
  18. sync.Mutex
  19. config *subscription.Config
  20. ctx context.Context
  21. s *core.Instance
  22. converter *entries.ConverterRegistry
  23. trackedSubscriptions map[string]*trackedSubscription
  24. refreshTask *task.Periodic
  25. }
  26. func (s *SubscriptionManagerImpl) Type() interface{} {
  27. return subscription.SubscriptionManagerType()
  28. }
  29. func (s *SubscriptionManagerImpl) housekeeping() error {
  30. for subscriptionName := range s.trackedSubscriptions {
  31. s.Lock()
  32. if err := s.checkupSubscription(subscriptionName); err != nil {
  33. newError("failed to checkup subscription: ", err).AtWarning().WriteToLog()
  34. }
  35. s.Unlock()
  36. }
  37. return nil
  38. }
  39. func (s *SubscriptionManagerImpl) Start() error {
  40. go func() {
  41. if err := s.refreshTask.Start(); err != nil {
  42. return
  43. }
  44. }()
  45. return nil
  46. }
  47. func (s *SubscriptionManagerImpl) Close() error {
  48. if err := s.refreshTask.Close(); err != nil {
  49. return err
  50. }
  51. return nil
  52. }
  53. func (s *SubscriptionManagerImpl) addTrackedSubscriptionFromImportSource(importSource *subscription.ImportSource) error {
  54. tracked, err := newTrackedSubscription(importSource)
  55. if err != nil {
  56. return newError("failed to init subscription ", importSource.Name, ": ", err)
  57. }
  58. s.trackedSubscriptions[importSource.Name] = tracked
  59. return nil
  60. }
  61. func (s *SubscriptionManagerImpl) removeTrackedSubscription(subscriptionName string) error {
  62. if _, ok := s.trackedSubscriptions[subscriptionName]; ok {
  63. err := s.applySubscriptionTo(subscriptionName, &specs.SubscriptionDocument{Server: make([]*specs.SubscriptionServerConfig, 0)})
  64. if err != nil {
  65. return newError("failed to apply empty subscription: ", err)
  66. }
  67. delete(s.trackedSubscriptions, subscriptionName)
  68. }
  69. return nil
  70. }
  71. func (s *SubscriptionManagerImpl) init() error {
  72. s.refreshTask = &task.Periodic{
  73. Interval: time.Duration(60) * time.Second,
  74. Execute: s.housekeeping,
  75. }
  76. s.trackedSubscriptions = make(map[string]*trackedSubscription)
  77. s.converter = entries.GetOverlayConverterRegistry()
  78. if s.config.NonnativeConverterOverlay != nil {
  79. zipReader, err := zip.NewReader(bytes.NewReader(s.config.NonnativeConverterOverlay), int64(len(s.config.NonnativeConverterOverlay)))
  80. if err != nil {
  81. return newError("failed to read nonnative converter overlay: ", err)
  82. }
  83. converter, err := nonnativeifce.NewNonNativeConverterConstructor(zipReader)
  84. if err != nil {
  85. return newError("failed to construct nonnative converter: ", err)
  86. }
  87. if err := s.converter.RegisterConverter("user_nonnative", converter); err != nil {
  88. return newError("failed to register user nonnative converter: ", err)
  89. }
  90. }
  91. for _, v := range s.config.Imports {
  92. if err := s.addTrackedSubscriptionFromImportSource(v); err != nil {
  93. return newError("failed to add tracked subscription: ", err)
  94. }
  95. }
  96. return nil
  97. }
  98. func NewSubscriptionManager(ctx context.Context, config *subscription.Config) (*SubscriptionManagerImpl, error) {
  99. instance := core.MustFromContext(ctx)
  100. impl := &SubscriptionManagerImpl{ctx: ctx, s: instance, config: config}
  101. if err := impl.init(); err != nil {
  102. return nil, newError("failed to init subscription manager: ", err)
  103. }
  104. return impl, nil
  105. }
  106. func init() {
  107. common.Must(common.RegisterConfig((*subscription.Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
  108. return NewSubscriptionManager(ctx, config.(*subscription.Config))
  109. }))
  110. }