Skip to content

Commit

Permalink
[improve][broker] Support not retaining null-key message during topic…
Browse files Browse the repository at this point in the history
… compaction (#21578) (#21665)

Co-authored-by: fengyubiao <[email protected]>
  • Loading branch information
coderzc and poorbarcode authored Dec 14, 2023
1 parent ef7e8a4 commit cd5fa17
Show file tree
Hide file tree
Showing 10 changed files with 98 additions and 139 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,9 @@ brokerServiceCompactionThresholdInBytes=0
# If the execution time of the compaction phase one loop exceeds this time, the compaction will not proceed.
brokerServiceCompactionPhaseOneLoopTimeInSeconds=30

# Whether retain null-key message during topic compaction
topicCompactionRemainNullKey=true

# Whether to enable the delayed delivery for messages.
# If disabled, messages will be immediately delivered and there will
# be no tracking overhead.
Expand Down
3 changes: 3 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1197,3 +1197,6 @@ configurationStoreServers=
# zookeeper.
# Deprecated: use managedLedgerMaxUnackedRangesToPersistInMetadataStore
managedLedgerMaxUnackedRangesToPersistInZooKeeper=-1

# Whether retain null-key message during topic compaction
topicCompactionRemainNullKey=true
Original file line number Diff line number Diff line change
Expand Up @@ -2472,6 +2472,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private long brokerServiceCompactionPhaseOneLoopTimeInSeconds = 30;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Whether retain null-key message during topic compaction."
)
private boolean topicCompactionRemainNullKey = true;

@FieldContext(
category = CATEGORY_SCHEMA,
doc = "Enforce schema validation on following cases:\n\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,20 @@ public static List<ImmutableTriple<MessageId, String, Integer>> extractIdsAndKey
return idsAndKeysAndSize;
}

public static Optional<RawMessage> rebatchMessage(RawMessage msg,
BiPredicate<String, MessageId> filter) throws IOException {
return rebatchMessage(msg, filter, true);
}

