Skip to content

Commit

Permalink
append partitioned topic wait
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode committed Aug 12, 2022
1 parent 897a7bb commit 342d935
Showing 1 changed file with 11 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1338,6 +1338,7 @@ public void testPreciseBacklog() throws PulsarClientException, PulsarAdminExcept

@Test
public void testDeleteTenant() throws Exception {
int partitionCount = 10;
pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false);

String tenant = "test-tenant-1";
Expand All @@ -1355,9 +1356,16 @@ public void testDeleteTenant() throws Exception {

// create topic
String topic = namespace + "/test-topic-1";
admin.topics().createPartitionedTopic(topic, 10);
admin.topics().createPartitionedTopic(topic, partitionCount);
assertFalse(admin.topics().getList(namespace).isEmpty());

// Wait for topics create finish.
Awaitility.await().until(() -> {
TreeSet<String> topicsCreated = queryPersistentTopicsByNamespace(namespace);
String topicNamePrefix = String.format("persistent://%s/%s", namespace, "test-topic-1");
return topicsCreated.stream().filter(s -> s.startsWith(topicNamePrefix)).count() == partitionCount + 1;
});

try {
admin.namespaces().deleteNamespace(namespace, false);
fail("should have failed due to namespace not empty");
Expand All @@ -1374,7 +1382,7 @@ public void testDeleteTenant() throws Exception {
if (!pulsar.getConfiguration().isSystemTopicEnabled()) {
return true;
}
TreeSet<String> topicsCreated = queryTopicsByNamespace(namespace);
TreeSet<String> topicsCreated = queryPersistentTopicsByNamespace(namespace);
return topicsCreated.contains(String.format("persistent://%s/%s", namespace, NAMESPACE_EVENTS_LOCAL_NAME));
});

Expand All @@ -1397,15 +1405,13 @@ public void testDeleteTenant() throws Exception {
assertFalse(pulsar.getLocalMetadataStore().exists(bundleDataPath).join());
}

private TreeSet<String> queryTopicsByNamespace(String namespace){
private TreeSet<String> queryPersistentTopicsByNamespace(String namespace){
NamespaceName namespaceName = NamespaceName.get(namespace);
TreeSet<String> topics = new TreeSet<>();
topics.addAll(pulsar.getNamespaceService()
.getListOfPersistentTopics(NamespaceName.get(namespace)).join());
topics.addAll(pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
.listPartitionedTopicsAsync(namespaceName, TopicDomain.persistent).join());
topics.addAll(pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
.listPartitionedTopicsAsync(namespaceName, TopicDomain.non_persistent).join());
return topics;
}

Expand Down

0 comments on commit 342d935

Please sign in to comment.