Skip to content

Commit

Permalink
Support not retaining null-key message during topic compaction
Browse files Browse the repository at this point in the history
  • Loading branch information
coderzc committed Nov 14, 2023
1 parent 44abba9 commit 7fbe7b5
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 16 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,9 @@ brokerServiceCompactionThresholdInBytes=0
# If the execution time of the compaction phase one loop exceeds this time, the compaction will not proceed.
brokerServiceCompactionPhaseOneLoopTimeInSeconds=30

# Whether retain null-key message during topic compaction
topicCompactionRemainNullKey=false

# Whether to enable the delayed delivery for messages.
# If disabled, messages will be immediately delivered and there will
# be no tracking overhead.
Expand Down
5 changes: 4 additions & 1 deletion conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1285,4 +1285,7 @@ brokerInterceptorsDirectory=./interceptors
brokerInterceptors=

# Enable or disable the broker interceptor, which is only used for testing for now
disableBrokerInterceptors=true
disableBrokerInterceptors=true

# Whether retain null-key message during topic compaction
topicCompactionRemainNullKey=false
Original file line number Diff line number Diff line change
Expand Up @@ -2772,6 +2772,12 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,
)
private long brokerServiceCompactionPhaseOneLoopTimeInSeconds = 30;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Whether retain null-key message during topic compaction."
)
private boolean topicCompactionRemainNullKey = false;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Interval between checks to see if cluster is migrated and marks topic migrated "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,25 +75,35 @@ public static List<ImmutableTriple<MessageId, String, Integer>> extractIdsAndKey
msg.getMessageIdData().getEntryId(),
msg.getMessageIdData().getPartition(),
i);
if (!smm.isCompactedOut() && smm.hasPartitionKey()) {
idsAndKeysAndSize.add(ImmutableTriple.of(id,
smm.getPartitionKey(),
smm.hasPayloadSize() ? smm.getPayloadSize() : 0));
if (!smm.isCompactedOut()) {
if (smm.hasPartitionKey()) {
idsAndKeysAndSize.add(ImmutableTriple.of(id,
smm.getPartitionKey(),
smm.hasPayloadSize() ? smm.getPayloadSize() : 0));
} else {
idsAndKeysAndSize.add(null);
}
}
singleMessagePayload.release();
}
uncompressedPayload.release();
return idsAndKeysAndSize;
}

public static Optional<RawMessage> rebatchMessage(RawMessage msg,
BiPredicate<String, MessageId> filter) throws IOException {
return rebatchMessage(msg, filter, true);
}

