Skip to content

Commit

Permalink
[fix][broker] pre-create non-partitioned system topics for load balan…
Browse files Browse the repository at this point in the history
…ce extension (#20370)

PIP: #16691

### Motivation

We need to create system topics without partitions explicitly. Currently, we do not support partitioned system topics.

### Modifications

 create system topics without partitions explicitly

(cherry picked from commit 1080ad5)
  • Loading branch information
heesung-sn authored and poorbarcode committed May 30, 2023
1 parent 5d34dad commit 9733f7e
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceName;
Expand Down Expand Up @@ -212,6 +213,19 @@ public static boolean debug(ServiceConfiguration config, Logger log) {
return config.isLoadBalancerDebugModeEnabled() || log.isDebugEnabled();
}

public static void createSystemTopic(PulsarService pulsar, String topic) throws PulsarServerException {
try {
pulsar.getAdminClient().topics().createNonPartitionedTopic(topic);
log.info("Created topic {}.", topic);
} catch (PulsarAdminException.ConflictException ex) {
if (debug(pulsar.getConfiguration(), log)) {
log.info("Topic {} already exists.", topic, ex);
}
} catch (PulsarAdminException e) {
throw new PulsarServerException(e);
}
}

@Override
public void start() throws PulsarServerException {
if (this.started) {
Expand Down Expand Up @@ -246,6 +260,9 @@ public void start() throws PulsarServerException {
this.isolationPoliciesHelper = new IsolationPoliciesHelper(policies);
this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter(isolationPoliciesHelper));

createSystemTopic(pulsar, BROKER_LOAD_DATA_STORE_TOPIC);
createSystemTopic(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);

try {
this.brokerLoadDataStore = LoadDataStoreFactory
.create(pulsar.getClient(), BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,8 @@ public synchronized void start() throws PulsarServerException {
PulsarClusterMetadataSetup.createNamespaceIfAbsent
(pulsar.getPulsarResources(), NamespaceName.SYSTEM_NAMESPACE, config.getClusterName());

ExtensibleLoadManagerImpl.createSystemTopic(pulsar, TOPIC);

producer = pulsar.getClient().newProducer(schema)
.enableBatching(true)
.compressionType(MSG_COMPRESSION_TYPE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
Expand Down Expand Up @@ -129,6 +130,7 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
@Override
public void setup() throws Exception {
conf.setForceDeleteNamespaceAllowed(true);
conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
conf.setAllowAutoTopicCreation(true);
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.TableViewImpl;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.NotificationType;
Expand Down Expand Up @@ -129,6 +130,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
@Override
protected void setup() throws Exception {
conf.setAllowAutoTopicCreation(true);
conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
conf.setLoadBalancerDebugModeEnabled(true);
conf.setBrokerServiceCompactionMonitorIntervalInSeconds(10);
super.internalSetup(conf);
Expand Down

0 comments on commit 9733f7e

Please sign in to comment.