diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index fa08330ff3c353..587609fa84404c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -338,7 +338,38 @@ public CompletableFuture initialize() { if (cursor.getName().startsWith(replicatorPrefix)) { String localCluster = brokerService.pulsar().getConfiguration().getClusterName(); String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName()); - futures.add(addReplicationCluster(remoteCluster, cursor, localCluster)); + final CompletableFuture future = new CompletableFuture<>(); + addReplicationCluster(remoteCluster, cursor, localCluster).whenComplete((__, e) -> { + if (e == null) { + future.complete(null); + } else { + Throwable throwable = e; + while (throwable.getCause() != null) { + throwable = throwable.getCause(); + } + if (throwable instanceof MetadataStoreException.NotFoundException + && throwable.getMessage().equals(remoteCluster)) { + log.warn("[{}] Remote cluster '{}' is not found while there is a replicator cursor," + + " remove cursor '{}'", topic, remoteCluster, cursor.getName()); + ledger.asyncDeleteCursor(cursor.getName(), new DeleteCursorCallback() { + @Override + public void deleteCursorComplete(Object ctx) { + log.info("[{}] Deleted replicator cursor '{}'", topic, cursor.getName()); + } + + @Override + public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) { + log.error("[{}] Failed to delete the replicator cursor '{}'", + topic, cursor.getName(), exception); + } + }, null); + future.complete(null); + } else { + future.completeExceptionally(e); + } + } + }); + futures.add(future); } } return FutureUtil.waitForAll(futures).thenCompose(__ -> diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 80a79e0234de43..bea38336293a14 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -40,15 +40,20 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import lombok.Cleanup; import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerTestBase; @@ -57,6 +62,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageListener; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; @@ -525,4 +531,28 @@ public void testDeleteTopicFail() throws Exception { makeDeletedFailed.set(false); persistentTopic.delete().get(); } + + @Test + public void testCreateTopicWithZombieReplicatorCursor() throws Exception { + final String topicName = "persistent://prop/ns-abc/testCreateTopicWithZombieReplicatorCursor"; + final String remoteCluster = "remote"; + admin.topics().createNonPartitionedTopic(topicName); + admin.topics().createSubscription(topicName, conf.getReplicatorPrefix() + "." + remoteCluster, + MessageId.earliest, true); + + final PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false) + .get(3, TimeUnit.SECONDS).orElse(null); + assertNotNull(topic); + + final Supplier> getCursors = () -> { + final Set cursors = new HashSet<>(); + final Iterable iterable = topic.getManagedLedger().getCursors(); + iterable.forEach(c -> cursors.add(c.getName())); + return cursors; + }; + assertEquals(getCursors.get(), Collections.singleton(conf.getReplicatorPrefix() + "." + remoteCluster)); + topic.initialize().get(3, TimeUnit.SECONDS); + Awaitility.await().atMost(3, TimeUnit.SECONDS) + .until(() -> !topic.getManagedLedger().getCursors().iterator().hasNext()); + } }