Skip to content

Commit

Permalink
Fix list in bundle admin API
Browse files Browse the repository at this point in the history
  • Loading branch information
Demogorgon314 committed Jun 7, 2023
1 parent f53cdda commit f97b82d
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ public CompletableFuture<Optional<URL>> getWebServiceUrlAsync(ServiceUnitId suNa
*/
public Optional<URL> getWebServiceUrl(ServiceUnitId suName, LookupOptions options) throws Exception {
return getWebServiceUrlAsync(suName, options)
.get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
.get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
}

private CompletableFuture<Optional<URL>> internalGetWebServiceUrl(Optional<ServiceUnitId> topic,
Expand Down Expand Up @@ -1108,19 +1108,7 @@ public Set<NamespaceBundle> getOwnedServiceUnits() {
}

public boolean isServiceUnitOwned(ServiceUnitId suName) throws Exception {
if (suName instanceof TopicName) {
return isTopicOwnedAsync((TopicName) suName).get();
}

if (suName instanceof NamespaceName) {
return isNamespaceOwned((NamespaceName) suName);
}

if (suName instanceof NamespaceBundle) {
return ownershipCache.isNamespaceBundleOwned((NamespaceBundle) suName);
}

throw new IllegalArgumentException("Invalid class of NamespaceBundle: " + suName.getClass().getName());
return isServiceUnitOwnedAsync(suName).get(config.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
}

public CompletableFuture<Boolean> isServiceUnitOwnedAsync(ServiceUnitId suName) {
Expand Down Expand Up @@ -1174,10 +1162,6 @@ public CompletableFuture<Boolean> isServiceUnitActiveAsync(TopicName topicName)
});
}

private boolean isNamespaceOwned(NamespaceName fqnn) throws Exception {
return ownershipCache.getOwnedBundle(getFullBundle(fqnn)) != null;
}

private CompletableFuture<Boolean> isNamespaceOwnedAsync(NamespaceName fqnn) {
// TODO: Add unit tests cover it.
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -926,6 +927,43 @@ public void testDisableBroker() throws Exception {
}
}

@Test(timeOut = 30 * 1000)
public void testListTopic() throws Exception {
final String namespace = "public/testListTopic";
admin.namespaces().createNamespace(namespace, 3);

final String persistentTopicName = TopicName.get(
"persistent", NamespaceName.get(namespace),
"get_topics_mode_" + UUID.randomUUID()).toString();

final String nonPersistentTopicName = TopicName.get(
"non-persistent", NamespaceName.get(namespace),
"get_topics_mode_" + UUID.randomUUID()).toString();
admin.topics().createPartitionedTopic(persistentTopicName, 3);
admin.topics().createPartitionedTopic(nonPersistentTopicName, 3);
pulsarClient.newProducer().topic(persistentTopicName).create().close();
pulsarClient.newProducer().topic(nonPersistentTopicName).create().close();

BundlesData bundlesData = admin.namespaces().getBundles(namespace);
List<String> boundaries = bundlesData.getBoundaries();
int topicNum = 0;
for (int i = 0; i < boundaries.size() - 1; i++) {
String bundle = String.format("%s_%s", boundaries.get(i), boundaries.get(i + 1));
List<String> topic = admin.topics().getListInBundle(namespace, bundle);
if (topic == null) {
continue;
}
topicNum += topic.size();
for (String s : topic) {
assertFalse(TopicName.get(s).isPersistent());
}
}
assertEquals(topicNum, 3);

List<String> list = admin.topics().getList(namespace);
assertEquals(list.size(), 6);
}

private static abstract class MockBrokerFilter implements BrokerFilter {

@Override
Expand Down

0 comments on commit f97b82d

Please sign in to comment.