Skip to content

Commit

Permalink
[fix][broker] Do not write replicated snapshot marker when the topic …
Browse files Browse the repository at this point in the history
…which is not enable replication (#21495)

### Motivation
[PIP 33](https://github.com/apache/pulsar/wiki/PIP-33%3A-Replicated-subscriptions) introduces a new concept, `Replicated subscriptions`. When a topic has a consumer (subscription) that enables replicated subscriptions, the broker writes markers into the original topic. The markers are written even if no remote replication cluster is configured for the topic, which makes the backlog of the topic keep increasing.

---
The markers are written in the following two ways:
1. A scheduled task writes a marker at a fixed time interval if there are new messages published.
https://github.com/apache/pulsar/blob/ea1fc0f20138bc35f54f55d32dabf3c3a3309c8e/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java#L78-L86
https://github.com/apache/pulsar/blob/ea1fc0f20138bc35f54f55d32dabf3c3a3309c8e/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java#L77-L86
2. Acknowledging a message triggers a check of whether the first snapshot has been written and the mark-delete position has moved; if both are true, a marker is written.
https://github.com/apache/pulsar/blob/ea1fc0f20138bc35f54f55d32dabf3c3a3309c8e/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java#L114-L150

### Modifications
Create or remove the `ReplicatedSubscriptionsController` of a topic according to the topic's replication policy.
  • Loading branch information
liangyepianzhou authored Nov 14, 2023
1 parent 36d4708 commit 2322004
Show file tree
Hide file tree
Showing 2 changed files with 217 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3010,7 +3010,7 @@ public CompletableFuture<Void> onPoliciesUpdate(@Nonnull Policies data) {
}

updateTopicPolicyByNamespacePolicy(data);

checkReplicatedSubscriptionControllerState();
isEncryptionRequired = data.encryption_required;

isAllowAutoUpdateSchema = data.is_allow_auto_update_schema;
Expand Down Expand Up @@ -3497,12 +3497,14 @@ private synchronized void checkReplicatedSubscriptionControllerState(boolean sho
boolean isCurrentlyEnabled = replicatedSubscriptionsController.isPresent();
boolean isEnableReplicatedSubscriptions =
brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions();
boolean replicationEnabled = this.topicPolicies.getReplicationClusters().get().size() > 1;

if (shouldBeEnabled && !isCurrentlyEnabled && isEnableReplicatedSubscriptions) {
if (shouldBeEnabled && !isCurrentlyEnabled && isEnableReplicatedSubscriptions && replicationEnabled) {
log.info("[{}] Enabling replicated subscriptions controller", topic);
replicatedSubscriptionsController = Optional.of(new ReplicatedSubscriptionsController(this,
brokerService.pulsar().getConfiguration().getClusterName()));
} else if (isCurrentlyEnabled && !shouldBeEnabled || !isEnableReplicatedSubscriptions) {
} else if (isCurrentlyEnabled && !shouldBeEnabled || !isEnableReplicatedSubscriptions
|| !replicationEnabled) {
log.info("[{}] Disabled replicated subscriptions controller", topic);
replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close);
replicatedSubscriptionsController = Optional.empty();
Expand Down Expand Up @@ -3685,6 +3687,7 @@ public void onUpdate(TopicPolicies policies) {
updateTopicPolicy(policies);
shadowTopics = policies.getShadowTopics();
updateDispatchRateLimiter();
checkReplicatedSubscriptionControllerState();
updateSubscriptionsDispatcherRateLimiter().thenRun(() -> {
updatePublishDispatcher();
updateSubscribeRateLimiter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand All @@ -50,7 +51,9 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
Expand All @@ -60,6 +63,7 @@
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
Expand Down Expand Up @@ -728,6 +732,213 @@ public void testReplicatedSubscriptionWhenReplicatorProducerIsClosed() throws Ex
Awaitility.await().untilAsserted(() -> assertNotNull(topic2.getSubscription(subscriptionName)));
}

/**
 * Supplies the argument rows for tests parameterized by {@code isTopicPolicyEnabled}.
 *
 * @return one row per test invocation; at present only the {@code false} case is supplied
 */
@DataProvider(name = "isTopicPolicyEnabled")
private Object[][] isTopicPolicyEnabled() {
    // TODO: also supply Boolean.TRUE once replication can be enabled at topic level.
    final Object[] topicPolicyDisabledRow = { Boolean.FALSE };
    return new Object[][] { topicPolicyDisabledRow };
}

/**
 * Test that replicated subscriptions only write snapshots/markers into the original topic while
 * the topic has a remote cluster configured.
 * <p>
 * Case 1 - no remote cluster is configured for the topic ({@code topic1}):
 * <ul>
 *   <li>Publish messages and wait a moment; the backlog does not increase after publishing.</li>
 *   <li>Acknowledge the messages; the last confirmed entry does not change.</li>
 * </ul>
 * Case 2 - a remote cluster is configured for the topic ({@code topic2}):
 * <ul>
 *   <li>Publish messages; after publishing completes the backlog keeps increasing.</li>
 *   <li>Wait for the snapshot to complete; the backlog stops changing.</li>
 *   <li>Publish more messages and wait for another snapshot to complete.</li>
 *   <li>Ack messages to move the mark delete position past the position recorded in the first snapshot.</li>
 *   <li>Check that a new entry (a marker) is appended to the original topic.</li>
 * </ul>
 * Case 3 - the remote cluster is removed again ({@code topic2}): same checks as case 1.
 * </p>
 *
 * @param isTopicPolicyEnabled whether replication clusters are configured through topic-level policies
 *                             ({@code true}) or through namespace policies ({@code false})
 */
@Test(dataProvider = "isTopicPolicyEnabled")
public void testWriteMarkerTaskOfReplicateSubscriptions(boolean isTopicPolicyEnabled) throws Exception {
    // 1. Prepare resource and use proper configuration.
    String namespace = BrokerTestUtil.newUniqueName("pulsar/testReplicateSubBackLog");
    // topic1 is only exercised without a remote cluster; topic2 gets a remote cluster in case 2.
    String topic1 = "persistent://" + namespace + "/replication-disable";
    String topic2 = "persistent://" + namespace + "/replication-enable";
    String subName = "sub";

    admin1.namespaces().createNamespace(namespace);
    pulsar1.getConfiguration().setTopicLevelPoliciesEnabled(isTopicPolicyEnabled);
    pulsar1.getConfiguration().setReplicationPolicyCheckDurationSeconds(1);
    pulsar1.getConfiguration().setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
    // 2. Build Producer and Consumer.
    @Cleanup
    PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
            .statsInterval(0, TimeUnit.SECONDS)
            .build();
    @Cleanup
    Consumer<byte[]> consumer1 = client1.newConsumer()
            .topic(topic1)
            .subscriptionName(subName)
            .ackTimeout(5, TimeUnit.SECONDS)
            .subscriptionType(SubscriptionType.Shared)
            .replicateSubscriptionState(true)
            .subscribe();
    @Cleanup
    Producer<byte[]> producer1 = client1.newProducer()
            .topic(topic1)
            .create();
    // 3. Test replication subscription work as expected.
    // Test case 1: disable replication, backlog will not increase.
    testReplicatedSubscriptionWhenDisableReplication(producer1, consumer1, topic1);

    // Test case 2: enable replication, mark and snapshot work as expected.
    if (isTopicPolicyEnabled) {
        admin1.topics().createNonPartitionedTopic(topic2);
        admin1.topics().setReplicationClusters(topic2, List.of("r1", "r2"));
    } else {
        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
    }
    @Cleanup
    Consumer<byte[]> consumer2 = client1.newConsumer()
            .topic(topic2)
            .subscriptionName(subName)
            .ackTimeout(5, TimeUnit.SECONDS)
            .subscriptionType(SubscriptionType.Shared)
            .replicateSubscriptionState(true)
            .subscribe();
    @Cleanup
    Producer<byte[]> producer2 = client1.newProducer()
            .topic(topic2)
            .create();
    testReplicatedSubscriptionWhenEnableReplication(producer2, consumer2, topic2);

    // Test case 3: remove the remote cluster again, snapshots and markers stop being written.
    if (isTopicPolicyEnabled) {
        admin1.topics().setReplicationClusters(topic2, List.of("r1"));
    } else {
        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1"));
    }
    testReplicatedSubscriptionWhenDisableReplication(producer2, consumer2, topic2);
    // 4. Clear resource.
    pulsar1.getConfiguration().setForceDeleteNamespaceAllowed(true);
    try {
        admin1.namespaces().deleteNamespace(namespace, true);
    } finally {
        // Always reset the flag so a failed deletion does not leave force-delete enabled on pulsar1.
        pulsar1.getConfiguration().setForceDeleteNamespaceAllowed(false);
    }
}

/**
 * Checks that nothing is appended to the topic by the replicated subscription while replication
 * is disabled.
 * <p>
 * Scheduled task case:
 * <ol>
 *   <li>Send three messages |1:0|1:1|1:2|.</li>
 *   <li>Get the topic backlog, as backlog1.</li>
 *   <li>Wait a moment.</li>
 *   <li>Get the topic backlog again; it must not have increased.</li>
 * </ol>
 * Acknowledge messages case:
 * <ol>
 *   <li>Get the last confirmed entry, as LAC1.</li>
 *   <li>Acknowledge the messages |1:0|1:1|.</li>
 *   <li>Wait a moment.</li>
 *   <li>Get the last confirmed entry, as LAC2. LAC1 is equal to LAC2.</li>
 * </ol>
 * Clear environment:
 * <ol>
 *   <li>Ack the retained message |1:2|.</li>
 *   <li>Wait for the backlog to return to zero.</li>
 * </ol>
 * </p>
 *
 * @param producer producer attached to {@code topic}
 * @param consumer consumer attached to {@code topic}
 * @param topic    name of the topic under test
 */
private void testReplicatedSubscriptionWhenDisableReplication(Producer<byte[]> producer, Consumer<byte[]> consumer,
                                                              String topic) throws Exception {
    final int messageSum = 3;
    // Test scheduled task case.
    for (int i = 0; i < messageSum; i++) {
        producer.newMessage().send();
    }
    long backlog1 = admin1.topics().getStats(topic, false).getBacklogSize();
    Thread.sleep(3000);
    long backlog2 = admin1.topics().getStats(topic, false).getBacklogSize();
    assertEquals(backlog1, backlog2);
    // Test acknowledge messages case.
    String lastConfirmEntry1 = admin1.topics().getInternalStats(topic).lastConfirmedEntry;
    for (int i = 0; i < messageSum - 1; i++) {
        consumer.acknowledge(consumer.receive(5, TimeUnit.SECONDS));
    }
    // Delay the first poll: without it the equality holds on the very first check and a marker
    // written shortly after the acknowledgements would go unnoticed.
    Awaitility.await().pollDelay(2, TimeUnit.SECONDS).untilAsserted(() -> {
        String lastConfirmEntry2 = admin1.topics().getInternalStats(topic).lastConfirmedEntry;
        assertEquals(lastConfirmEntry1, lastConfirmEntry2);
    });
    // Clear environment.
    consumer.acknowledge(consumer.receive(5, TimeUnit.SECONDS));
    Awaitility.await().untilAsserted(() -> {
        long backlog4 = admin1.topics().getStats(topic, false).getBacklogSize();
        assertEquals(backlog4, 0);
    });
}

/**
 * Checks that snapshots and markers are written while replication is enabled.
 * <p>
 * Scheduled task case:
 * <ol>
 *   <li>Wait for the replicator to be connected.</li>
 *   <li>Send three messages |1:0|1:1|1:2|.</li>
 *   <li>Get the topic backlog, as backlog1.</li>
 *   <li>Wait a moment.</li>
 *   <li>Get the topic backlog again, as backlog2. backlog2 is bigger than backlog1.
 *       |1:0|1:1|1:2|mark|.</li>
 *   <li>Wait for the snapshot to complete.</li>
 * </ol>
 * Acknowledge messages case:
 * <ol>
 *   <li>Write messages and wait for another snapshot to complete.
 *       |1:0|1:1|1:2|mark|1:3|1:4|1:5|mark|</li>
 *   <li>Get the last confirmed entry, as LAC1.</li>
 *   <li>Ack messages |1:0|1:1|1:2|1:3|1:4|.</li>
 *   <li>Wait until the last confirmed entry, as LAC2, differs from LAC1. |1:5|mark|mark|</li>
 * </ol>
 * Clear environment:
 * <ol>
 *   <li>Ack the retained message |1:5|.</li>
 *   <li>Wait for the backlog to return to zero.</li>
 * </ol>
 * </p>
 *
 * @param producer producer attached to {@code topic}
 * @param consumer consumer attached to {@code topic}
 * @param topic    name of the topic under test
 */
private void testReplicatedSubscriptionWhenEnableReplication(Producer<byte[]> producer, Consumer<byte[]> consumer,
                                                             String topic) throws Exception {
    final int messagesPerRound = 3;
    // The topic must have exactly one replicator, and it must be connected, before publishing.
    Awaitility.await().untilAsserted(() -> {
        List<String> replicatorNames = pulsar1.getBrokerService()
                .getTopic(topic, false).get().get()
                .getReplicators().keys();
        assertEquals(replicatorNames.size(), 1);
        String onlyReplicator = replicatorNames.get(0);
        assertTrue(pulsar1.getBrokerService()
                .getTopic(topic, false).get().get()
                .getReplicators().get(onlyReplicator).isConnected());
    });

    // Scheduled task case: first round of messages plus snapshot.
    sendMessageAndWaitSnapshotComplete(producer, topic, messagesPerRound);

    // Acknowledge case: once a snapshot is complete, moving the mark delete position past the
    // position recorded in that snapshot triggers a new marker.
    sendMessageAndWaitSnapshotComplete(producer, topic, messagesPerRound);
    String lacBeforeAck = admin1.topics().getInternalStats(topic, false).lastConfirmedEntry;
    // Ack everything except the last message.
    for (int remaining = messagesPerRound * 2 - 1; remaining > 0; remaining--) {
        consumer.acknowledge(consumer.receive(5, TimeUnit.SECONDS));
    }
    Awaitility.await().untilAsserted(() -> {
        String lacAfterAck = admin1.topics().getInternalStats(topic, false).lastConfirmedEntry;
        assertNotEquals(lacBeforeAck, lacAfterAck);
    });

    // Clear environment: ack the remaining message and wait for the backlog to drain.
    consumer.acknowledge(consumer.receive(5, TimeUnit.SECONDS));
    Awaitility.await().untilAsserted(() -> {
        long remainingBacklog = admin1.topics().getStats(topic, false).getBacklogSize();
        assertEquals(remainingBacklog, 0);
    });
}

/**
 * Publishes {@code messageSum} messages, waits until the topic backlog grows beyond the value
 * observed right after publishing, and then waits until the last confirmed entry stays unchanged
 * across a one-second window.
 *
 * @param producer   producer attached to {@code topic}
 * @param topic      name of the topic under test
 * @param messageSum number of messages to publish
 */
private void sendMessageAndWaitSnapshotComplete(Producer<byte[]> producer, String topic,
                                                int messageSum) throws Exception {
    for (int i = 0; i < messageSum; i++) {
        producer.newMessage().send();
    }
    long backlog1 = admin1.topics().getStats(topic, false).getBacklogSize();
    Awaitility.await().untilAsserted(() -> {
        long backlog2 = admin1.topics().getStats(topic, false).getBacklogSize();
        assertTrue(backlog2 > backlog1);
    });
    // Wait snapshot write completely, stop writing marker into topic.
    Awaitility.await().untilAsserted(() -> {
        // A single stats request is enough for the "before" sample.
        PersistentTopicInternalStats statsBefore = admin1.topics().getInternalStats(topic, false);
        String lastConfirmedEntry1 = statsBefore.lastConfirmedEntry;
        Thread.sleep(1000);
        String lastConfirmedEntry2 = admin1.topics().getInternalStats(topic, false).lastConfirmedEntry;
        assertEquals(lastConfirmedEntry1, lastConfirmedEntry2);
    });
}

void publishMessages(Producer<byte[]> producer, int startIndex, int numMessages, Set<String> sentMessages)
throws PulsarClientException {
for (int i = startIndex; i < startIndex + numMessages; i++) {
Expand Down

0 comments on commit 2322004

Please sign in to comment.