Skip to content

Commit

Permalink
[improve][broker] Fix ServiceUnitStateCompactionStrategy to cover fas…
Browse files Browse the repository at this point in the history
…t-forward cursor behavior after compaction (#20110)

Master Issue: #16691

### Motivation

Raising a PR to implement: #16691

After the compaction, the cursor can fast-forward to the compacted horizon when a large number of messages are compacted before the next read. Hence, ServiceUnitStateCompactionStrategy also needs to cover this case. Currently, the existing and slow(their states are far behind) tableviews with ServiceUnitStateCompactionStrategy could not accept those compacted messages. In the load balance extension context, this means the ownership data could be inconsistent among brokers.

### Modifications
This PR 
  - fixes ServiceUnitStateCompactionStrategy to accept the state data if its version is bigger than the current version +1.
  - (minor fix) does not repeatedly update the replication_clusters in the policies when creating the system namespace. This update redundantly triggers ZK watchers when restarting brokers.
  -  sets closeWithoutWaitingClientDisconnect=true, upon unload(following the same setting as the modular LM's)
  • Loading branch information
heesung-sn authored Apr 18, 2023
1 parent 092819b commit 6cfa468
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -385,10 +385,19 @@ static void createNamespaceIfAbsent(PulsarResources resources, NamespaceName nam
namespaceResources.createPolicies(namespaceName, policies);
} else {
log.info("Namespace {} already exists.", namespaceName);
namespaceResources.setPolicies(namespaceName, policies -> {
policies.replication_clusters.add(cluster);
return policies;
});
var replicaClusterFound = false;
var policiesOptional = namespaceResources.getPolicies(namespaceName);
if (policiesOptional.isPresent() && policiesOptional.get().replication_clusters.contains(cluster)) {
replicaClusterFound = true;
}
if (!replicaClusterFound) {
namespaceResources.setPolicies(namespaceName, policies -> {
policies.replication_clusters.add(cluster);
return policies;
});
log.info("Updated namespace:{} policies. Added the replication cluster:{}",
namespaceName, cluster);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -819,7 +819,7 @@ private CompletableFuture<Integer> closeServiceUnit(String serviceUnit) {
NamespaceBundle bundle = getNamespaceBundle(serviceUnit);
return pulsar.getBrokerService().unloadServiceUnit(
bundle,
false,
true,
pulsar.getConfig().getNamespaceBundleUnloadingTimeoutMs(),
TimeUnit.MILLISECONDS)
.thenApply(numUnloadedTopics -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,13 @@ public boolean shouldKeepLeft(ServiceUnitStateData from, ServiceUnitStateData to
return false;
}

// Skip the compaction case where from = null and to.versionId > 1
if (from != null && from.versionId() + 1 != to.versionId()) {
return true;
if (from != null) {
if (from.versionId() == Long.MAX_VALUE && to.versionId() == Long.MIN_VALUE) { // overflow
} else if (from.versionId() >= to.versionId()) {
return true;
} else if (from.versionId() < to.versionId() - 1) { // Compacted
return false;
} // else from.versionId() == to.versionId() - 1 // continue to check further
}

if (to.force()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ private <T> CompletableFuture<Long> runPhaseTwo(
});
})
.thenCompose(v -> {
log.info("Acking ledger id {}", phaseOneResult.firstId);
log.info("Acking ledger id {}", phaseOneResult.lastId);
return ((CompactionReaderImpl<T>) reader)
.acknowledgeCumulativeAsync(
phaseOneResult.lastId, Map.of(COMPACTED_TOPIC_LEDGER_PROPERTY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,8 @@ public void testTopBundlesLoadDataStoreTableViewFromChannelOwner() throws Except
restartBroker();
pulsar1 = pulsar;
setPrimaryLoadManager();
admin.namespaces().setNamespaceReplicationClusters("public/default",
Sets.newHashSet(this.conf.getClusterName()));

var serviceUnitStateChannelPrimaryNew =
(ServiceUnitStateChannelImpl) FieldUtils.readDeclaredField(primaryLoadManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ public void testVersionId(){
new ServiceUnitStateData(Owned, dst, src, 10),
new ServiceUnitStateData(Releasing, "broker2", dst, 5)));

assertFalse(strategy.shouldKeepLeft(
new ServiceUnitStateData(Owned, dst, src, 10),
new ServiceUnitStateData(Owned, "broker2", dst, 12)));

}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.isValidTransition;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MSG_COMPRESSION_TYPE;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
Expand All @@ -49,6 +52,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.commons.lang.reflect.FieldUtils;
Expand All @@ -69,6 +73,7 @@
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.ReaderImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
Expand Down Expand Up @@ -628,6 +633,80 @@ public void testSlowTableviewAfterCompaction() throws Exception {

}

@Test
public void testSlowReceiveTableviewAfterCompaction() throws Exception {
String topic = "persistent://my-property/use/my-ns/my-topic1";
String strategyClassName = "topicCompactionStrategyClassName";

pulsarClient.newConsumer(schema)
.topic(topic)
.subscriptionName("sub1")
.readCompacted(true)
.subscribe().close();

var tv = pulsar.getClient().newTableViewBuilder(schema)
.topic(topic)
.subscriptionName("slowTV")
.loadConf(Map.of(
strategyClassName,
ServiceUnitStateCompactionStrategy.class.getName()))
.create();

// Configure retention to ensue data is retained for reader
admin.namespaces().setRetention("my-property/use/my-ns",
new RetentionPolicies(-1, -1));

Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema)
.topic(topic)
.compressionType(MSG_COMPRESSION_TYPE)
.enableBatching(true)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();

StrategicTwoPhaseCompactor compactor
= new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);

var reader = ((CompletableFuture<ReaderImpl<ServiceUnitStateData>>) FieldUtils
.readDeclaredField(tv, "reader", true)).get();
var consumer = spy(reader.getConsumer());
FieldUtils.writeDeclaredField(reader, "consumer", consumer, true);
String bundle = "bundle1";
final AtomicInteger versionId = new AtomicInteger(0);
final AtomicInteger cnt = new AtomicInteger(1);
int msgAddCount = 1000; // has to be big enough to cover compacted cursor fast-forward.
doAnswer(invocationOnMock -> {
if (cnt.decrementAndGet() == 0) {
var msg = consumer.receiveAsync();
for (int i = 0; i < msgAddCount; i++) {
producer.newMessage().key(bundle).value(
new ServiceUnitStateData(Owned, "broker" + versionId.incrementAndGet(), true,
versionId.get())).send();
}
compactor.compact(topic, strategy).join();
return msg;
}
// Call the real method
reset(consumer);
return consumer.receiveAsync();
}).when(consumer).receiveAsync();
producer.newMessage().key(bundle).value(
new ServiceUnitStateData(Owned, "broker", true,
versionId.incrementAndGet())).send();
producer.newMessage().key(bundle).value(
new ServiceUnitStateData(Owned, "broker" + versionId.incrementAndGet(), true,
versionId.get())).send();
Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(
() -> {
var val = tv.get(bundle);
assertNotNull(val);
assertEquals(val.dstBroker(), "broker" + versionId.get());
}
);

producer.close();
tv.close();
}

@Test
public void testBrokerRestartAfterCompaction() throws Exception {
String topic = "persistent://my-property/use/my-ns/my-topic1";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,13 @@ private void handleMessage(Message<T> msg) {
if (compactionStrategy != null) {
T prev = data.get(key);
update = !compactionStrategy.shouldKeepLeft(prev, cur);
if (!update) {
log.info("Skipped the message from topic {}. key={} value={} prev={}",
conf.getTopicName(),
key,
cur,
prev);
}
}

if (update) {
Expand Down

0 comments on commit 6cfa468

Please sign in to comment.