Skip to content

Commit

Permalink
[improve] [broker] Improve CPU resources usege of TopicName Cache (ap…
Browse files Browse the repository at this point in the history
…ache#23052)

Co-authored-by: Zixuan Liu <[email protected]>
  • Loading branch information
poorbarcode and nodece authored Jul 22, 2024
1 parent 6fa3bcf commit 81aed6c
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 17 deletions.
8 changes: 8 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,14 @@ skipBrokerShutdownOnOOM=false
# Factory class-name to create topic with custom workflow
topicFactoryClassName=

# Max capacity of the topic name cache. -1 means unlimited cache; 0 means broker will clear all cache
# per "maxSecondsToClearTopicNameCache", it does not mean broker will not cache TopicName.
topicNameCacheMaxCapacity=100000

# A Specifies the minimum number of seconds that the topic name stays in memory, to avoid clear cache frequently when
# there are too many topics are in use.
maxSecondsToClearTopicNameCache=7200

# Enable backlog quota check. Enforces action on topic when the quota is reached
backlogQuotaCheckEnabled=true

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,21 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
)
private boolean backlogQuotaCheckEnabled = true;

@FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
doc = "Max capacity of the topic name cache. -1 means unlimited cache; 0 means broker will clear all cache"
+ " per maxSecondsToClearTopicNameCache, it does not mean broker will not cache TopicName."
)
private int topicNameCacheMaxCapacity = 100_000;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "A Specifies the minimum number of seconds that the topic name stays in memory, to avoid clear cache"
+ " frequently when there are too many topics are in use."
)
private int maxSecondsToClearTopicNameCache = 3600 * 2;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "Whether to enable precise time based backlog quota check. "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,16 @@ public void start() throws Exception {
this.updateBrokerDispatchThrottlingMaxRate();
this.startCheckReplicationPolicies();
this.startDeduplicationSnapshotMonitor();
this.startClearInvalidateTopicNameCacheTask();
}

protected void startClearInvalidateTopicNameCacheTask() {
final int maxSecondsToClearTopicNameCache = pulsar.getConfiguration().getMaxSecondsToClearTopicNameCache();
inactivityMonitor.scheduleAtFixedRate(
() -> TopicName.clearIfReachedMaxCapacity(pulsar.getConfiguration().getTopicNameCacheMaxCapacity()),
maxSecondsToClearTopicNameCache,
maxSecondsToClearTopicNameCache,
TimeUnit.SECONDS);
}

