diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java index 11148ef1a59f5..4d32aea255380 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java @@ -214,6 +214,16 @@ default CompletableFuture deleteOffloaded(UUID uid, Map of */ OffloadPolicies getOffloadPolicies(); + + /** + * Update the offload policies of this LedgerOffloader. + * + * @param offloadPolicies the new offload policies + */ + default void updateOffloadPolicies(OffloadPolicies offloadPolicies) { + // default no-op + } + /** * Close the resources if necessary. */ diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 69a38bc50de9d..d3b53512b34b6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Properties; import java.util.Queue; import java.util.Set; import java.util.TreeSet; @@ -62,6 +63,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.ProducerFencedException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicMigratedException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.plugin.EntryFilter; import org.apache.pulsar.broker.service.schema.SchemaRegistryService; import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; @@ -76,6 +78,8 @@ import org.apache.pulsar.common.policies.data.EntryFilters; import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies; import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; +import org.apache.pulsar.common.policies.data.OffloadPolicies; +import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.policies.data.RetentionPolicies; @@ -275,10 +279,48 @@ protected void updateTopicPolicy(TopicPolicies data) { topicPolicies.getDispatcherPauseOnAckStatePersistentEnabled() .updateTopicValue(data.getDispatcherPauseOnAckStatePersistentEnabled()); this.subscriptionPolicies = data.getSubscriptionPolicies(); - + internalUpdateOffloadPolicies(data, null); updateEntryFilters(); } + private void internalUpdateOffloadPolicies(TopicPolicies policies, Policies nsPolicies) { + PulsarService pulsar = brokerService.getPulsar(); + if (!(this instanceof PersistentTopic) || isSystemTopic()) { + return; + } + CompletableFuture> t = brokerService.getTopics().get(topic); + if (!t.isDone() || t.isCompletedExceptionally()) { + return; + } + TopicName topicName = TopicName.get(topic); + Properties brokerCnf = pulsar.getConfig().getProperties(); + CompletableFuture> f = policies == null ? pulsar.getTopicPoliciesService() + .getTopicPoliciesAsync(topicName, TopicPoliciesService.GetType.DEFAULT) + : CompletableFuture.completedFuture(Optional.of(policies)) ; + CompletableFuture> f1 = nsPolicies == null ? pulsar.getPulsarResources() + .getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject()) + : CompletableFuture.completedFuture(Optional.of(nsPolicies)); + + CompletableFuture.allOf(f, f1).thenAccept(__ -> { + Optional topicPolicies = f.join(); + Optional namespacePolicies = f1.join(); + OffloadPoliciesImpl nsLevelOffloadPolicies = + (OffloadPoliciesImpl) namespacePolicies.map(p -> p.offload_policies).orElse(null); + OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.mergeConfiguration( + topicPolicies.map(TopicPolicies::getOffloadPolicies).orElse(null), + OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, namespacePolicies.orElse(null)), + brokerCnf); + updateOffloadPolicies(offloadPolicies); + }).exceptionally(ex -> { + log.error("[{}] Failed to get offload policies", topic, ex); + return null; + }); + } + + protected void updateOffloadPolicies(OffloadPolicies offloadPolicies) { + // No-op by default + } + protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) { if (log.isDebugEnabled()) { log.debug("[{}]updateTopicPolicyByNamespacePolicy,data={}", topic, namespacePolicies); @@ -334,7 +376,7 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) { topicPolicies.getDispatcherPauseOnAckStatePersistentEnabled().updateNamespaceValue( namespacePolicies.dispatcherPauseOnAckStatePersistentEnabled); - + internalUpdateOffloadPolicies(null, namespacePolicies); updateEntryFilters(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 056fad2a005b4..6cd84ee35b9ff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -71,6 +71,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.UpdatePropertiesCallback; import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries; import org.apache.bookkeeper.mledger.ManagedLedger; @@ -167,6 +168,8 @@ import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats; import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.LedgerInfo; +import org.apache.pulsar.common.policies.data.OffloadPolicies; +import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.RetentionPolicies; @@ -4273,6 +4276,15 @@ public void updateBrokerDispatchPauseOnAckStatePersistentEnabled() { } } + @Override + protected void updateOffloadPolicies(OffloadPolicies offloadPolicies) { + LedgerOffloader offloader = ledger.getConfig().getLedgerOffloader(); + if (null == offloader || null == offloadPolicies) { + return; + } + offloader.updateOffloadPolicies(offloadPolicies); + } + @Override public void onUpdate(TopicPolicies policies) { if (log.isDebugEnabled()) { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java index 6c40aa3f2edd0..a04695c7fc0c9 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java @@ -346,18 +346,18 @@ public boolean bucketValid() { return false; } - public Properties toProperties() { + public static Properties toProperties(OffloadPoliciesImpl offloadPolicies) { Properties properties = new Properties(); for (Field f : CONFIGURATION_FIELDS) { try { f.setAccessible(true); if ("managedLedgerExtraConfigurations".equals(f.getName())) { - Map extraConfig = (Map) f.get(this); + Map extraConfig = (Map) f.get(offloadPolicies); extraConfig.forEach((key, value) -> { setProperty(properties, EXTRA_CONFIG_PREFIX + key, value); }); } else { - setProperty(properties, f.getName(), f.get(this)); + setProperty(properties, f.getName(), f.get(offloadPolicies)); } } catch (Exception e) { throw new IllegalArgumentException("An error occurred while processing the field: " + f.getName(), e); @@ -366,6 +366,10 @@ public Properties toProperties() { return properties; } + public Properties toProperties() { + return toProperties(this); + } + private static void setProperty(Properties properties, String key, Object value) { if (value != null) { properties.setProperty(key, "" + value); diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java index 2431e61c91cab..eb7ed0231d346 100644 --- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java +++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java @@ -66,7 +66,7 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader { private OrderedScheduler scheduler; private static final long ENTRIES_PER_READ = 100; private OrderedScheduler assignmentScheduler; - private OffloadPolicies offloadPolicies; + private volatile OffloadPolicies offloadPolicies; private final LedgerOffloaderStats offloaderStats; public static boolean driverSupported(String driver) { @@ -394,6 +394,13 @@ public OffloadPolicies getOffloadPolicies() { return offloadPolicies; } + @Override + public void updateOffloadPolicies(OffloadPolicies offloadPolicies) { + if (null != offloadPolicies) { + this.offloadPolicies = offloadPolicies; + } + } + @Override public void close() { if (fileSystem != null) { diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java index b4ed940c9cdca..e6cc8a33dca36 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java @@ -97,7 +97,7 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader { private static final String MANAGED_LEDGER_NAME = "ManagedLedgerName"; private final OrderedScheduler scheduler; - private final TieredStorageConfiguration config; + private volatile TieredStorageConfiguration config; private final Location writeLocation; // metadata to be stored as part of the offloaded ledger metadata @@ -663,6 +663,16 @@ public OffloadPolicies getOffloadPolicies() { return OffloadPoliciesImpl.create(properties); } + @Override + public void updateOffloadPolicies(OffloadPolicies offloadPolicies) { + Properties properties = OffloadPoliciesImpl.toProperties((OffloadPoliciesImpl) offloadPolicies); + try { + this.config = TieredStorageConfiguration.create(properties); + } catch (IOException e) { + log.warn("Failed to update offload policies", e); + } + } + @Override public void close() { for (BlobStore readBlobStore : blobStores.values()) {