subdocapplier.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package subscriptionmanager
  2. import (
  3. "fmt"
  4. core "github.com/v2fly/v2ray-core/v5"
  5. "github.com/v2fly/v2ray-core/v5/app/subscription/specs"
  6. )
  7. func (s *SubscriptionManagerImpl) applySubscriptionTo(name string, document *specs.SubscriptionDocument) error {
  8. var trackedSub *trackedSubscription
  9. if trackedSubFound, found := s.trackedSubscriptions[name]; !found {
  10. return newError("not found")
  11. } else {
  12. trackedSub = trackedSubFound
  13. }
  14. delta, err := trackedSub.diff(document)
  15. if err != nil {
  16. return err
  17. }
  18. nameToServerConfig := make(map[string]*specs.SubscriptionServerConfig)
  19. for _, server := range document.Server {
  20. nameToServerConfig[server.Id] = server
  21. }
  22. for _, serverName := range delta.removed {
  23. if err := s.removeManagedServer(name, serverName); err != nil {
  24. newError("failed to remove managed server: ", err).AtWarning().WriteToLog()
  25. continue
  26. }
  27. trackedSub.recordRemovedServer(serverName)
  28. }
  29. for _, serverName := range delta.modified {
  30. serverConfig := nameToServerConfig[serverName]
  31. if err := s.updateManagedServer(name, serverName, serverConfig); err != nil {
  32. newError("failed to update managed server: ", err).AtWarning().WriteToLog()
  33. continue
  34. }
  35. trackedSub.recordUpdatedServer(serverName, serverConfig.Metadata[ServerMetadataTagName], serverConfig)
  36. }
  37. for _, serverName := range delta.added {
  38. serverConfig := nameToServerConfig[serverName]
  39. if err := s.addManagedServer(name, serverName, serverConfig); err != nil {
  40. newError("failed to add managed server: ", err).AtWarning().WriteToLog()
  41. continue
  42. }
  43. trackedSub.recordUpdatedServer(serverName, serverConfig.Metadata[ServerMetadataTagName], serverConfig)
  44. }
  45. newError("finished applying subscription, ", name, "; ", fmt.Sprintf(
  46. "%v updated, %v added, %v removed, %v unchanged",
  47. len(delta.modified), len(delta.added), len(delta.removed), len(delta.unchanged))).AtInfo().WriteToLog()
  48. return nil
  49. }
  50. func (s *SubscriptionManagerImpl) removeManagedServer(subscriptionName, serverName string) error {
  51. var trackedSub *trackedSubscription
  52. if trackedSubFound, found := s.trackedSubscriptions[subscriptionName]; !found {
  53. return newError("not found")
  54. } else {
  55. trackedSub = trackedSubFound
  56. }
  57. var trackedServer *materializedServer
  58. if trackedServerFound, err := trackedSub.getCurrentServer(serverName); err != nil {
  59. return err
  60. } else {
  61. trackedServer = trackedServerFound
  62. }
  63. tagName := fmt.Sprintf("%s_%s", trackedSub.importSource.TagPrefix, trackedServer.tagPostfix)
  64. if err := core.RemoveOutboundHandler(s.s, tagName); err != nil {
  65. return newError("failed to remove handler: ", err)
  66. }
  67. trackedSub.recordRemovedServer(serverName)
  68. return nil
  69. }
  70. func (s *SubscriptionManagerImpl) addManagedServer(subscriptionName, serverName string,
  71. serverSpec *specs.SubscriptionServerConfig,
  72. ) error {
  73. var trackedSub *trackedSubscription
  74. if trackedSubFound, found := s.trackedSubscriptions[subscriptionName]; !found {
  75. return newError("not found")
  76. } else {
  77. trackedSub = trackedSubFound
  78. }
  79. tagPostfix := serverSpec.Metadata[ServerMetadataTagName]
  80. tagName := fmt.Sprintf("%s_%s", trackedSub.importSource.TagPrefix, tagPostfix)
  81. materialized, err := s.materialize(subscriptionName, tagName, serverSpec)
  82. if err != nil {
  83. return newError("failed to materialize server: ", err)
  84. }
  85. if err := core.AddOutboundHandler(s.s, materialized); err != nil {
  86. return newError("failed to add handler: ", err)
  87. }
  88. trackedSub.recordUpdatedServer(serverName, tagPostfix, serverSpec)
  89. return nil
  90. }
  91. func (s *SubscriptionManagerImpl) updateManagedServer(subscriptionName, serverName string,
  92. serverSpec *specs.SubscriptionServerConfig,
  93. ) error {
  94. if err := s.removeManagedServer(subscriptionName, serverName); err != nil {
  95. return newError("failed to update managed server: ", err).AtWarning()
  96. }
  97. if err := s.addManagedServer(subscriptionName, serverName, serverSpec); err != nil {
  98. return newError("failed to update managed server : ", err).AtWarning()
  99. }
  100. return nil
  101. }