Skip to content

Commit

Permalink
[fix][broker] Check replication cluster before starting the replicator
Browse files Browse the repository at this point in the history
Fixes apache#20010

### Motivation

`PersistentTopicTest.testCreateTopicWithZombieReplicatorCursor` is flaky
because the cursor can still be created again in `startReplicator`,
which can be reached through the following call chain:

```
onPoliciesUpdate
  checkReplicationAndRetryOnFailure
    checkReplication
```

### Modifications

Call `checkReplicationCluster` before calling `startReplicator`.

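Since there is still a rare chance that the cluster data is empty even
though the cluster still exists, `getReplicationClient` now returns null
instead of throwing a runtime exception and skips creating the replication
client. Callers check for a null client and skip creating the replicator.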

Use `Awaitility` in the test to retry `initialize()` until the cursor has eventually been deleted.
  • Loading branch information
BewareMyPower committed Apr 6, 2023
1 parent 42a6969 commit a5ee58f
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1268,10 +1268,14 @@ public PulsarClient getReplicationClient(String cluster, Optional<ClusterData> c
return client;
}

ClusterData data = clusterDataOp.orElse(null);
if (data == null) {
log.warn("Skip creating the replication client because the data of cluster '{}' is empty", cluster);
return null;
}

return replicationClients.computeIfAbsent(cluster, key -> {
try {
ClusterData data = clusterDataOp
.orElseThrow(() -> new MetadataStoreException.NotFoundException(cluster));
ClientBuilder clientBuilder = PulsarClient.builder()
.enableTcpNoDelay(false)
.connectionsPerBroker(pulsar.getConfiguration().getReplicationConnectionsPerBroker())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,9 @@ protected CompletableFuture<Void> addReplicationCluster(String remoteCluster, No
.thenApply(clusterData ->
brokerService.getReplicationClient(remoteCluster, clusterData)))
.thenAccept(replicationClient -> {
if (replicationClient == null) {
return;
}
replicators.computeIfAbsent(remoteCluster, r -> {
try {
return new NonPersistentReplicator(NonPersistentTopic.this, localCluster,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1542,7 +1542,13 @@ public CompletableFuture<Void> checkReplication() {
continue;
}
if (!replicators.containsKey(cluster)) {
futures.add(startReplicator(cluster));
futures.add(checkReplicationCluster(cluster).thenCompose(clusterExists -> {
if (clusterExists) {
return startReplicator(cluster);
} else {
return CompletableFuture.completedFuture(null);
}
}));
}
}

Expand Down Expand Up @@ -1715,8 +1721,7 @@ protected CompletableFuture<Void> addReplicationCluster(String remoteCluster, Ma
}
return brokerService.pulsar().getPulsarResources().getClusterResources()
.getClusterAsync(remoteCluster)
.thenApply(clusterData ->
brokerService.getReplicationClient(remoteCluster, clusterData));
.thenApply(clusterData -> brokerService.getReplicationClient(remoteCluster, clusterData));
})
.thenAccept(replicationClient -> {
if (replicationClient == null) {
Expand Down Expand Up @@ -1796,6 +1801,9 @@ protected CompletableFuture<Void> addShadowReplicationCluster(String shadowTopic
.getClusterAsync(localCluster)
.thenApply(clusterData -> brokerService.getReplicationClient(localCluster, clusterData)))
.thenAccept(replicationClient -> {
if (replicationClient == null) {
return;
}
Replicator replicator = shadowReplicators.computeIfAbsent(shadowTopic, r -> {
try {
return new ShadowReplicator(shadowTopic, PersistentTopic.this, cursor, brokerService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -52,6 +53,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
Expand Down Expand Up @@ -83,6 +85,7 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "broker")
public class PersistentTopicTest extends BrokerTestBase {

Expand Down Expand Up @@ -584,8 +587,14 @@ public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy)
admin.clusters().deleteCluster(remoteCluster);
// Now the cluster and its related policy have been removed, but the replicator cursor still exists

topic.initialize().get(3, TimeUnit.SECONDS);
Awaitility.await().atMost(3, TimeUnit.SECONDS)
.until(() -> !topic.getManagedLedger().getCursors().iterator().hasNext());
Awaitility.await().atMost(Duration.ofSeconds(10)).until(() -> {
log.info("Before initialize...");
try {
topic.initialize().get(3, TimeUnit.SECONDS);
} catch (ExecutionException e) {
log.warn("Failed to initialize: {}", e.getCause().getMessage());
}
return !topic.getManagedLedger().getCursors().iterator().hasNext();
});
}
}

0 comments on commit a5ee58f

Please sign in to comment.