protected void startStatsUpdater(int statsUpdateInitialDelayInSecs, int statsUpdateFrequencyInSecs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.AssertJUnit.assertSame;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -56,6 +59,8 @@ protected void doInitConf() throws Exception {
super.doInitConf();
conf.setBrokerServicePortTls(Optional.of(0));
conf.setWebServicePortTls(Optional.of(0));
conf.setTopicNameCacheMaxCapacity(5000);
conf.setMaxSecondsToClearTopicNameCache(5);
if (useStaticPorts) {
conf.setBrokerServicePortTls(Optional.of(6651));
conf.setBrokerServicePort(Optional.of(6660));
Expand Down Expand Up @@ -187,6 +192,34 @@ public void testDynamicBrokerPort() throws Exception {
assertEquals(pulsar.getWebServiceAddressTls(), "https://localhost:" + pulsar.getWebService().getListenPortHTTPS().get());
}

@Test
public void testTopicCacheConfiguration() throws Exception {
cleanup();
setup();
assertEquals(conf.getTopicNameCacheMaxCapacity(), 5000);
assertEquals(conf.getMaxSecondsToClearTopicNameCache(), 5);

List<TopicName> topicNameCached = new ArrayList<>();
for (int i = 0; i < 20; i++) {
topicNameCached.add(TopicName.get("public/default/tp_" + i));
}

// Verify: the cache does not clear since it is not larger than max capacity.
Thread.sleep(10 * 1000);
for (int i = 0; i < 20; i++) {
assertTrue(topicNameCached.get(i) == TopicName.get("public/default/tp_" + i));
}

// Update max capacity.
admin.brokers().updateDynamicConfiguration("topicNameCacheMaxCapacity", "10");

// Verify: the cache were cleared.
Thread.sleep(10 * 1000);
for (int i = 0; i < 20; i++) {
assertFalse(topicNameCached.get(i) == TopicName.get("public/default/tp_" + i));
}
}

@Test
public void testBacklogAndRetentionCheck() throws PulsarServerException {
ServiceConfiguration config = new ServiceConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,7 @@ public void testInitialize() throws Exception {
assertEquals(standalone.getConfig().getAdvertisedListeners(),
"internal:pulsar://192.168.1.11:6660,internal:pulsar+ssl://192.168.1.11:6651");
assertEquals(standalone.getConfig().isDispatcherPauseOnAckStatePersistentEnabled(), true);
assertEquals(standalone.getConfig().getMaxSecondsToClearTopicNameCache(), 1);
assertEquals(standalone.getConfig().getTopicNameCacheMaxCapacity(), 200);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public void testInit() throws Exception {
assertEquals(config.getBacklogQuotaDefaultLimitGB(), 0.05);
assertEquals(config.getHttpMaxRequestHeaderSize(), 1234);
assertEquals(config.isDispatcherPauseOnAckStatePersistentEnabled(), true);
assertEquals(config.getMaxSecondsToClearTopicNameCache(), 1);
assertEquals(config.getTopicNameCacheMaxCapacity(), 200);
OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(config.getProperties());
assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority().getValue(), "bookkeeper-first");
}
Expand Down Expand Up @@ -375,4 +377,15 @@ public void testAllowAutoTopicCreationType() throws Exception {
conf = PulsarConfigurationLoader.create(properties, ServiceConfiguration.class);
assertEquals(conf.getAllowAutoTopicCreationType(), TopicType.NON_PARTITIONED);
}

@Test
public void testTopicNameCacheConfiguration() throws Exception {
ServiceConfiguration conf;
final Properties properties = new Properties();
properties.setProperty("maxSecondsToClearTopicNameCache", "2");
properties.setProperty("topicNameCacheMaxCapacity", "100");
conf = PulsarConfigurationLoader.create(properties, ServiceConfiguration.class);
assertEquals(conf.getMaxSecondsToClearTopicNameCache(), 2);
assertEquals(conf.getTopicNameCacheMaxCapacity(), 100);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,5 @@ transactionPendingAckBatchedWriteEnabled=true
transactionPendingAckBatchedWriteMaxRecords=44
transactionPendingAckBatchedWriteMaxSize=55
transactionPendingAckBatchedWriteMaxDelayInMillis=66
topicNameCacheMaxCapacity=200
maxSecondsToClearTopicNameCache=1
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,5 @@ supportedNamespaceBundleSplitAlgorithms=[range_equally_divide]
defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide
maxMessagePublishBufferSizeInMB=-1
dispatcherPauseOnAckStatePersistentEnabled=true
topicNameCacheMaxCapacity=200
maxSecondsToClearTopicNameCache=1
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,11 @@
package org.apache.pulsar.common.naming;

import com.google.common.base.Splitter;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.UncheckedExecutionException;
import com.google.re2j.Pattern;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.util.Codec;

Expand All @@ -54,13 +49,17 @@ public class TopicName implements ServiceUnitId {

private final int partitionIndex;

private static final LoadingCache<String, TopicName> cache = CacheBuilder.newBuilder().maximumSize(100000)
.expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader<String, TopicName>() {
@Override
public TopicName load(String name) throws Exception {
return new TopicName(name);
}
});
private static final ConcurrentHashMap<String, TopicName> cache = new ConcurrentHashMap<>();

public static void clearIfReachedMaxCapacity(int maxCapacity) {
if (maxCapacity < 0) {
// Unlimited cache.
return;
}
if (cache.size() > maxCapacity) {
cache.clear();
}
}

public static TopicName get(String domain, NamespaceName namespaceName, String topic) {
String name = domain + "://" + namespaceName.toString() + '/' + topic;
Expand All @@ -79,11 +78,11 @@ public static TopicName get(String domain, String tenant, String cluster, String
}

public static TopicName get(String topic) {
try {
return cache.get(topic);
} catch (ExecutionException | UncheckedExecutionException e) {
throw (RuntimeException) e.getCause();
TopicName tp = cache.get(topic);
if (tp != null) {
return tp;
}
return cache.computeIfAbsent(topic, k -> new TopicName(k));
}

public static TopicName getPartitionedTopicName(String topic) {
Expand Down

0 comments on commit 81aed6c

Please sign in to comment.