Skip to content

Commit

Permalink
[fix][test]fix flaky test deleteNamespaceGracefully
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode committed Nov 10, 2022
1 parent 79a97a9 commit d0d9f3f
Show file tree
Hide file tree
Showing 8 changed files with 325 additions and 214 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public void resetClusters() throws Exception {
pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true);
for (String tenant : admin.tenants().getTenants()) {
for (String namespace : admin.namespaces().getNamespaces(tenant)) {
deleteNamespaceGraceFullyByMultiPulsars(namespace, true, admin, pulsar,
deleteNamespaceGraceFully(namespace, true, admin, pulsar,
mockPulsarSetup.getPulsar());
}
admin.tenants().deleteTenant(tenant, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public void reset() throws Exception {
pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true);
for (String tenant : admin.tenants().getTenants()) {
for (String namespace : admin.namespaces().getNamespaces(tenant)) {
deleteNamespaceGraceFullyByMultiPulsars(namespace, true, admin, pulsar,
deleteNamespaceGraceFully(namespace, true, admin, pulsar,
mockPulsarSetup.getPulsar());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand All @@ -65,6 +67,9 @@
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.CanPausedNamespaceService;
import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer;
import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
import org.apache.pulsar.broker.service.TopicPoliciesService;
import org.apache.pulsar.broker.systopic.ClientMarkerNamespaceEventsSystemTopicFactory;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.PulsarAdminException;
Expand Down Expand Up @@ -356,6 +361,7 @@ protected PulsarService startBrokerWithoutAuthorization(ServiceConfiguration con
setupBrokerMocks(pulsar);
beforePulsarStartMocks(pulsar);
pulsar.start();
insteadOfClientMarkerSystemTopicService(pulsar);
log.info("Pulsar started. brokerServiceUrl: {} webServiceAddress: {}", pulsar.getBrokerServiceUrl(),
pulsar.getWebServiceAddress());
return pulsar;
Expand Down Expand Up @@ -685,11 +691,91 @@ protected void deleteNamespaceGraceFully(String ns, boolean force, PulsarAdmin a
}

/**
* see see {@link BrokerTestBase#deleteNamespaceGraceFully(String, boolean, PulsarAdmin, Collection)}
* see {@link MockedPulsarServiceBaseTest#deleteNamespaceGraceFully(String, boolean, PulsarAdmin, Collection)}
*/
protected void deleteNamespaceGraceFullyByMultiPulsars(String ns, boolean force, PulsarAdmin admin,
PulsarService...pulsars) throws Exception {
BrokerTestBase.deleteNamespaceGraceFully(ns, force, admin, pulsars);
public static void deleteNamespaceGraceFully(String ns, boolean force, PulsarAdmin admin, PulsarService...pulsars)
throws Exception {
deleteNamespaceGraceFully(ns, force, admin, Arrays.asList(pulsars));
}

public static void insteadOfClientMarkerSystemTopicService(PulsarService pulsar) throws Exception {
TopicPoliciesService topicPoliciesService = pulsar.getTopicPoliciesService();
if (!(topicPoliciesService instanceof SystemTopicBasedTopicPoliciesService)) {
return;
}
SystemTopicBasedTopicPoliciesService systemTopicBasedTopicPoliciesService =
(SystemTopicBasedTopicPoliciesService)topicPoliciesService;
Field field_namespaceEventsSystemTopicFactory =
SystemTopicBasedTopicPoliciesService.class.getDeclaredField("namespaceEventsSystemTopicFactory");
field_namespaceEventsSystemTopicFactory.setAccessible(true);
field_namespaceEventsSystemTopicFactory.set(systemTopicBasedTopicPoliciesService,
new ClientMarkerNamespaceEventsSystemTopicFactory(pulsar.getClient()));
}

/**
* Wait until system topic "__change_event" and subscription "__compaction" are created, and then delete the namespace.
*/
public static void deleteNamespaceGraceFully(String ns, boolean force, PulsarAdmin admin,
Collection<PulsarService> pulsars) throws Exception {
// namespace v1 should not wait system topic create.
if (ns.split("/").length > 2){
admin.namespaces().deleteNamespace(ns, force);
return;
}

// If disabled system-topic, should not wait system topic create.
boolean allBrokerDisabledSystemTopic = true;
for (PulsarService pulsar : pulsars) {
if (!pulsar.getConfiguration().isSystemTopicEnabled()) {
continue;
}
TopicPoliciesService topicPoliciesService = pulsar.getTopicPoliciesService();
if (!(topicPoliciesService instanceof SystemTopicBasedTopicPoliciesService)) {
continue;
}
allBrokerDisabledSystemTopic = false;
}
if (allBrokerDisabledSystemTopic){
admin.namespaces().deleteNamespace(ns, force);
return;
}

// Stop trigger "onNamespaceBundleOwned".
List<CompletableFuture<Void>> runningEventListeners = new ArrayList<>();
for (PulsarService pulsar : pulsars) {
// Prevents new events from triggering system topic creation.
CanPausedNamespaceService canPausedNamespaceService = (CanPausedNamespaceService) pulsar.getNamespaceService();
runningEventListeners.addAll(canPausedNamespaceService.pause());
}
// Wait all client-create tasks.
ClientMarkerNamespaceEventsSystemTopicFactory.waitAllClientCreateTaskDone(pulsars, runningEventListeners);

// Do delete.
int retryTimes = 3;
while (true) {
try {
admin.namespaces().deleteNamespace(ns, force);
break;
} catch (PulsarAdminException ex) {
// Do retry only if topic fenced.
if (ex.getStatusCode() == 500 && ex.getMessage().contains("TopicFencedException")){
if (--retryTimes > 0){
continue;
} else {
throw ex;
}
}
throw ex;
}
}

// Resume trigger "onNamespaceBundleOwned".
for (PulsarService pulsarService : pulsars) {
// Prevents new events from triggering system topic creation.
CanPausedNamespaceService canPausedNamespaceService =
(CanPausedNamespaceService) pulsarService.getNamespaceService();
canPausedNamespaceService.resume();
}
}

private static final Logger log = LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;

import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.insteadOfClientMarkerSystemTopicService;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
Expand All @@ -40,6 +41,7 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.GetStatsOptions;
import org.apache.pulsar.client.admin.PulsarAdmin;
Expand Down Expand Up @@ -83,27 +85,19 @@ public class BacklogQuotaManagerTest {
private static final int MAX_ENTRIES_PER_LEDGER = 5;

/**
* see {@link BrokerTestBase#deleteNamespaceGraceFully(String, boolean, PulsarAdmin, Collection)}
* see {@link MockedPulsarServiceBaseTest#deleteNamespaceGraceFully(String, boolean, PulsarAdmin, Collection)}
*/
protected void deleteNamespaceGraceFully(String ns, boolean force)
throws Exception {
BrokerTestBase.deleteNamespaceGraceFully(ns, force, admin, pulsar);
MockedPulsarServiceBaseTest.deleteNamespaceGraceFully(ns, force, admin, pulsar);
}

/**
* see {@link BrokerTestBase#deleteNamespaceGraceFully(String, boolean, PulsarAdmin, Collection)}
* see {@link MockedPulsarServiceBaseTest#deleteNamespaceGraceFully(String, boolean, PulsarAdmin, Collection)}
*/
protected void deleteNamespaceGraceFully(String ns, boolean force, PulsarAdmin admin)
throws Exception {
BrokerTestBase.deleteNamespaceGraceFully(ns, force, admin, pulsar);
}

/**
* see {@link BrokerTestBase#deleteNamespaceGraceFully(String, boolean, PulsarAdmin, Collection)}
*/
protected void deleteNamespaceGraceFullyByMultiPulsars(String ns, boolean force, PulsarAdmin admin,
PulsarService...pulsars) throws Exception {
BrokerTestBase.deleteNamespaceGraceFully(ns, force, admin, pulsars);
MockedPulsarServiceBaseTest.deleteNamespaceGraceFully(ns, force, admin, pulsar);
}

@DataProvider(name = "backlogQuotaSizeGB")
Expand Down Expand Up @@ -139,6 +133,7 @@ void setup() throws Exception {

pulsar = new PulsarService(config);
pulsar.start();
insteadOfClientMarkerSystemTopicService(pulsar);

adminUrl = new URL("http://127.0.0.1" + ":" + pulsar.getListenPortHTTP().get());
admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl.toString()).build();
Expand Down
Loading

0 comments on commit d0d9f3f

Please sign in to comment.