diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index a6345f4a56a716..aa1d4f31667a97 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1268,10 +1268,14 @@ public PulsarClient getReplicationClient(String cluster, Optional c return client; } + ClusterData data = clusterDataOp.orElse(null); + if (data == null) { + log.warn("Skip creating the replication client because the data of cluster '{}' is empty", cluster); + return null; + } + return replicationClients.computeIfAbsent(cluster, key -> { try { - ClusterData data = clusterDataOp - .orElseThrow(() -> new MetadataStoreException.NotFoundException(cluster)); ClientBuilder clientBuilder = PulsarClient.builder() .enableTcpNoDelay(false) .connectionsPerBroker(pulsar.getConfiguration().getReplicationConnectionsPerBroker()) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index a0a8462a22753b..012b21a55bfd72 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -604,6 +604,9 @@ protected CompletableFuture addReplicationCluster(String remoteCluster, No .thenApply(clusterData -> brokerService.getReplicationClient(remoteCluster, clusterData))) .thenAccept(replicationClient -> { + if (replicationClient == null) { + return; + } replicators.computeIfAbsent(remoteCluster, r -> { try { return new NonPersistentReplicator(NonPersistentTopic.this, localCluster, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 18a662c4b7a38c..df51a84eca675d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1542,7 +1542,13 @@ public CompletableFuture checkReplication() { continue; } if (!replicators.containsKey(cluster)) { - futures.add(startReplicator(cluster)); + futures.add(checkReplicationCluster(cluster).thenCompose(clusterExists -> { + if (clusterExists) { + return startReplicator(cluster); + } else { + return CompletableFuture.completedFuture(null); + } + })); } } @@ -1715,8 +1721,7 @@ protected CompletableFuture addReplicationCluster(String remoteCluster, Ma } return brokerService.pulsar().getPulsarResources().getClusterResources() .getClusterAsync(remoteCluster) - .thenApply(clusterData -> - brokerService.getReplicationClient(remoteCluster, clusterData)); + .thenApply(clusterData -> brokerService.getReplicationClient(remoteCluster, clusterData)); }) .thenAccept(replicationClient -> { if (replicationClient == null) { @@ -1796,6 +1801,9 @@ protected CompletableFuture addShadowReplicationCluster(String shadowTopic .getClusterAsync(localCluster) .thenApply(clusterData -> brokerService.getReplicationClient(localCluster, clusterData))) .thenAccept(replicationClient -> { + if (replicationClient == null) { + return; + } Replicator replicator = shadowReplicators.computeIfAbsent(shadowTopic, r -> { try { return new ShadowReplicator(shadowTopic, PersistentTopic.this, cursor, brokerService, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index c63be7aad01cd0..e3ffb787678bfb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -38,6 +38,7 @@ import java.io.ByteArrayOutputStream; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -52,6 +53,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; @@ -83,6 +85,7 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +@Slf4j @Test(groups = "broker") public class PersistentTopicTest extends BrokerTestBase { @@ -584,8 +587,14 @@ public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy) admin.clusters().deleteCluster(remoteCluster); // Now the cluster and its related policy has been removed but the replicator cursor still exists - topic.initialize().get(3, TimeUnit.SECONDS); - Awaitility.await().atMost(3, TimeUnit.SECONDS) - .until(() -> !topic.getManagedLedger().getCursors().iterator().hasNext()); + Awaitility.await().atMost(Duration.ofSeconds(10)).until(() -> { + log.info("Before initialize..."); + try { + topic.initialize().get(3, TimeUnit.SECONDS); + } catch (ExecutionException e) { + log.warn("Failed to initialize: {}", e.getCause().getMessage()); + } + return !topic.getManagedLedger().getCursors().iterator().hasNext(); + }); } }