/**
* Take a batched message and a filter, and returns a message with the only the sub-messages
* which match the filter. Returns an empty optional if no messages match.
*
* NOTE: this message does not alter the reference count of the RawMessage argument.
*/
public static Optional<RawMessage> rebatchMessage(RawMessage msg,
BiPredicate<String, MessageId> filter)
BiPredicate<String, MessageId> filter,
boolean retainNullKey)
throws IOException {
checkArgument(msg.getMessageIdData().getBatchIndex() == -1);

Expand Down Expand Up @@ -129,9 +139,14 @@ public static Optional<RawMessage> rebatchMessage(RawMessage msg,
msg.getMessageIdData().getPartition(),
i);
if (!singleMessageMetadata.hasPartitionKey()) {
messagesRetained++;
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata,
singleMessagePayload, batchBuffer);
if (retainNullKey) {
messagesRetained++;
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata,
singleMessagePayload, batchBuffer);
} else {
Commands.serializeSingleMessageInBatchWithPayload(emptyMetadata,
Unpooled.EMPTY_BUFFER, batchBuffer);
}
} else if (filter.test(singleMessageMetadata.getPartitionKey(), id)
&& singleMessagePayload.readableBytes() > 0) {
messagesRetained++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,15 @@ public class TwoPhaseCompactor extends Compactor {
private static final Logger log = LoggerFactory.getLogger(TwoPhaseCompactor.class);
private static final int MAX_OUTSTANDING = 500;
private final Duration phaseOneLoopReadTimeout;
private final boolean topicCompactionRemainNullKey;

public TwoPhaseCompactor(ServiceConfiguration conf,
PulsarClient pulsar,
BookKeeper bk,
ScheduledExecutorService scheduler) {
super(conf, pulsar, bk, scheduler);
phaseOneLoopReadTimeout = Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
topicCompactionRemainNullKey = conf.isTopicCompactionRemainNullKey();
}

@Override
Expand Down Expand Up @@ -144,6 +146,12 @@ private void phaseOneLoop(RawReader reader,
deleteCnt++;
mxBean.addCompactionRemovedEvent(reader.getTopic());
}
} else {
if (!topicCompactionRemainNullKey) {
// record delete null-key message event
deleteCnt++;
mxBean.addCompactionRemovedEvent(reader.getTopic());
}
}
}
if (deleteCnt == numMessagesInBatch) {
Expand All @@ -163,6 +171,10 @@ private void phaseOneLoop(RawReader reader,
deletedMessage = true;
latestForKey.remove(keyAndSize.getLeft());
}
} else {
if (!topicCompactionRemainNullKey) {
deletedMessage = true;
}
}
if (replaceMessage || deletedMessage) {
mxBean.addCompactionRemovedEvent(reader.getTopic());
Expand Down Expand Up @@ -249,8 +261,8 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId>
mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes());
if (RawBatchConverter.isReadableBatch(m)) {
try {
messageToAdd = rebatchMessage(
m, (key, subid) -> subid.equals(latestForKey.get(key)));
messageToAdd = rebatchMessage(reader.getTopic(),
m, (key, subid) -> subid.equals(latestForKey.get(key)), topicCompactionRemainNullKey);
} catch (IOException ioe) {
log.info("Error decoding batch for message {}. Whole batch will be included in output",
id, ioe);
Expand All @@ -259,8 +271,8 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId>
} else {
Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
MessageId msg;
if (keyAndSize == null) { // pass through messages without a key
messageToAdd = Optional.of(m);
if (keyAndSize == null) {
messageToAdd = topicCompactionRemainNullKey ? Optional.of(m) : Optional.empty();
} else if ((msg = latestForKey.get(keyAndSize.getLeft())) != null
&& msg.equals(id)) { // consider message only if present into latestForKey map
if (keyAndSize.getRight() <= 0) {
Expand Down Expand Up @@ -419,9 +431,13 @@ protected List<ImmutableTriple<MessageId, String, Integer>> extractIdsAndKeysAnd
return RawBatchConverter.extractIdsAndKeysAndSize(msg);
}

protected Optional<RawMessage> rebatchMessage(RawMessage msg, BiPredicate<String, MessageId> filter)
protected Optional<RawMessage> rebatchMessage(String topic, RawMessage msg, BiPredicate<String, MessageId> filter,
boolean retainNullKey)
throws IOException {
return RawBatchConverter.rebatchMessage(msg, filter);
if (log.isDebugEnabled()) {
log.info("Rebatching message {} for topic {}", msg.getMessageId(), topic);
}
return RawBatchConverter.rebatchMessage(msg, filter, retainNullKey);
}

private static class PhaseOneResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1885,7 +1885,7 @@ public void testDispatcherMaxReadSizeBytes() throws Exception {
.topic(topicName).create();

for (int i = 0; i < 10; i+=2) {
producer.newMessage().key(null).value(new byte[4*1024*1024]).send();
producer.newMessage().key(UUID.randomUUID().toString()).value(new byte[4*1024*1024]).send();
}
producer.flush();

Expand Down Expand Up @@ -1984,4 +1984,60 @@ public void testCompactionDuplicate() throws Exception {
}
}
}

@DataProvider(name = "retainNullKey")
public static Object[][] retainNullKey() {
return new Object[][] {{true}, {false}};
}

@Test(dataProvider = "retainNullKey")
public void testCompactionNullKeyRetain(boolean retainNullKey) throws Exception {
conf.setTopicCompactionRemainNullKey(retainNullKey);
restartBroker();

final String topicName = "persistent://my-property/use/my-ns/testCompactionNullKeyRetain" + UUID.randomUUID();
final String subName = "my-sub";
@Cleanup
PulsarClient client = newPulsarClient(lookupUrl.toString(), 100);
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topicName).create();

producer.newMessage().key(null).value("V1").send();
producer.newMessage().key("K1").value("V2").send();
producer.newMessage().key("K2").value("V3").send();
producer.newMessage().key(null).value("V4").send();
producer.newMessage().key("K1").value("V5").send();
producer.newMessage().key("K2").value("V6").send();
producer.newMessage().key(null).value("V7").send();
producer.flush();

admin.topics().triggerCompaction(topicName);

Awaitility.await().untilAsserted(() -> {
assertEquals(admin.topics().compactionStatus(topicName).status,
LongRunningProcessStatus.Status.SUCCESS);
});

ConsumerImpl<String> consumer = (ConsumerImpl<String>) client.newConsumer(Schema.STRING)
.topic(topicName).readCompacted(true).subscriptionName(subName)
.subscribe();

List<String> result = new ArrayList<>();
while (true) {
Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
if (message == null) {
break;
}
result.add(message.getValue());
}

if (!retainNullKey) {
Assert.assertEquals(result, List.of("V5", "V6"));
} else {
Assert.assertEquals(result, List.of("V1", "V4", "V5", "V6", "V7"));
}

consumer.close();
producer.close();
}
}

0 comments on commit 7fbe7b5

Please sign in to comment.