From f97b82df73141b904be36a340fb0a62af53cd770 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Wed, 7 Jun 2023 19:52:09 +0800 Subject: [PATCH] Fix list in bundle admin API --- .../broker/namespace/NamespaceService.java | 20 +--------- .../ExtensibleLoadManagerImplTest.java | 38 +++++++++++++++++++ 2 files changed, 40 insertions(+), 18 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index cf969460c3345..d10ebba96bbe0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -282,7 +282,7 @@ public CompletableFuture> getWebServiceUrlAsync(ServiceUnitId suNa */ public Optional getWebServiceUrl(ServiceUnitId suName, LookupOptions options) throws Exception { return getWebServiceUrlAsync(suName, options) - .get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS); + .get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS); } private CompletableFuture> internalGetWebServiceUrl(Optional topic, @@ -1108,19 +1108,7 @@ public Set getOwnedServiceUnits() { } public boolean isServiceUnitOwned(ServiceUnitId suName) throws Exception { - if (suName instanceof TopicName) { - return isTopicOwnedAsync((TopicName) suName).get(); - } - - if (suName instanceof NamespaceName) { - return isNamespaceOwned((NamespaceName) suName); - } - - if (suName instanceof NamespaceBundle) { - return ownershipCache.isNamespaceBundleOwned((NamespaceBundle) suName); - } - - throw new IllegalArgumentException("Invalid class of NamespaceBundle: " + suName.getClass().getName()); + return isServiceUnitOwnedAsync(suName).get(config.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS); } public CompletableFuture isServiceUnitOwnedAsync(ServiceUnitId suName) { @@ -1174,10 +1162,6 @@ public CompletableFuture isServiceUnitActiveAsync(TopicName topicName) }); } - private boolean isNamespaceOwned(NamespaceName fqnn) throws Exception { - return ownershipCache.getOwnedBundle(getFullBundle(fqnn)) != null; - } - private CompletableFuture isNamespaceOwnedAsync(NamespaceName fqnn) { // TODO: Add unit tests cover it. if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index e8a4682e528d5..d1c8ddbb4d3e9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -55,6 +55,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -926,6 +927,43 @@ public void testDisableBroker() throws Exception { } } + @Test(timeOut = 30 * 1000) + public void testListTopic() throws Exception { + final String namespace = "public/testListTopic"; + admin.namespaces().createNamespace(namespace, 3); + + final String persistentTopicName = TopicName.get( + "persistent", NamespaceName.get(namespace), + "get_topics_mode_" + UUID.randomUUID()).toString(); + + final String nonPersistentTopicName = TopicName.get( + "non-persistent", NamespaceName.get(namespace), + "get_topics_mode_" + UUID.randomUUID()).toString(); + admin.topics().createPartitionedTopic(persistentTopicName, 3); + admin.topics().createPartitionedTopic(nonPersistentTopicName, 3); + pulsarClient.newProducer().topic(persistentTopicName).create().close(); + pulsarClient.newProducer().topic(nonPersistentTopicName).create().close(); + + BundlesData bundlesData = admin.namespaces().getBundles(namespace); + List boundaries = bundlesData.getBoundaries(); + int topicNum = 0; + for (int i = 0; i < boundaries.size() - 1; i++) { + String bundle = String.format("%s_%s", boundaries.get(i), boundaries.get(i + 1)); + List topic = admin.topics().getListInBundle(namespace, bundle); + if (topic == null) { + continue; + } + topicNum += topic.size(); + for (String s : topic) { + assertFalse(TopicName.get(s).isPersistent()); + } + } + assertEquals(topicNum, 3); + + List list = admin.topics().getList(namespace); + assertEquals(list.size(), 6); + } + private static abstract class MockBrokerFilter implements BrokerFilter { @Override