Skip to content

Commit

Permalink
Fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode authored and coderzc committed Dec 7, 2023
1 parent b433200 commit 010117f
Showing 1 changed file with 20 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,25 @@
*/
package org.apache.pulsar.broker.service;

import static org.apache.pulsar.common.naming.SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.collections4.CollectionUtils;
import com.google.common.collect.Sets;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.coordination.LockManager;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Sets;

import java.util.Random;

@Slf4j
public abstract class BrokerTestBase extends MockedPulsarServiceBaseTest {
protected static final int ASYNC_EVENT_COMPLETION_WAIT = 100;

Expand Down Expand Up @@ -132,97 +118,30 @@ protected String newTopicName() {
*/
protected void deleteNamespaceGraceFully(String ns, boolean force)
throws Exception {
deleteNamespaceGraceFully(ns, force, pulsar, admin);
deleteNamespaceGraceFully(ns, force, pulsar , admin);
}

/**
* Wait until system topic "__change_event" and subscription "__compaction" are created, and then delete the namespace.
* 1. Pause system "__change_event" topic creates.
* 2. Do delete namespace with retry because maybe fail by race-condition with create topics.
*/
public static void deleteNamespaceGraceFully(String ns, boolean force, PulsarService pulsar, PulsarAdmin admin)
throws Exception {
// namespace v1 should not wait system topic create.
if (ns.split("/").length > 2){
admin.namespaces().deleteNamespace(ns, force);
return;
}
if (!pulsar.getConfiguration().isSystemTopicEnabled()){
admin.namespaces().deleteNamespace(ns, force);
return;
}
// If no bundle has been loaded, then the System Topic will not trigger creation.
LockManager lockManager = pulsar.getCoordinationService().getLockManager(NamespaceEphemeralData.class);
List<String> lockedBundles = (List<String>) lockManager.listLocks("/namespace" + "/" + ns).join();
if (CollectionUtils.isEmpty(lockedBundles)){
admin.namespaces().deleteNamespace(ns, force);
return;
}
// Trigger change event topic create.
NamespaceName namespace = NamespaceName.get(ns);
NamespaceBundle namespaceBundle = mock(NamespaceBundle.class);
when(namespaceBundle.getNamespaceObject()).thenReturn(namespace);
pulsar.getTopicPoliciesService().addOwnedNamespaceBundleAsync(namespaceBundle);
// Wait for change event topic and compaction create finish.
String allowAutoTopicCreationType = pulsar.getConfiguration().getAllowAutoTopicCreationType();
int defaultNumPartitions = pulsar.getConfiguration().getDefaultNumPartitions();
ArrayList<String> expectChangeEventTopics = new ArrayList<>();
if ("non-partitioned".equals(allowAutoTopicCreationType)){
String t = String.format("persistent://%s/%s", ns, NAMESPACE_EVENTS_LOCAL_NAME);
expectChangeEventTopics.add(t);
} else {
for (int i = 0; i < defaultNumPartitions; i++){
String t = String.format("persistent://%s/%s-partition-%s", ns, NAMESPACE_EVENTS_LOCAL_NAME, i);
expectChangeEventTopics.add(t);
}
}
Awaitility.await().until(() -> {
boolean finished = true;
for (String changeEventTopicName : expectChangeEventTopics){
boolean bundleExists = pulsar.getNamespaceService()
.checkTopicOwnership(TopicName.get(changeEventTopicName))
.exceptionally(ex -> false).join();
if (!bundleExists){
finished = false;
break;
}
CompletableFuture<Optional<Topic>> completableFuture =
pulsar.getBrokerService().getTopic(changeEventTopicName, false);
if (completableFuture == null){
finished = false;
break;
}
Optional<Topic> optionalTopic = completableFuture.get();
if (!optionalTopic.isPresent()){
finished = false;
break;
}
PersistentTopic changeEventTopic = (PersistentTopic) optionalTopic.get();
if (!changeEventTopic.isCompactionEnabled()){
break;
}
if (!changeEventTopic.getSubscriptions().containsKey(COMPACTION_SUBSCRIPTION)){
finished = false;
break;
}
}
return finished;
});
int retryTimes = 3;
while (true) {
Awaitility.await()
.pollDelay(500, TimeUnit.MILLISECONDS)
.until(() -> {
try {
// Maybe fail by race-condition with create topics, just retry.
admin.namespaces().deleteNamespace(ns, force);
break;
} catch (PulsarAdminException ex) {
// Do retry only if topic fenced.
if (ex.getStatusCode() == 500 && ex.getMessage().contains("TopicFencedException")){
if (--retryTimes > 0){
continue;
} else {
throw ex;
}
}
throw ex;
return true;
} catch (PulsarAdminException.NotFoundException ex) {
// namespace was already deleted, ignore exception
return true;
} catch (Exception e) {
log.warn("Failed to delete namespace {} (force={})", ns, force, e);
return false;
}
}
});
}

private static final Logger LOG = LoggerFactory.getLogger(BrokerTestBase.class);
Expand Down

0 comments on commit 010117f

Please sign in to comment.