Skip to content

Commit

Permalink
Make offloader threshold dynamic
Browse files Browse the repository at this point in the history
  • Loading branch information
dao-jun committed Dec 24, 2024
1 parent 51e8247 commit 45af06d
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,16 @@ default CompletableFuture<Void> deleteOffloaded(UUID uid, Map<String, String> of
*/
OffloadPolicies getOffloadPolicies();


/**
* Update the offload policies of this LedgerOffloader.
*
* @param offloadPolicies the new offload policies
*/
default void updateOffloadPolicies(OffloadPolicies offloadPolicies) {
// default no-op
}

/**
* Close the resources if necessary.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
Expand Down Expand Up @@ -62,6 +63,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.ProducerFencedException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicMigratedException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
Expand All @@ -76,6 +78,8 @@
import org.apache.pulsar.common.policies.data.EntryFilters;
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
Expand Down Expand Up @@ -275,10 +279,48 @@ protected void updateTopicPolicy(TopicPolicies data) {
topicPolicies.getDispatcherPauseOnAckStatePersistentEnabled()
.updateTopicValue(data.getDispatcherPauseOnAckStatePersistentEnabled());
this.subscriptionPolicies = data.getSubscriptionPolicies();

internalUpdateOffloadPolicies(data, null);
updateEntryFilters();
}

private void internalUpdateOffloadPolicies(TopicPolicies policies, Policies nsPolicies) {
PulsarService pulsar = brokerService.getPulsar();
if (!(this instanceof PersistentTopic) || isSystemTopic()) {
return;
}
CompletableFuture<Optional<Topic>> t = brokerService.getTopics().get(topic);
if (!t.isDone() || t.isCompletedExceptionally()) {
return;
}
TopicName topicName = TopicName.get(topic);
Properties brokerCnf = pulsar.getConfig().getProperties();
CompletableFuture<Optional<TopicPolicies>> f = policies == null ? pulsar.getTopicPoliciesService()
.getTopicPoliciesAsync(topicName, TopicPoliciesService.GetType.DEFAULT)
: CompletableFuture.completedFuture(Optional.of(policies)) ;
CompletableFuture<Optional<Policies>> f1 = nsPolicies == null ? pulsar.getPulsarResources()
.getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject())
: CompletableFuture.completedFuture(Optional.of(nsPolicies));

CompletableFuture.allOf(f, f1).thenAccept(__ -> {
Optional<TopicPolicies> topicPolicies = f.join();
Optional<Policies> namespacePolicies = f1.join();
OffloadPoliciesImpl nsLevelOffloadPolicies =
(OffloadPoliciesImpl) namespacePolicies.map(p -> p.offload_policies).orElse(null);
OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.mergeConfiguration(
topicPolicies.map(TopicPolicies::getOffloadPolicies).orElse(null),
OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, namespacePolicies.orElse(null)),
brokerCnf);
updateOffloadPolicies(offloadPolicies);
}).exceptionally(ex -> {
log.error("[{}] Failed to get offload policies", topic, ex);
return null;
});
}

protected void updateOffloadPolicies(OffloadPolicies offloadPolicies) {
// No-op by default
}

protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
if (log.isDebugEnabled()) {
log.debug("[{}]updateTopicPolicyByNamespacePolicy,data={}", topic, namespacePolicies);
Expand Down Expand Up @@ -334,7 +376,7 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {

topicPolicies.getDispatcherPauseOnAckStatePersistentEnabled().updateNamespaceValue(
namespacePolicies.dispatcherPauseOnAckStatePersistentEnabled);

internalUpdateOffloadPolicies(null, namespacePolicies);
updateEntryFilters();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.UpdatePropertiesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries;
import org.apache.bookkeeper.mledger.ManagedLedger;
Expand Down Expand Up @@ -167,6 +168,8 @@
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.LedgerInfo;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
Expand Down Expand Up @@ -4273,6 +4276,15 @@ public void updateBrokerDispatchPauseOnAckStatePersistentEnabled() {
}
}

@Override
protected void updateOffloadPolicies(OffloadPolicies offloadPolicies) {
LedgerOffloader offloader = ledger.getConfig().getLedgerOffloader();
if (null == offloader || null == offloadPolicies) {
return;
}
offloader.updateOffloadPolicies(offloadPolicies);
}

@Override
public void onUpdate(TopicPolicies policies) {
if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,18 +346,18 @@ public boolean bucketValid() {
return false;
}

public Properties toProperties() {
public static Properties toProperties(OffloadPoliciesImpl offloadPolicies) {
Properties properties = new Properties();
for (Field f : CONFIGURATION_FIELDS) {
try {
f.setAccessible(true);
if ("managedLedgerExtraConfigurations".equals(f.getName())) {
Map<String, String> extraConfig = (Map<String, String>) f.get(this);
Map<String, String> extraConfig = (Map<String, String>) f.get(offloadPolicies);
extraConfig.forEach((key, value) -> {
setProperty(properties, EXTRA_CONFIG_PREFIX + key, value);
});
} else {
setProperty(properties, f.getName(), f.get(this));
setProperty(properties, f.getName(), f.get(offloadPolicies));
}
} catch (Exception e) {
throw new IllegalArgumentException("An error occurred while processing the field: " + f.getName(), e);
Expand All @@ -366,6 +366,10 @@ public Properties toProperties() {
return properties;
}

public Properties toProperties() {
return toProperties(this);
}

private static void setProperty(Properties properties, String key, Object value) {
if (value != null) {
properties.setProperty(key, "" + value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
private OrderedScheduler scheduler;
private static final long ENTRIES_PER_READ = 100;
private OrderedScheduler assignmentScheduler;
private OffloadPolicies offloadPolicies;
private volatile OffloadPolicies offloadPolicies;
private final LedgerOffloaderStats offloaderStats;

public static boolean driverSupported(String driver) {
Expand Down Expand Up @@ -394,6 +394,13 @@ public OffloadPolicies getOffloadPolicies() {
return offloadPolicies;
}

@Override
public void updateOffloadPolicies(OffloadPolicies offloadPolicies) {
if (null != offloadPolicies) {
this.offloadPolicies = offloadPolicies;
}
}

@Override
public void close() {
if (fileSystem != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
private static final String MANAGED_LEDGER_NAME = "ManagedLedgerName";

private final OrderedScheduler scheduler;
private final TieredStorageConfiguration config;
private volatile TieredStorageConfiguration config;
private final Location writeLocation;

// metadata to be stored as part of the offloaded ledger metadata
Expand Down Expand Up @@ -663,6 +663,16 @@ public OffloadPolicies getOffloadPolicies() {
return OffloadPoliciesImpl.create(properties);
}

@Override
public void updateOffloadPolicies(OffloadPolicies offloadPolicies) {
Properties properties = OffloadPoliciesImpl.toProperties((OffloadPoliciesImpl) offloadPolicies);
try {
this.config = TieredStorageConfiguration.create(properties);
} catch (IOException e) {
log.warn("Failed to update offload policies", e);
}
}

@Override
public void close() {
for (BlobStore readBlobStore : blobStores.values()) {
Expand Down

0 comments on commit 45af06d

Please sign in to comment.