Skip to content

Commit

Permalink
[fix][broker] Skip creating the replicator when the remote cluster is…
Browse files Browse the repository at this point in the history
… absent

### Motivation

Sometimes when a remote cluster is deleted, the replication cursor might
still exist for some topics. In this case, creating producers or
consumers on these topics will fail.

Here is a log observed in a production environment:

> WARN  org.apache.pulsar.broker.service.BrokerService - Replication or
> dedup check failed. Removing topic from topics list
> persistent://public/__kafka/__consumer_offsets-partition-40,
> java.util.concurrent.CompletionException: java.lang.RuntimeException:
> org.apache.pulsar.metadata.api.MetadataStoreException$NotFoundException:
> kop

If it happened, unloading the topic or restarting the broker could not
help. We have to remove the cursor manually.

### Modificatons

When initializing a `PersistentTopic`, if there is any replicator cursor
while the responding cluster does not exist, ignore the exception from
`addReplicationCluster`. Then, remove this "zombie" cursor.

### Verifications

`PersistentTopicTest#testCreateTopicWithZombieReplicatorCursor` is added
to verify `PersistentTopic#initialize` will succeed and the zombie
replicator cursor will be removed.
  • Loading branch information
BewareMyPower committed Mar 30, 2023
1 parent 68c10ee commit c57d1ac
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,38 @@ public CompletableFuture<Void> initialize() {
if (cursor.getName().startsWith(replicatorPrefix)) {
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName());
futures.add(addReplicationCluster(remoteCluster, cursor, localCluster));
final CompletableFuture<Void> future = new CompletableFuture<>();
addReplicationCluster(remoteCluster, cursor, localCluster).whenComplete((__, e) -> {
if (e == null) {
future.complete(null);
} else {
Throwable throwable = e;
while (throwable.getCause() != null) {
throwable = throwable.getCause();
}
if (throwable instanceof MetadataStoreException.NotFoundException
&& throwable.getMessage().equals(remoteCluster)) {
log.warn("[{}] Remote cluster '{}' is not found while there is a replicator cursor,"
+ " remove cursor '{}'", topic, remoteCluster, cursor.getName());
ledger.asyncDeleteCursor(cursor.getName(), new DeleteCursorCallback() {
@Override
public void deleteCursorComplete(Object ctx) {
log.info("[{}] Deleted replicator cursor '{}'", topic, cursor.getName());
}

@Override
public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}] Failed to delete the replicator cursor '{}'",
topic, cursor.getName(), exception);
}
}, null);
future.complete(null);
} else {
future.completeExceptionally(e);
}
}
});
futures.add(future);
}
}
return FutureUtil.waitForAll(futures).thenCompose(__ ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,20 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import lombok.Cleanup;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
Expand All @@ -57,6 +62,7 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
Expand Down Expand Up @@ -525,4 +531,28 @@ public void testDeleteTopicFail() throws Exception {
makeDeletedFailed.set(false);
persistentTopic.delete().get();
}

@Test
public void testCreateTopicWithZombieReplicatorCursor() throws Exception {
final String topicName = "persistent://prop/ns-abc/testCreateTopicWithZombieReplicatorCursor";
final String remoteCluster = "remote";
admin.topics().createNonPartitionedTopic(topicName);
admin.topics().createSubscription(topicName, conf.getReplicatorPrefix() + "." + remoteCluster,
MessageId.earliest, true);

final PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false)
.get(3, TimeUnit.SECONDS).orElse(null);
assertNotNull(topic);

final Supplier<Set<String>> getCursors = () -> {
final Set<String> cursors = new HashSet<>();
final Iterable<ManagedCursor> iterable = topic.getManagedLedger().getCursors();
iterable.forEach(c -> cursors.add(c.getName()));
return cursors;
};
assertEquals(getCursors.get(), Collections.singleton(conf.getReplicatorPrefix() + "." + remoteCluster));
topic.initialize().get(3, TimeUnit.SECONDS);
Awaitility.await().atMost(3, TimeUnit.SECONDS)
.until(() -> !topic.getManagedLedger().getCursors().iterator().hasNext());
}
}

0 comments on commit c57d1ac

Please sign in to comment.