manager.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. package subscriptionmanager
  2. import (
  3. "archive/zip"
  4. "bytes"
  5. "context"
  6. "sync"
  7. "time"
  8. core "github.com/v2fly/v2ray-core/v5"
  9. "github.com/v2fly/v2ray-core/v5/app/persistentstorage"
  10. "github.com/v2fly/v2ray-core/v5/app/persistentstorage/protostorage"
  11. "github.com/v2fly/v2ray-core/v5/app/subscription"
  12. "github.com/v2fly/v2ray-core/v5/app/subscription/entries"
  13. "github.com/v2fly/v2ray-core/v5/app/subscription/entries/nonnative/nonnativeifce"
  14. "github.com/v2fly/v2ray-core/v5/app/subscription/specs"
  15. "github.com/v2fly/v2ray-core/v5/common"
  16. "github.com/v2fly/v2ray-core/v5/common/environment"
  17. "github.com/v2fly/v2ray-core/v5/common/environment/envctx"
  18. "github.com/v2fly/v2ray-core/v5/common/task"
  19. )
  20. //go:generate go run github.com/v2fly/v2ray-core/v5/common/errors/errorgen
  21. type SubscriptionManagerImpl struct {
  22. sync.Mutex
  23. config *subscription.Config
  24. ctx context.Context
  25. s *core.Instance
  26. converter *entries.ConverterRegistry
  27. trackedSubscriptions map[string]*trackedSubscription
  28. refreshTask *task.Periodic
  29. persistentStorage persistentstorage.ScopedPersistentStorage
  30. persistImportStorage persistentstorage.ScopedPersistentStorage
  31. persistImportSourceProtoStorage protostorage.ProtoPersistentStorage
  32. }
  33. func (s *SubscriptionManagerImpl) Type() interface{} {
  34. return subscription.SubscriptionManagerType()
  35. }
  36. func (s *SubscriptionManagerImpl) housekeeping() error {
  37. for subscriptionName := range s.trackedSubscriptions {
  38. s.Lock()
  39. if err := s.checkupSubscription(subscriptionName); err != nil {
  40. newError("failed to checkup subscription: ", err).AtWarning().WriteToLog()
  41. }
  42. s.Unlock()
  43. }
  44. return nil
  45. }
  46. func (s *SubscriptionManagerImpl) Start() error {
  47. if s.config.Persistence {
  48. appEnvironment := envctx.EnvironmentFromContext(s.ctx).(environment.AppEnvironment)
  49. s.persistentStorage = appEnvironment.PersistentStorage()
  50. importsStorage, err := s.persistentStorage.NarrowScope(s.ctx, []byte("imports"))
  51. if err != nil {
  52. return newError("failed to get persistent storage for imports").Base(err)
  53. }
  54. s.persistImportStorage = importsStorage
  55. s.persistImportSourceProtoStorage = importsStorage.(protostorage.ProtoPersistentStorage)
  56. if err = s.loadAllFromPersistentStorage(); err != nil {
  57. newError("failed to load all from persistent storage: ", err).WriteToLog()
  58. }
  59. }
  60. go func() {
  61. if err := s.refreshTask.Start(); err != nil {
  62. return
  63. }
  64. }()
  65. return nil
  66. }
  67. func (s *SubscriptionManagerImpl) Close() error {
  68. if err := s.refreshTask.Close(); err != nil {
  69. return err
  70. }
  71. return nil
  72. }
  73. func (s *SubscriptionManagerImpl) addTrackedSubscriptionFromImportSource(importSource *subscription.ImportSource,
  74. addedByAPI bool,
  75. ) error {
  76. if s.config.Persistence && addedByAPI {
  77. err := s.persistImportSourceProtoStorage.PutProto(s.ctx, importSource.Name, importSource)
  78. if err != nil {
  79. return newError("failed to persist import source: ", err)
  80. }
  81. }
  82. tracked, err := newTrackedSubscription(importSource)
  83. if err != nil {
  84. return newError("failed to init subscription ", importSource.Name, ": ", err)
  85. }
  86. tracked.addedByAPI = addedByAPI
  87. s.trackedSubscriptions[importSource.Name] = tracked
  88. return nil
  89. }
  90. func (s *SubscriptionManagerImpl) removeTrackedSubscription(subscriptionName string) error {
  91. if s.config.Persistence {
  92. err := s.persistImportStorage.Put(s.ctx, []byte(subscriptionName), nil)
  93. if err != nil {
  94. return newError("failed to delete import source: ", err)
  95. }
  96. }
  97. if _, ok := s.trackedSubscriptions[subscriptionName]; ok {
  98. err := s.applySubscriptionTo(subscriptionName, &specs.SubscriptionDocument{Server: make([]*specs.SubscriptionServerConfig, 0)})
  99. if err != nil {
  100. return newError("failed to apply empty subscription: ", err)
  101. }
  102. delete(s.trackedSubscriptions, subscriptionName)
  103. }
  104. return nil
  105. }
  106. func (s *SubscriptionManagerImpl) init() error {
  107. s.refreshTask = &task.Periodic{
  108. Interval: time.Duration(60) * time.Second,
  109. Execute: s.housekeeping,
  110. }
  111. s.trackedSubscriptions = make(map[string]*trackedSubscription)
  112. s.converter = entries.GetOverlayConverterRegistry()
  113. if s.config.NonnativeConverterOverlay != nil {
  114. zipReader, err := zip.NewReader(bytes.NewReader(s.config.NonnativeConverterOverlay), int64(len(s.config.NonnativeConverterOverlay)))
  115. if err != nil {
  116. return newError("failed to read nonnative converter overlay: ", err)
  117. }
  118. converter, err := nonnativeifce.NewNonNativeConverterConstructor(zipReader)
  119. if err != nil {
  120. return newError("failed to construct nonnative converter: ", err)
  121. }
  122. if err := s.converter.RegisterConverter("user_nonnative", converter); err != nil {
  123. return newError("failed to register user nonnative converter: ", err)
  124. }
  125. }
  126. for _, v := range s.config.Imports {
  127. if err := s.addTrackedSubscriptionFromImportSource(v, false); err != nil {
  128. return newError("failed to add tracked subscription: ", err)
  129. }
  130. }
  131. return nil
  132. }
  133. func (s *SubscriptionManagerImpl) loadAllFromPersistentStorage() error {
  134. if !s.config.Persistence {
  135. return nil
  136. }
  137. protoImportSources, err := s.persistImportStorage.List(s.ctx, []byte(""))
  138. if err != nil {
  139. return newError("failed to list import sources: ", err)
  140. }
  141. for _, protoImportSource := range protoImportSources {
  142. var importSource subscription.ImportSource
  143. err := s.persistImportSourceProtoStorage.GetProto(s.ctx, string(protoImportSource), &importSource)
  144. if err != nil {
  145. return newError("failed to get import source: ", err)
  146. }
  147. if err := s.addTrackedSubscriptionFromImportSource(&importSource, false); err != nil {
  148. return newError("failed to add tracked subscription: ", err)
  149. }
  150. }
  151. return nil
  152. }
  153. func NewSubscriptionManager(ctx context.Context, config *subscription.Config) (*SubscriptionManagerImpl, error) {
  154. instance := core.MustFromContext(ctx)
  155. impl := &SubscriptionManagerImpl{ctx: ctx, s: instance, config: config}
  156. if err := impl.init(); err != nil {
  157. return nil, newError("failed to init subscription manager: ", err)
  158. }
  159. return impl, nil
  160. }
  161. func init() {
  162. common.Must(common.RegisterConfig((*subscription.Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
  163. return NewSubscriptionManager(ctx, config.(*subscription.Config))
  164. }))
  165. }