diff --git a/conf/broker.conf b/conf/broker.conf index 6211943212895..29c76b538a3bc 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -525,6 +525,9 @@ brokerServiceCompactionThresholdInBytes=0 # If the execution time of the compaction phase one loop exceeds this time, the compaction will not proceed. brokerServiceCompactionPhaseOneLoopTimeInSeconds=30 +# Whether to retain null-key messages during topic compaction +topicCompactionRemainNullKey=true + # Whether to enable the delayed delivery for messages. # If disabled, messages will be immediately delivered and there will # be no tracking overhead. diff --git a/conf/standalone.conf b/conf/standalone.conf index 4505bce92aa2d..a45d0e98d966d 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -1197,3 +1197,6 @@ configurationStoreServers= # zookeeper. # Deprecated: use managedLedgerMaxUnackedRangesToPersistInMetadataStore managedLedgerMaxUnackedRangesToPersistInZooKeeper=-1 + +# Whether to retain null-key messages during topic compaction +topicCompactionRemainNullKey=true diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 754c74ab3e7a4..88e052abe98d7 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2472,6 +2472,12 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private long brokerServiceCompactionPhaseOneLoopTimeInSeconds = 30; + @FieldContext( + category = CATEGORY_SERVER, + doc = "Whether retain null-key message during topic compaction." 
+ ) + private boolean topicCompactionRemainNullKey = true; + @FieldContext( category = CATEGORY_SCHEMA, doc = "Enforce schema validation on following cases:\n\n" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java index c096d3610667e..b3c9d7c9f2ba6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java @@ -90,6 +90,11 @@ public static List> extractIdsAndKey return idsAndKeysAndSize; } + public static Optional rebatchMessage(RawMessage msg, + BiPredicate filter) throws IOException { + return rebatchMessage(msg, filter, true); + } + /** * Take a batched message and a filter, and returns a message with the only the sub-messages * which match the filter. Returns an empty optional if no messages match. @@ -97,7 +102,8 @@ public static List> extractIdsAndKey * NOTE: this message does not alter the reference count of the RawMessage argument. 
*/ public static Optional rebatchMessage(RawMessage msg, - BiPredicate filter) + BiPredicate filter, + boolean retainNullKey) throws IOException { checkArgument(msg.getMessageIdData().getBatchIndex() == -1); @@ -125,9 +131,14 @@ public static Optional rebatchMessage(RawMessage msg, msg.getMessageIdData().getPartition(), i); if (!singleMessageMetadata.hasPartitionKey()) { - messagesRetained++; - Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata, - singleMessagePayload, batchBuffer); + if (retainNullKey) { + messagesRetained++; + Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata, + singleMessagePayload, batchBuffer); + } else { + Commands.serializeSingleMessageInBatchWithPayload(emptyMetadata, + Unpooled.EMPTY_BUFFER, batchBuffer); + } } else if (filter.test(singleMessageMetadata.getPartitionKey(), id) && singleMessagePayload.readableBytes() > 0) { messagesRetained++; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java index e4cb5dda77122..831baffd7f229 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java @@ -62,6 +62,7 @@ public class TwoPhaseCompactor extends Compactor { private static final int MAX_OUTSTANDING = 500; private static final String COMPACTED_TOPIC_LEDGER_PROPERTY = "CompactedTopicLedger"; private final Duration phaseOneLoopReadTimeout; + private final boolean topicCompactionRemainNullKey; public TwoPhaseCompactor(ServiceConfiguration conf, PulsarClient pulsar, @@ -69,6 +70,7 @@ public TwoPhaseCompactor(ServiceConfiguration conf, ScheduledExecutorService scheduler) { super(conf, pulsar, bk, scheduler); phaseOneLoopReadTimeout = Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds()); + topicCompactionRemainNullKey = 
conf.isTopicCompactionRemainNullKey(); } @Override @@ -133,8 +135,16 @@ private void phaseOneLoop(RawReader reader, int numMessagesInBatch = metadata.getNumMessagesInBatch(); int deleteCnt = 0; for (ImmutableTriple e : RawBatchConverter - .extractIdsAndKeysAndSize(m, false)) { + .extractIdsAndKeysAndSize(m, true)) { if (e != null) { + if (e.getMiddle() == null) { + if (!topicCompactionRemainNullKey) { + // record delete null-key message event + deleteCnt++; + mxBean.addCompactionRemovedEvent(reader.getTopic()); + } + continue; + } if (e.getRight() > 0) { MessageId old = latestForKey.put(e.getMiddle(), e.getLeft()); if (old != null) { @@ -164,6 +174,10 @@ private void phaseOneLoop(RawReader reader, deletedMessage = true; latestForKey.remove(keyAndSize.getLeft()); } + } else { + if (!topicCompactionRemainNullKey) { + deletedMessage = true; + } } if (replaceMessage || deletedMessage) { mxBean.addCompactionRemovedEvent(reader.getTopic()); @@ -242,7 +256,6 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map } if (m.getMessageId().compareTo(lastCompactedMessageId) <= 0) { - m.close(); phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise, lastCompactedMessageId); return; } @@ -254,7 +267,7 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map if (RawBatchConverter.isReadableBatch(m)) { try { messageToAdd = RawBatchConverter.rebatchMessage( - m, (key, subid) -> subid.equals(latestForKey.get(key))); + m, (key, subid) -> subid.equals(latestForKey.get(key)), topicCompactionRemainNullKey); } catch (IOException ioe) { log.info("Error decoding batch for message {}. Whole batch will be included in output", id, ioe); @@ -263,8 +276,8 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map } else { Pair keyAndSize = extractKeyAndSize(m); MessageId msg; - if (keyAndSize == null) { // pass through messages without a key - messageToAdd = Optional.of(m); + if (keyAndSize == null) { + messageToAdd = topicCompactionRemainNullKey ? 
Optional.of(m) : Optional.empty(); } else if ((msg = latestForKey.get(keyAndSize.getLeft())) != null && msg.equals(id)) { // consider message only if present into latestForKey map if (keyAndSize.getRight() <= 0) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index 09544ee957e2b..29497f2e0e73f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -1393,7 +1393,7 @@ public void testDeleteTenant() throws Exception { assertFalse(admin.topics().getList(namespace).isEmpty()); try { - deleteNamespaceGraceFully(namespace, false); + admin.namespaces().deleteNamespace(namespace, false); fail("should have failed due to namespace not empty"); } catch (PulsarAdminException e) { // Expected: cannot delete non-empty tenant @@ -1470,7 +1470,7 @@ public void testDeleteNamespace() throws Exception { try { - deleteNamespaceGraceFully(namespace, false); + admin.namespaces().deleteNamespace(namespace, false); fail("should have failed due to namespace not empty"); } catch (PulsarAdminException e) { // Expected: cannot delete non-empty tenant diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 341842f19811f..94f8bbac20d3b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -1437,7 +1437,7 @@ public void testDeleteNamespaceForcefully() throws Exception { assertFalse(admin.topics().getList(namespace).isEmpty()); try { - deleteNamespaceGraceFully(namespace, false); + admin.namespaces().deleteNamespace(namespace, false); fail("should have failed due to namespace not empty"); } catch (PulsarAdminException 
e) { // Expected: cannot delete non-empty tenant diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java index efdfe0fe188c4..9226810c34934 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java @@ -18,39 +18,25 @@ */ package org.apache.pulsar.broker.service; -import static org.apache.pulsar.common.naming.SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME; -import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import org.apache.commons.collections4.CollectionUtils; +import com.google.common.collect.Sets; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; -import org.apache.pulsar.broker.namespace.NamespaceEphemeralData; -import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.common.naming.NamespaceBundle; -import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.SystemTopicNames; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.metadata.api.MetadataStoreException; -import 
org.apache.pulsar.metadata.api.coordination.LockManager; import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Sets; - -import java.util.Random; - +@Slf4j public abstract class BrokerTestBase extends MockedPulsarServiceBaseTest { protected static final int ASYNC_EVENT_COMPLETION_WAIT = 100; @@ -132,97 +118,30 @@ protected String newTopicName() { */ protected void deleteNamespaceGraceFully(String ns, boolean force) throws Exception { - deleteNamespaceGraceFully(ns, force, pulsar, admin); + deleteNamespaceGraceFully(ns, force, pulsar , admin); } /** - * Wait until system topic "__change_event" and subscription "__compaction" are created, and then delete the namespace. + * Deletes the namespace, retrying until the call succeeds; an already-deleted namespace counts as success. + * Retries are needed because deletion can fail when it races with creation of topics such as "__change_event". */ public static void deleteNamespaceGraceFully(String ns, boolean force, PulsarService pulsar, PulsarAdmin admin) throws Exception { - // namespace v1 should not wait system topic create. - if (ns.split("/").length > 2){ - admin.namespaces().deleteNamespace(ns, force); - return; - } - if (!pulsar.getConfiguration().isSystemTopicEnabled()){ - admin.namespaces().deleteNamespace(ns, force); - return; - } - // If no bundle has been loaded, then the System Topic will not trigger creation. - LockManager lockManager = pulsar.getCoordinationService().getLockManager(NamespaceEphemeralData.class); - List lockedBundles = (List) lockManager.listLocks("/namespace" + "/" + ns).join(); - if (CollectionUtils.isEmpty(lockedBundles)){ - admin.namespaces().deleteNamespace(ns, force); - return; - } - // Trigger change event topic create. 
- NamespaceName namespace = NamespaceName.get(ns); - NamespaceBundle namespaceBundle = mock(NamespaceBundle.class); - when(namespaceBundle.getNamespaceObject()).thenReturn(namespace); - pulsar.getTopicPoliciesService().addOwnedNamespaceBundleAsync(namespaceBundle); - // Wait for change event topic and compaction create finish. - String allowAutoTopicCreationType = pulsar.getConfiguration().getAllowAutoTopicCreationType(); - int defaultNumPartitions = pulsar.getConfiguration().getDefaultNumPartitions(); - ArrayList expectChangeEventTopics = new ArrayList<>(); - if ("non-partitioned".equals(allowAutoTopicCreationType)){ - String t = String.format("persistent://%s/%s", ns, NAMESPACE_EVENTS_LOCAL_NAME); - expectChangeEventTopics.add(t); - } else { - for (int i = 0; i < defaultNumPartitions; i++){ - String t = String.format("persistent://%s/%s-partition-%s", ns, NAMESPACE_EVENTS_LOCAL_NAME, i); - expectChangeEventTopics.add(t); - } - } - Awaitility.await().until(() -> { - boolean finished = true; - for (String changeEventTopicName : expectChangeEventTopics){ - boolean bundleExists = pulsar.getNamespaceService() - .checkTopicOwnership(TopicName.get(changeEventTopicName)) - .exceptionally(ex -> false).join(); - if (!bundleExists){ - finished = false; - break; - } - CompletableFuture> completableFuture = - pulsar.getBrokerService().getTopic(changeEventTopicName, false); - if (completableFuture == null){ - finished = false; - break; - } - Optional optionalTopic = completableFuture.get(); - if (!optionalTopic.isPresent()){ - finished = false; - break; - } - PersistentTopic changeEventTopic = (PersistentTopic) optionalTopic.get(); - if (!changeEventTopic.isCompactionEnabled()){ - break; - } - if (!changeEventTopic.getSubscriptions().containsKey(COMPACTION_SUBSCRIPTION)){ - finished = false; - break; - } - } - return finished; - }); - int retryTimes = 3; - while (true) { + Awaitility.await() + .pollDelay(500, TimeUnit.MILLISECONDS) + .until(() -> { try { + // Maybe fail by 
race-condition with create topics, just retry. admin.namespaces().deleteNamespace(ns, force); - break; - } catch (PulsarAdminException ex) { - // Do retry only if topic fenced. - if (ex.getStatusCode() == 500 && ex.getMessage().contains("TopicFencedException")){ - if (--retryTimes > 0){ - continue; - } else { - throw ex; - } - } - throw ex; + return true; + } catch (PulsarAdminException.NotFoundException ex) { + // namespace was already deleted, ignore exception + return true; + } catch (Exception e) { + log.warn("Failed to delete namespace {} (force={})", ns, force, e); + return false; } - } + }); } private static final Logger LOG = LoggerFactory.getLogger(BrokerTestBase.class); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java index 45f723e91565f..945692a3092f3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java @@ -88,7 +88,7 @@ public class PendingAckPersistentTest extends TransactionTestBase { private static final int NUM_PARTITIONS = 16; - @BeforeMethod + @BeforeMethod(alwaysRun = true) public void setup() throws Exception { setUpBase(1, NUM_PARTITIONS, PENDING_ACK_REPLAY_TOPIC, 0); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index 68775bc090ae5..006dce0c185f2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -633,8 +633,16 @@ public void testWholeBatchCompactedOut() throws Exception { } } - @Test - public void testKeyLessMessagesPassThrough() throws Exception { 
+ @DataProvider(name = "retainNullKey") + public static Object[][] retainNullKey() { + return new Object[][] {{true}, {false}}; + } + + @Test(dataProvider = "retainNullKey") + public void testKeyLessMessagesPassThrough(boolean retainNullKey) throws Exception { + conf.setTopicCompactionRemainNullKey(retainNullKey); + restartBroker(); + String topic = "persistent://my-property/use/my-ns/my-topic1"; // subscribe before sending anything, so that we get all messages @@ -664,29 +672,25 @@ public void testKeyLessMessagesPassThrough() throws Exception { try (Consumer consumer = pulsarClient.newConsumer().topic(topic) .subscriptionName("sub1").readCompacted(true).subscribe()){ - Message message1 = consumer.receive(); - Assert.assertFalse(message1.hasKey()); - Assert.assertEquals(new String(message1.getData()), "my-message-1"); - - Message message2 = consumer.receive(); - Assert.assertFalse(message2.hasKey()); - Assert.assertEquals(new String(message2.getData()), "my-message-2"); - - Message message3 = consumer.receive(); - Assert.assertEquals(message3.getKey(), "key1"); - Assert.assertEquals(new String(message3.getData()), "my-message-4"); - - Message message4 = consumer.receive(); - Assert.assertEquals(message4.getKey(), "key2"); - Assert.assertEquals(new String(message4.getData()), "my-message-6"); - - Message message5 = consumer.receive(); - Assert.assertFalse(message5.hasKey()); - Assert.assertEquals(new String(message5.getData()), "my-message-7"); + List> result = new ArrayList<>(); + while (true) { + Message message = consumer.receive(10, TimeUnit.SECONDS); + if (message == null) { + break; + } + result.add(Pair.of(message.getKey(), message.getData() == null ? 
null : new String(message.getData()))); + } - Message message6 = consumer.receive(); - Assert.assertFalse(message6.hasKey()); - Assert.assertEquals(new String(message6.getData()), "my-message-8"); + List> expectList; + if (retainNullKey) { + expectList = List.of( + Pair.of(null, "my-message-1"), Pair.of(null, "my-message-2"), + Pair.of("key1", "my-message-4"), Pair.of("key2", "my-message-6"), + Pair.of(null, "my-message-7"), Pair.of(null, "my-message-8")); + } else { + expectList = List.of(Pair.of("key1", "my-message-4"), Pair.of("key2", "my-message-6")); + } + Assert.assertEquals(result, expectList); } } @@ -1861,7 +1865,7 @@ public void testDispatcherMaxReadSizeBytes() throws Exception { .topic(topicName).create(); for (int i = 0; i < 10; i+=2) { - producer.newMessage().key(null).value(new byte[4*1024*1024]).send(); + producer.newMessage().key(UUID.randomUUID().toString()).value(new byte[4*1024*1024]).send(); } producer.flush();