Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP][improve][ml] Make offloader threshold config dynamic #23775

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,16 @@ default CompletableFuture<Void> deleteOffloaded(UUID uid, Map<String, String> of
*/
OffloadPolicies getOffloadPolicies();


/**
* Update the offload policies of this LedgerOffloader.
*
* @param offloadPolicies the new offload policies
*/
default void updateOffloadPolicies(OffloadPolicies offloadPolicies) {
// default no-op
}

/**
* Close the resources if necessary.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
Expand Down Expand Up @@ -62,6 +63,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.ProducerFencedException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicMigratedException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
Expand All @@ -76,6 +78,8 @@
import org.apache.pulsar.common.policies.data.EntryFilters;
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
Expand Down Expand Up @@ -275,10 +279,48 @@ protected void updateTopicPolicy(TopicPolicies data) {
topicPolicies.getDispatcherPauseOnAckStatePersistentEnabled()
.updateTopicValue(data.getDispatcherPauseOnAckStatePersistentEnabled());
this.subscriptionPolicies = data.getSubscriptionPolicies();

internalUpdateOffloadPolicies(data, null);
updateEntryFilters();
}

private void internalUpdateOffloadPolicies(TopicPolicies policies, Policies nsPolicies) {
PulsarService pulsar = brokerService.getPulsar();
if (!(this instanceof PersistentTopic) || isSystemTopic()) {
return;
}
CompletableFuture<Optional<Topic>> t = brokerService.getTopics().get(topic);
if (!t.isDone() || t.isCompletedExceptionally()) {
return;
}
TopicName topicName = TopicName.get(topic);
Properties brokerCnf = pulsar.getConfig().getProperties();
CompletableFuture<Optional<TopicPolicies>> f = policies == null ? pulsar.getTopicPoliciesService()
.getTopicPoliciesAsync(topicName, TopicPoliciesService.GetType.DEFAULT)
: CompletableFuture.completedFuture(Optional.of(policies)) ;
CompletableFuture<Optional<Policies>> f1 = nsPolicies == null ? pulsar.getPulsarResources()
.getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject())
: CompletableFuture.completedFuture(Optional.of(nsPolicies));

CompletableFuture.allOf(f, f1).thenAccept(__ -> {
Optional<TopicPolicies> topicPolicies = f.join();
Optional<Policies> namespacePolicies = f1.join();
OffloadPoliciesImpl nsLevelOffloadPolicies =
(OffloadPoliciesImpl) namespacePolicies.map(p -> p.offload_policies).orElse(null);
OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.mergeConfiguration(
topicPolicies.map(TopicPolicies::getOffloadPolicies).orElse(null),
OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, namespacePolicies.orElse(null)),
brokerCnf);
updateOffloadPolicies(offloadPolicies);
}).exceptionally(ex -> {
log.error("[{}] Failed to get offload policies", topic, ex);
return null;
});
}

protected void updateOffloadPolicies(OffloadPolicies offloadPolicies) {
// No-op by default
}

protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
if (log.isDebugEnabled()) {
log.debug("[{}]updateTopicPolicyByNamespacePolicy,data={}", topic, namespacePolicies);
Expand Down Expand Up @@ -334,7 +376,7 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {

topicPolicies.getDispatcherPauseOnAckStatePersistentEnabled().updateNamespaceValue(
namespacePolicies.dispatcherPauseOnAckStatePersistentEnabled);

internalUpdateOffloadPolicies(null, namespacePolicies);
updateEntryFilters();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.UpdatePropertiesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries;
import org.apache.bookkeeper.mledger.ManagedLedger;
Expand Down Expand Up @@ -167,6 +168,8 @@
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.LedgerInfo;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
Expand Down Expand Up @@ -4273,6 +4276,15 @@ public void updateBrokerDispatchPauseOnAckStatePersistentEnabled() {
}
}

@Override
protected void updateOffloadPolicies(OffloadPolicies offloadPolicies) {
LedgerOffloader offloader = ledger.getConfig().getLedgerOffloader();
if (null == offloader || null == offloadPolicies) {
return;
}
offloader.updateOffloadPolicies(offloadPolicies);
}

@Override
public void onUpdate(TopicPolicies policies) {
if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,18 +346,18 @@ public boolean bucketValid() {
return false;
}

public Properties toProperties() {
public static Properties toProperties(OffloadPoliciesImpl offloadPolicies) {
Properties properties = new Properties();
for (Field f : CONFIGURATION_FIELDS) {
try {
f.setAccessible(true);
if ("managedLedgerExtraConfigurations".equals(f.getName())) {
Map<String, String> extraConfig = (Map<String, String>) f.get(this);
Map<String, String> extraConfig = (Map<String, String>) f.get(offloadPolicies);
extraConfig.forEach((key, value) -> {
setProperty(properties, EXTRA_CONFIG_PREFIX + key, value);
});
} else {
setProperty(properties, f.getName(), f.get(this));
setProperty(properties, f.getName(), f.get(offloadPolicies));
}
} catch (Exception e) {
throw new IllegalArgumentException("An error occurred while processing the field: " + f.getName(), e);
Expand All @@ -366,6 +366,10 @@ public Properties toProperties() {
return properties;
}

public Properties toProperties() {
return toProperties(this);
}

private static void setProperty(Properties properties, String key, Object value) {
if (value != null) {
properties.setProperty(key, "" + value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
private OrderedScheduler scheduler;
private static final long ENTRIES_PER_READ = 100;
private OrderedScheduler assignmentScheduler;
private OffloadPolicies offloadPolicies;
private volatile OffloadPolicies offloadPolicies;
private final LedgerOffloaderStats offloaderStats;

public static boolean driverSupported(String driver) {
Expand Down Expand Up @@ -394,6 +394,13 @@ public OffloadPolicies getOffloadPolicies() {
return offloadPolicies;
}

@Override
public void updateOffloadPolicies(OffloadPolicies offloadPolicies) {
if (null != offloadPolicies) {
this.offloadPolicies = offloadPolicies;
}
}

@Override
public void close() {
if (fileSystem != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
private static final String MANAGED_LEDGER_NAME = "ManagedLedgerName";

private final OrderedScheduler scheduler;
private final TieredStorageConfiguration config;
private volatile TieredStorageConfiguration config;
private final Location writeLocation;

// metadata to be stored as part of the offloaded ledger metadata
Expand Down Expand Up @@ -663,6 +663,16 @@ public OffloadPolicies getOffloadPolicies() {
return OffloadPoliciesImpl.create(properties);
}

@Override
public void updateOffloadPolicies(OffloadPolicies offloadPolicies) {
Properties properties = OffloadPoliciesImpl.toProperties((OffloadPoliciesImpl) offloadPolicies);
try {
this.config = TieredStorageConfiguration.create(properties);
} catch (IOException e) {
log.warn("Failed to update offload policies", e);
}
}

@Override
public void close() {
for (BlobStore readBlobStore : blobStores.values()) {
Expand Down
Loading