diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 18a662c4b7a38c..e6f53ae159c046 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1542,7 +1542,13 @@ public CompletableFuture checkReplication() { continue; } if (!replicators.containsKey(cluster)) { - futures.add(startReplicator(cluster)); + futures.add(checkReplicationCluster(cluster).thenCompose(clusterExists -> { + if (clusterExists) { + return startReplicator(cluster); + } else { + return CompletableFuture.completedFuture(null); + } + })); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index c63be7aad01cd0..e3ffb787678bfb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -38,6 +38,7 @@ import java.io.ByteArrayOutputStream; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -52,6 +53,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; @@ -83,6 +85,7 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +@Slf4j @Test(groups = "broker") public class PersistentTopicTest extends BrokerTestBase { @@ -584,8 +587,14 @@ public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy) admin.clusters().deleteCluster(remoteCluster); // Now the cluster and its related policy has been removed but the replicator cursor still exists - topic.initialize().get(3, TimeUnit.SECONDS); - Awaitility.await().atMost(3, TimeUnit.SECONDS) - .until(() -> !topic.getManagedLedger().getCursors().iterator().hasNext()); + Awaitility.await().atMost(Duration.ofSeconds(10)).until(() -> { + log.info("Before initialize..."); + try { + topic.initialize().get(3, TimeUnit.SECONDS); + } catch (ExecutionException e) { + log.warn("Failed to initialize: {}", e.getCause().getMessage()); + } + return !topic.getManagedLedger().getCursors().iterator().hasNext(); + }); } }