From 19face6dc9292ff40d710909b4444b01b15a2378 Mon Sep 17 00:00:00 2001 From: Kai Wang Date: Thu, 8 Jun 2023 13:58:08 +0800 Subject: [PATCH] [fix][broker] Fix redirect loop when using ExtensibleLoadManager and list in bundle admin API (#20528) PIP: https://github.com/apache/pulsar/issues/16691 ### Motivation When using `ExtensibleLoadManager` and list in bundle admin API, it will redirect forever because `isServiceUnitOwned` method is checking the `ownershipCache` as the ownership storage, however, when using `ExtensibleLoadManager`, it stored the ownership to table view. ### Modifications * Call `isServiceUnitOwnedAsync ` when using `isServiceUnitOwned `. * Add unit test to cover this case. --- .../broker/namespace/NamespaceService.java | 18 +-------- .../ExtensibleLoadManagerImplTest.java | 38 +++++++++++++++++++ 2 files changed, 39 insertions(+), 17 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index cf969460c3345..3997ab521b889 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1108,19 +1108,7 @@ public Set getOwnedServiceUnits() { } public boolean isServiceUnitOwned(ServiceUnitId suName) throws Exception { - if (suName instanceof TopicName) { - return isTopicOwnedAsync((TopicName) suName).get(); - } - - if (suName instanceof NamespaceName) { - return isNamespaceOwned((NamespaceName) suName); - } - - if (suName instanceof NamespaceBundle) { - return ownershipCache.isNamespaceBundleOwned((NamespaceBundle) suName); - } - - throw new IllegalArgumentException("Invalid class of NamespaceBundle: " + suName.getClass().getName()); + return isServiceUnitOwnedAsync(suName).get(config.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS); } public CompletableFuture isServiceUnitOwnedAsync(ServiceUnitId suName) { @@ -1174,10 +1162,6 @@ public CompletableFuture isServiceUnitActiveAsync(TopicName topicName) }); } - private boolean isNamespaceOwned(NamespaceName fqnn) throws Exception { - return ownershipCache.getOwnedBundle(getFullBundle(fqnn)) != null; - } - private CompletableFuture isNamespaceOwnedAsync(NamespaceName fqnn) { // TODO: Add unit tests cover it. if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index e8a4682e528d5..d1c8ddbb4d3e9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -55,6 +55,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -926,6 +927,43 @@ public void testDisableBroker() throws Exception { } } + @Test(timeOut = 30 * 1000) + public void testListTopic() throws Exception { + final String namespace = "public/testListTopic"; + admin.namespaces().createNamespace(namespace, 3); + + final String persistentTopicName = TopicName.get( + "persistent", NamespaceName.get(namespace), + "get_topics_mode_" + UUID.randomUUID()).toString(); + + final String nonPersistentTopicName = TopicName.get( + "non-persistent", NamespaceName.get(namespace), + "get_topics_mode_" + UUID.randomUUID()).toString(); + admin.topics().createPartitionedTopic(persistentTopicName, 3); + admin.topics().createPartitionedTopic(nonPersistentTopicName, 3); + pulsarClient.newProducer().topic(persistentTopicName).create().close(); + pulsarClient.newProducer().topic(nonPersistentTopicName).create().close(); + + BundlesData bundlesData = admin.namespaces().getBundles(namespace); + List boundaries = bundlesData.getBoundaries(); + int topicNum = 0; + for (int i = 0; i < boundaries.size() - 1; i++) { + String bundle = String.format("%s_%s", boundaries.get(i), boundaries.get(i + 1)); + List topic = admin.topics().getListInBundle(namespace, bundle); + if (topic == null) { + continue; + } + topicNum += topic.size(); + for (String s : topic) { + assertFalse(TopicName.get(s).isPersistent()); + } + } + assertEquals(topicNum, 3); + + List list = admin.topics().getList(namespace); + assertEquals(list.size(), 6); + } + private static abstract class MockBrokerFilter implements BrokerFilter { @Override