From e327219862faa225f1bdcc999cc13db9c93119e0 Mon Sep 17 00:00:00 2001 From: Cong Zhao Date: Mon, 20 Nov 2023 16:27:15 +0800 Subject: [PATCH] [improve][broker][branch-3.1] Support not retaining null-key message during topic compaction (#21578) --- conf/broker.conf | 3 + conf/standalone.conf | 5 +- .../pulsar/broker/ServiceConfiguration.java | 6 ++ .../pulsar/client/impl/RawBatchConverter.java | 20 +++++-- .../pulsar/compaction/TwoPhaseCompactor.java | 32 ++++++++--- .../pulsar/compaction/CompactionTest.java | 56 ++++++++++--------- 6 files changed, 85 insertions(+), 37 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 4ad8536fd8d68..bc9b644b22193 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -538,6 +538,9 @@ brokerServiceCompactionThresholdInBytes=0 # If the execution time of the compaction phase one loop exceeds this time, the compaction will not proceed. brokerServiceCompactionPhaseOneLoopTimeInSeconds=30 +# Whether retain null-key message during topic compaction +topicCompactionRemainNullKey=true + # Whether to enable the delayed delivery for messages. # If disabled, messages will be immediately delivered and there will # be no tracking overhead. diff --git a/conf/standalone.conf b/conf/standalone.conf index 76223c5933e45..b730bbc1290e2 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -1277,4 +1277,7 @@ brokerInterceptorsDirectory=./interceptors brokerInterceptors= # Enable or disable the broker interceptor, which is only used for testing for now -disableBrokerInterceptors=true \ No newline at end of file +disableBrokerInterceptors=true + +# Whether retain null-key message during topic compaction +topicCompactionRemainNullKey=true diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index fb26775591345..912182ceba7f4 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2777,6 +2777,12 @@ The delayed message index time step(in seconds) in per bucket snapshot segment, ) private long brokerServiceCompactionPhaseOneLoopTimeInSeconds = 30; + @FieldContext( + category = CATEGORY_SERVER, + doc = "Whether retain null-key message during topic compaction." + ) + private boolean topicCompactionRemainNullKey = true; + @FieldContext( category = CATEGORY_SERVER, doc = "Interval between checks to see if cluster is migrated and marks topic migrated " diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java index 167cc1b699c39..1dec55ea18816 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static org.apache.pulsar.common.protocol.Commands.magicBrokerEntryMetadata; + import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; @@ -92,6 +93,11 @@ public static List> extractIdsAndKey return idsAndKeysAndSize; } + public static Optional rebatchMessage(RawMessage msg, + BiPredicate filter) throws IOException { + return rebatchMessage(msg, filter, true); + } + /** * Take a batched message and a filter, and returns a message with the only the sub-messages * which match the filter. Returns an empty optional if no messages match. @@ -99,7 +105,8 @@ public static List> extractIdsAndKey * NOTE: this message does not alter the reference count of the RawMessage argument. */ public static Optional rebatchMessage(RawMessage msg, - BiPredicate filter) + BiPredicate filter, + boolean retainNullKey) throws IOException { checkArgument(msg.getMessageIdData().getBatchIndex() == -1); @@ -135,9 +142,14 @@ public static Optional rebatchMessage(RawMessage msg, msg.getMessageIdData().getPartition(), i); if (!singleMessageMetadata.hasPartitionKey()) { - messagesRetained++; - Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata, - singleMessagePayload, batchBuffer); + if (retainNullKey) { + messagesRetained++; + Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata, + singleMessagePayload, batchBuffer); + } else { + Commands.serializeSingleMessageInBatchWithPayload(emptyMetadata, + Unpooled.EMPTY_BUFFER, batchBuffer); + } } else if (filter.test(singleMessageMetadata.getPartitionKey(), id) && singleMessagePayload.readableBytes() > 0) { messagesRetained++; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java index e4e067ad6117e..cb39cc93154fb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java @@ -62,6 +62,7 @@ public class TwoPhaseCompactor extends Compactor { private static final Logger log = LoggerFactory.getLogger(TwoPhaseCompactor.class); private static final int MAX_OUTSTANDING = 500; private final Duration phaseOneLoopReadTimeout; + private final boolean topicCompactionRemainNullKey; public TwoPhaseCompactor(ServiceConfiguration conf, PulsarClient pulsar, @@ -69,6 +70,7 @@ public TwoPhaseCompactor(ServiceConfiguration conf, ScheduledExecutorService scheduler) { super(conf, pulsar, bk, scheduler); phaseOneLoopReadTimeout = Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds()); + topicCompactionRemainNullKey = conf.isTopicCompactionRemainNullKey(); } @Override @@ -134,6 +136,14 @@ private void phaseOneLoop(RawReader reader, int deleteCnt = 0; for (ImmutableTriple e : extractIdsAndKeysAndSizeFromBatch(m)) { if (e != null) { + if (e.getMiddle() == null) { + if (!topicCompactionRemainNullKey) { + // record delete null-key message event + deleteCnt++; + mxBean.addCompactionRemovedEvent(reader.getTopic()); + } + continue; + } if (e.getRight() > 0) { MessageId old = latestForKey.put(e.getMiddle(), e.getLeft()); if (old != null) { @@ -163,6 +173,10 @@ private void phaseOneLoop(RawReader reader, deletedMessage = true; latestForKey.remove(keyAndSize.getLeft()); } + } else { + if (!topicCompactionRemainNullKey) { + deletedMessage = true; + } } if (replaceMessage || deletedMessage) { mxBean.addCompactionRemovedEvent(reader.getTopic()); @@ -249,8 +263,8 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes()); if (RawBatchConverter.isReadableBatch(m)) { try { - messageToAdd = rebatchMessage( - m, (key, subid) -> subid.equals(latestForKey.get(key))); + messageToAdd = rebatchMessage(reader.getTopic(), + m, (key, subid) -> subid.equals(latestForKey.get(key)), topicCompactionRemainNullKey); } catch (IOException ioe) { log.info("Error decoding batch for message {}. Whole batch will be included in output", id, ioe); @@ -259,8 +273,8 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map } else { Pair keyAndSize = extractKeyAndSize(m); MessageId msg; - if (keyAndSize == null) { // pass through messages without a key - messageToAdd = Optional.of(m); + if (keyAndSize == null) { + messageToAdd = topicCompactionRemainNullKey ? Optional.of(m) : Optional.empty(); } else if ((msg = latestForKey.get(keyAndSize.getLeft())) != null && msg.equals(id)) { // consider message only if present into latestForKey map if (keyAndSize.getRight() <= 0) { @@ -416,12 +430,16 @@ protected Pair extractKeyAndSize(RawMessage m) { protected List> extractIdsAndKeysAndSizeFromBatch(RawMessage msg) throws IOException { - return RawBatchConverter.extractIdsAndKeysAndSize(msg, false); + return RawBatchConverter.extractIdsAndKeysAndSize(msg); } - protected Optional rebatchMessage(RawMessage msg, BiPredicate filter) + protected Optional rebatchMessage(String topic, RawMessage msg, BiPredicate filter, + boolean retainNullKey) throws IOException { - return RawBatchConverter.rebatchMessage(msg, filter); + if (log.isDebugEnabled()) { + log.debug("Rebatching message {} for topic {}", msg.getMessageId(), topic); + } + return RawBatchConverter.rebatchMessage(msg, filter, retainNullKey); } private static class PhaseOneResult { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index 52837cbdcd56a..5ee12d660e031 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -26,6 +26,7 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; + import com.google.common.collect.Sets; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.netty.buffer.ByteBuf; @@ -640,8 +641,17 @@ public void testWholeBatchCompactedOut() throws Exception { } } - @Test - public void testKeyLessMessagesPassThrough() throws Exception { + @DataProvider(name = "retainNullKey") + public static Object[][] retainNullKey() { + return new Object[][] {{true}, {false}}; + } + + @Test(dataProvider = "retainNullKey") + public void testKeyLessMessagesPassThrough(boolean retainNullKey) throws Exception { + conf.setTopicCompactionRemainNullKey(retainNullKey); + restartBroker(); + FieldUtils.writeDeclaredField(compactor, "topicCompactionRemainNullKey", retainNullKey, true); + String topic = "persistent://my-property/use/my-ns/my-topic1"; // subscribe before sending anything, so that we get all messages @@ -682,29 +692,25 @@ public void testKeyLessMessagesPassThrough() throws Exception { Message m = consumer.receive(2, TimeUnit.SECONDS); assertNull(m); } else { - Message message1 = consumer.receive(); - Assert.assertFalse(message1.hasKey()); - Assert.assertEquals(new String(message1.getData()), "my-message-1"); - - Message message2 = consumer.receive(); - Assert.assertFalse(message2.hasKey()); - Assert.assertEquals(new String(message2.getData()), "my-message-2"); - - Message message3 = consumer.receive(); - Assert.assertEquals(message3.getKey(), "key1"); - Assert.assertEquals(new String(message3.getData()), "my-message-4"); - - Message message4 = consumer.receive(); - Assert.assertEquals(message4.getKey(), "key2"); - Assert.assertEquals(new String(message4.getData()), "my-message-6"); - - Message message5 = consumer.receive(); - Assert.assertFalse(message5.hasKey()); - Assert.assertEquals(new String(message5.getData()), "my-message-7"); + List> result = new ArrayList<>(); + while (true) { + Message message = consumer.receive(10, TimeUnit.SECONDS); + if (message == null) { + break; + } + result.add(Pair.of(message.getKey(), message.getData() == null ? null : new String(message.getData()))); + } - Message message6 = consumer.receive(); - Assert.assertFalse(message6.hasKey()); - Assert.assertEquals(new String(message6.getData()), "my-message-8"); + List> expectList; + if (retainNullKey) { + expectList = List.of( + Pair.of(null, "my-message-1"), Pair.of(null, "my-message-2"), + Pair.of("key1", "my-message-4"), Pair.of("key2", "my-message-6"), + Pair.of(null, "my-message-7"), Pair.of(null, "my-message-8")); + } else { + expectList = List.of(Pair.of("key1", "my-message-4"), Pair.of("key2", "my-message-6")); + } + Assert.assertEquals(result, expectList); } } } @@ -1885,7 +1891,7 @@ public void testDispatcherMaxReadSizeBytes() throws Exception { .topic(topicName).create(); for (int i = 0; i < 10; i+=2) { - producer.newMessage().key(null).value(new byte[4*1024*1024]).send(); + producer.newMessage().key(UUID.randomUUID().toString()).value(new byte[4*1024*1024]).send(); } producer.flush();