diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index e2d4ef5153769..c5d516cfc944c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1102,19 +1102,7 @@ public Set getOwnedServiceUnits() { } public boolean isServiceUnitOwned(ServiceUnitId suName) throws Exception { - if (suName instanceof TopicName) { - return isTopicOwnedAsync((TopicName) suName).get(); - } - - if (suName instanceof NamespaceName) { - return isNamespaceOwned((NamespaceName) suName); - } - - if (suName instanceof NamespaceBundle) { - return ownershipCache.isNamespaceBundleOwned((NamespaceBundle) suName); - } - - throw new IllegalArgumentException("Invalid class of NamespaceBundle: " + suName.getClass().getName()); + return isServiceUnitOwnedAsync(suName).get(config.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS); } public CompletableFuture isServiceUnitOwnedAsync(ServiceUnitId suName) { @@ -1168,10 +1156,6 @@ public CompletableFuture isServiceUnitActiveAsync(TopicName topicName) }); } - private boolean isNamespaceOwned(NamespaceName fqnn) throws Exception { - return ownershipCache.getOwnedBundle(getFullBundle(fqnn)) != null; - } - private CompletableFuture isNamespaceOwnedAsync(NamespaceName fqnn) { // TODO: Add unit tests cover it. if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 498f48d16d2cd..d6e208d3202c0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -48,10 +48,11 @@ import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; - import com.google.common.collect.Sets; import java.util.LinkedHashMap; import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -63,7 +64,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; @@ -927,6 +927,43 @@ public void testDisableBroker() throws Exception { } } + @Test(timeOut = 30 * 1000) + public void testListTopic() throws Exception { + final String namespace = "public/testListTopic"; + admin.namespaces().createNamespace(namespace, 3); + + final String persistentTopicName = TopicName.get( + "persistent", NamespaceName.get(namespace), + "get_topics_mode_" + UUID.randomUUID()).toString(); + + final String nonPersistentTopicName = TopicName.get( + "non-persistent", NamespaceName.get(namespace), + "get_topics_mode_" + UUID.randomUUID()).toString(); + admin.topics().createPartitionedTopic(persistentTopicName, 3); + admin.topics().createPartitionedTopic(nonPersistentTopicName, 3); + pulsarClient.newProducer().topic(persistentTopicName).create().close(); + pulsarClient.newProducer().topic(nonPersistentTopicName).create().close(); + + BundlesData bundlesData = admin.namespaces().getBundles(namespace); + List boundaries = bundlesData.getBoundaries(); + int topicNum = 0; + for (int i = 0; i < boundaries.size() - 1; i++) { + String bundle = String.format("%s_%s", boundaries.get(i), boundaries.get(i + 1)); + List topic = admin.topics().getListInBundle(namespace, bundle); + if (topic == null) { + continue; + } + topicNum += topic.size(); + for (String s : topic) { + assertFalse(TopicName.get(s).isPersistent()); + } + } + assertEquals(topicNum, 3); + + List list = admin.topics().getList(namespace); + assertEquals(list.size(), 6); + } + private static abstract class MockBrokerFilter implements BrokerFilter { @Override