/**
* Take a batched message and a filter, and returns a message with the only the sub-messages
* which match the filter. Returns an empty optional if no messages match.
*
* NOTE: this message does not alter the reference count of the RawMessage argument.
*/
public static Optional<RawMessage> rebatchMessage(RawMessage msg,
BiPredicate<String, MessageId> filter)
BiPredicate<String, MessageId> filter,
boolean retainNullKey)
throws IOException {
checkArgument(msg.getMessageIdData().getBatchIndex() == -1);

Expand Down Expand Up @@ -125,9 +131,14 @@ public static Optional<RawMessage> rebatchMessage(RawMessage msg,
msg.getMessageIdData().getPartition(),
i);
if (!singleMessageMetadata.hasPartitionKey()) {
messagesRetained++;
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata,
singleMessagePayload, batchBuffer);
if (retainNullKey) {
messagesRetained++;
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata,
singleMessagePayload, batchBuffer);
} else {
Commands.serializeSingleMessageInBatchWithPayload(emptyMetadata,
Unpooled.EMPTY_BUFFER, batchBuffer);
}
} else if (filter.test(singleMessageMetadata.getPartitionKey(), id)
&& singleMessagePayload.readableBytes() > 0) {
messagesRetained++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,15 @@ public class TwoPhaseCompactor extends Compactor {
private static final int MAX_OUTSTANDING = 500;
private static final String COMPACTED_TOPIC_LEDGER_PROPERTY = "CompactedTopicLedger";
private final Duration phaseOneLoopReadTimeout;
private final boolean topicCompactionRemainNullKey;

public TwoPhaseCompactor(ServiceConfiguration conf,
PulsarClient pulsar,
BookKeeper bk,
ScheduledExecutorService scheduler) {
super(conf, pulsar, bk, scheduler);
phaseOneLoopReadTimeout = Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
topicCompactionRemainNullKey = conf.isTopicCompactionRemainNullKey();
}

@Override
Expand Down Expand Up @@ -133,8 +135,16 @@ private void phaseOneLoop(RawReader reader,
int numMessagesInBatch = metadata.getNumMessagesInBatch();
int deleteCnt = 0;
for (ImmutableTriple<MessageId, String, Integer> e : RawBatchConverter
.extractIdsAndKeysAndSize(m, false)) {
.extractIdsAndKeysAndSize(m, true)) {
if (e != null) {
if (e.getMiddle() == null) {
if (!topicCompactionRemainNullKey) {
// record delete null-key message event
deleteCnt++;
mxBean.addCompactionRemovedEvent(reader.getTopic());
}
continue;
}
if (e.getRight() > 0) {
MessageId old = latestForKey.put(e.getMiddle(), e.getLeft());
if (old != null) {
Expand Down Expand Up @@ -164,6 +174,10 @@ private void phaseOneLoop(RawReader reader,
deletedMessage = true;
latestForKey.remove(keyAndSize.getLeft());
}
} else {
if (!topicCompactionRemainNullKey) {
deletedMessage = true;
}
}
if (replaceMessage || deletedMessage) {
mxBean.addCompactionRemovedEvent(reader.getTopic());
Expand Down Expand Up @@ -242,7 +256,6 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId>
}

if (m.getMessageId().compareTo(lastCompactedMessageId) <= 0) {
m.close();
phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise, lastCompactedMessageId);
return;
}
Expand All @@ -254,7 +267,7 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId>
if (RawBatchConverter.isReadableBatch(m)) {
try {
messageToAdd = RawBatchConverter.rebatchMessage(
m, (key, subid) -> subid.equals(latestForKey.get(key)));
m, (key, subid) -> subid.equals(latestForKey.get(key)), topicCompactionRemainNullKey);
} catch (IOException ioe) {
log.info("Error decoding batch for message {}. Whole batch will be included in output",
id, ioe);
Expand All @@ -263,8 +276,8 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId>
} else {
Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
MessageId msg;
if (keyAndSize == null) { // pass through messages without a key
messageToAdd = Optional.of(m);
if (keyAndSize == null) {
messageToAdd = topicCompactionRemainNullKey ? Optional.of(m) : Optional.empty();
} else if ((msg = latestForKey.get(keyAndSize.getLeft())) != null
&& msg.equals(id)) { // consider message only if present into latestForKey map
if (keyAndSize.getRight() <= 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1393,7 +1393,7 @@ public void testDeleteTenant() throws Exception {
assertFalse(admin.topics().getList(namespace).isEmpty());

try {
deleteNamespaceGraceFully(namespace, false);
admin.namespaces().deleteNamespace(namespace, false);
fail("should have failed due to namespace not empty");
} catch (PulsarAdminException e) {
// Expected: cannot delete non-empty tenant
Expand Down Expand Up @@ -1470,7 +1470,7 @@ public void testDeleteNamespace() throws Exception {


try {
deleteNamespaceGraceFully(namespace, false);
admin.namespaces().deleteNamespace(namespace, false);
fail("should have failed due to namespace not empty");
} catch (PulsarAdminException e) {
// Expected: cannot delete non-empty tenant
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1437,7 +1437,7 @@ public void testDeleteNamespaceForcefully() throws Exception {
assertFalse(admin.topics().getList(namespace).isEmpty());

try {
deleteNamespaceGraceFully(namespace, false);
admin.namespaces().deleteNamespace(namespace, false);
fail("should have failed due to namespace not empty");
} catch (PulsarAdminException e) {
// Expected: cannot delete non-empty tenant
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,25 @@
*/
package org.apache.pulsar.broker.service;

import static org.apache.pulsar.common.naming.SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.collections4.CollectionUtils;
import com.google.common.collect.Sets;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.coordination.LockManager;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Sets;

import java.util.Random;

@Slf4j
public abstract class BrokerTestBase extends MockedPulsarServiceBaseTest {
protected static final int ASYNC_EVENT_COMPLETION_WAIT = 100;

Expand Down Expand Up @@ -132,97 +118,30 @@ protected String newTopicName() {
*/
protected void deleteNamespaceGraceFully(String ns, boolean force)
throws Exception {
deleteNamespaceGraceFully(ns, force, pulsar, admin);
deleteNamespaceGraceFully(ns, force, pulsar , admin);
}

/**
* Wait until system topic "__change_event" and subscription "__compaction" are created, and then delete the namespace.
* 1. Pause system "__change_event" topic creates.
* 2. Do delete namespace with retry because maybe fail by race-condition with create topics.
*/
public static void deleteNamespaceGraceFully(String ns, boolean force, PulsarService pulsar, PulsarAdmin admin)
throws Exception {
// namespace v1 should not wait system topic create.
if (ns.split("/").length > 2){
admin.namespaces().deleteNamespace(ns, force);
return;
}
if (!pulsar.getConfiguration().isSystemTopicEnabled()){
admin.namespaces().deleteNamespace(ns, force);
return;
}
// If no bundle has been loaded, then the System Topic will not trigger creation.
LockManager lockManager = pulsar.getCoordinationService().getLockManager(NamespaceEphemeralData.class);
List<String> lockedBundles = (List<String>) lockManager.listLocks("/namespace" + "/" + ns).join();
if (CollectionUtils.isEmpty(lockedBundles)){
admin.namespaces().deleteNamespace(ns, force);
return;
}
// Trigger change event topic create.
NamespaceName namespace = NamespaceName.get(ns);
NamespaceBundle namespaceBundle = mock(NamespaceBundle.class);
when(namespaceBundle.getNamespaceObject()).thenReturn(namespace);
pulsar.getTopicPoliciesService().addOwnedNamespaceBundleAsync(namespaceBundle);
// Wait for change event topic and compaction create finish.
String allowAutoTopicCreationType = pulsar.getConfiguration().getAllowAutoTopicCreationType();
int defaultNumPartitions = pulsar.getConfiguration().getDefaultNumPartitions();
ArrayList<String> expectChangeEventTopics = new ArrayList<>();
if ("non-partitioned".equals(allowAutoTopicCreationType)){
String t = String.format("persistent://%s/%s", ns, NAMESPACE_EVENTS_LOCAL_NAME);
expectChangeEventTopics.add(t);
} else {
for (int i = 0; i < defaultNumPartitions; i++){
String t = String.format("persistent://%s/%s-partition-%s", ns, NAMESPACE_EVENTS_LOCAL_NAME, i);
expectChangeEventTopics.add(t);
}
}
Awaitility.await().until(() -> {
boolean finished = true;
for (String changeEventTopicName : expectChangeEventTopics){
boolean bundleExists = pulsar.getNamespaceService()
.checkTopicOwnership(TopicName.get(changeEventTopicName))
.exceptionally(ex -> false).join();
if (!bundleExists){
finished = false;
break;
}
CompletableFuture<Optional<Topic>> completableFuture =
pulsar.getBrokerService().getTopic(changeEventTopicName, false);
if (completableFuture == null){
finished = false;
break;
}
Optional<Topic> optionalTopic = completableFuture.get();
if (!optionalTopic.isPresent()){
finished = false;
break;
}
PersistentTopic changeEventTopic = (PersistentTopic) optionalTopic.get();
if (!changeEventTopic.isCompactionEnabled()){
break;
}
if (!changeEventTopic.getSubscriptions().containsKey(COMPACTION_SUBSCRIPTION)){
finished = false;
break;
}
}
return finished;
});
int retryTimes = 3;
while (true) {
Awaitility.await()
.pollDelay(500, TimeUnit.MILLISECONDS)
.until(() -> {
try {
// Maybe fail by race-condition with create topics, just retry.
admin.namespaces().deleteNamespace(ns, force);
break;
} catch (PulsarAdminException ex) {
// Do retry only if topic fenced.
if (ex.getStatusCode() == 500 && ex.getMessage().contains("TopicFencedException")){
if (--retryTimes > 0){
continue;
} else {
throw ex;
}
}
throw ex;
return true;
} catch (PulsarAdminException.NotFoundException ex) {
// namespace was already deleted, ignore exception
return true;
} catch (Exception e) {
log.warn("Failed to delete namespace {} (force={})", ns, force, e);
return false;
}
}
});
}

private static final Logger LOG = LoggerFactory.getLogger(BrokerTestBase.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public class PendingAckPersistentTest extends TransactionTestBase {

private static final int NUM_PARTITIONS = 16;

@BeforeMethod
@BeforeMethod(alwaysRun = true)
public void setup() throws Exception {
setUpBase(1, NUM_PARTITIONS, PENDING_ACK_REPLAY_TOPIC, 0);
}
Expand Down
Loading

0 comments on commit cd5fa17

Please sign in to comment.