From a368ae927741930499db478cf506860ad452b922 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 19 Dec 2024 18:14:21 +0800 Subject: [PATCH 1/3] [fix] [ml] Fix cursor metadata compatability issue when switching the config unackedRangesOpenCacheSetEnabled --- .../mledger/impl/ManagedCursorImpl.java | 45 +++++++++++-- .../mledger/impl/RangeSetWrapper.java | 15 +++-- .../mledger/impl/ManagedLedgerBkTest.java | 65 ++++++++++++------- 3 files changed, 92 insertions(+), 33 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 022cecf8d57b5..9b609bcb83863 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -639,7 +639,22 @@ public void recoverIndividualDeletedMessages(PositionInfo positionInfo) { try { Map rangeMap = rangeList.stream().collect(Collectors.toMap(LongListMap::getKey, list -> list.getValuesList().stream().mapToLong(i -> i).toArray())); - individualDeletedMessages.build(rangeMap); + // Guarantee compatability for the config "unackedRangesOpenCacheSetEnabled". + if (getConfig().isUnackedRangesOpenCacheSetEnabled()) { + individualDeletedMessages.build(rangeMap); + } else { + RangeSetWrapper rangeSetWrapperV2 = new RangeSetWrapper<>(positionRangeConverter, + positionRangeReverseConverter, true, + getConfig().isPersistentUnackedRangesWithMultipleEntriesEnabled()); + rangeSetWrapperV2.build(rangeMap); + rangeSetWrapperV2.forEach(range -> { + individualDeletedMessages.addOpenClosed(range.lowerEndpoint().getLedgerId(), + range.lowerEndpoint().getEntryId(), range.upperEndpoint().getLedgerId(), + range.upperEndpoint().getEntryId()); + return true; + }); + rangeSetWrapperV2.clear(); + } } catch (Exception e) { log.warn("[{}]-{} Failed to recover individualDeletedMessages from serialized data", ledger.getName(), name, e); @@ -2367,7 +2382,14 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb } // Add a range (prev, pos] to the set. Adding the previous entry as an open limit to the range will // make the RangeSet recognize the "continuity" between adjacent Positions. - Position previousPosition = ledger.getPreviousPosition(position); + // Before https://github.com/apache/pulsar/pull/21105 is merged, the range does not support crossing + // multi ledgers, so the first position's entryId maybe "-1". + Position previousPosition; + if (position.getEntryId() == 0) { + previousPosition = PositionFactory.create(position.getLedgerId(), -1); + } else { + previousPosition = ledger.getPreviousPosition(position); + } individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(), previousPosition.getEntryId(), position.getLedgerId(), position.getEntryId()); MSG_CONSUMED_COUNTER_UPDATER.incrementAndGet(this); @@ -3222,10 +3244,21 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin .addAllProperties(buildPropertiesMap(mdEntry.properties)); Map internalRanges = null; - try { - internalRanges = individualDeletedMessages.toRanges(getConfig().getMaxUnackedRangesToPersist()); - } catch (Exception e) { - log.warn("[{}]-{} Failed to serialize individualDeletedMessages", ledger.getName(), name, e); + /** + * Cursor will create the {@link #individualDeletedMessages} typed {@link LongPairRangeSet.DefaultRangeSet} if + * disabled the config {@link ManagedLedgerConfig#unackedRangesOpenCacheSetEnabled}. + * {@link LongPairRangeSet.DefaultRangeSet} never implemented the methods below: + * - {@link LongPairRangeSet#toRanges(int)}, which is used to serialize cursor metadata. + * - {@link LongPairRangeSet#build(Map)}, which is used to deserialize cursor metadata. + * Do not enable the feature that https://github.com/apache/pulsar/pull/9292 introduced, to avoid serialization + * and deserialization error. + */ + if (getConfig().isUnackedRangesOpenCacheSetEnabled()) { + try { + internalRanges = individualDeletedMessages.toRanges(getConfig().getMaxUnackedRangesToPersist()); + } catch (Exception e) { + log.warn("[{}]-{} Failed to serialize individualDeletedMessages", ledger.getName(), name, e); + } } if (internalRanges != null && !internalRanges.isEmpty()) { piBuilder.addAllIndividualDeletedMessageRanges(buildLongPropertiesMap(internalRanges)); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java index 11cce409bec54..89772ed2a7736 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java @@ -40,7 +40,6 @@ public class RangeSetWrapper> implements LongPairRangeSe private final LongPairRangeSet rangeSet; private final LongPairConsumer rangeConverter; - private final ManagedLedgerConfig config; private final boolean enableMultiEntry; /** @@ -53,13 +52,19 @@ public class RangeSetWrapper> implements LongPairRangeSe public RangeSetWrapper(LongPairConsumer rangeConverter, RangeBoundConsumer rangeBoundConsumer, ManagedCursorImpl managedCursor) { - requireNonNull(managedCursor); - this.config = managedCursor.getManagedLedger().getConfig(); + this(rangeConverter, rangeBoundConsumer, managedCursor.getConfig().isUnackedRangesOpenCacheSetEnabled(), + managedCursor.getConfig().isPersistentUnackedRangesWithMultipleEntriesEnabled()); + } + + public RangeSetWrapper(LongPairConsumer rangeConverter, + RangeBoundConsumer rangeBoundConsumer, + boolean unackedRangesOpenCacheSetEnabled, + boolean persistentUnackedRangesWithMultipleEntriesEnabled) { this.rangeConverter = rangeConverter; - this.rangeSet = config.isUnackedRangesOpenCacheSetEnabled() + this.rangeSet = unackedRangesOpenCacheSetEnabled ? new OpenLongPairRangeSet<>(rangeConverter, RoaringBitSet::new) : new LongPairRangeSet.DefaultRangeSet<>(rangeConverter, rangeBoundConsumer); - this.enableMultiEntry = config.isPersistentUnackedRangesWithMultipleEntriesEnabled(); + this.enableMultiEntry = persistentUnackedRangesWithMultipleEntriesEnabled; } @Override diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java index 9635376a782d3..710454e1f1ff7 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java @@ -24,9 +24,11 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import com.fasterxml.jackson.databind.ObjectMapper; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Future; @@ -34,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BookKeeperTestClient; import org.apache.bookkeeper.client.api.DigestType; @@ -54,13 +57,17 @@ import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats; import org.apache.pulsar.common.util.collections.LongPairRangeSet; import org.awaitility.Awaitility; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import io.netty.buffer.ByteBuf; import lombok.Cleanup; +@Slf4j public class ManagedLedgerBkTest extends BookKeeperClusterTestCase { + private final ObjectMapper jackson = new ObjectMapper(); + public ManagedLedgerBkTest() { super(2); } @@ -590,44 +597,58 @@ public void testPeriodicRollover() throws Exception { Awaitility.await().until(() -> cursorImpl.getCursorLedger() != currentLedgerId); } + @DataProvider(name = "unackedRangesOpenCacheSetEnabledPair") + public Object[][] unackedRangesOpenCacheSetEnabledPair() { + return new Object[][]{ + {false, true}, + {true, false}, + {true, true}, + {false, false} + }; + } + /** * This test validates that cursor serializes and deserializes individual-ack list from the bk-ledger. - * * @throws Exception */ - @Test - public void testUnackmessagesAndRecovery() throws Exception { + @Test(dataProvider = "unackedRangesOpenCacheSetEnabledPair") + public void testUnackmessagesAndRecoveryCompatibility(boolean enabled1, boolean enabled2) throws Exception { + final String mlName = "ml" + UUID.randomUUID().toString().replaceAll("-", ""); + final String cursorName = "c1"; ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig(); - factoryConf.setMaxCacheSize(0); - ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf); - - ManagedLedgerConfig config = new ManagedLedgerConfig().setEnsembleSize(1).setWriteQuorumSize(1) + final ManagedLedgerConfig config1 = new ManagedLedgerConfig().setEnsembleSize(1).setWriteQuorumSize(1) .setAckQuorumSize(1).setMetadataEnsembleSize(1).setMetadataWriteQuorumSize(1) - .setMaxUnackedRangesToPersistInMetadataStore(1).setMaxEntriesPerLedger(5).setMetadataAckQuorumSize(1); - ManagedLedger ledger = factory.open("my_test_unack_messages", config); - ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1"); + .setMaxUnackedRangesToPersistInMetadataStore(1).setMaxEntriesPerLedger(5).setMetadataAckQuorumSize(1) + .setUnackedRangesOpenCacheSetEnabled(enabled1); + final ManagedLedgerConfig config2 = new ManagedLedgerConfig().setEnsembleSize(1).setWriteQuorumSize(1) + .setAckQuorumSize(1).setMetadataEnsembleSize(1).setMetadataWriteQuorumSize(1) + .setMaxUnackedRangesToPersistInMetadataStore(1).setMaxEntriesPerLedger(5).setMetadataAckQuorumSize(1) + .setUnackedRangesOpenCacheSetEnabled(enabled2); + + ManagedLedger ledger1 = factory.open(mlName, config1); + ManagedCursorImpl cursor1 = (ManagedCursorImpl) ledger1.openCursor(cursorName); int totalEntries = 100; for (int i = 0; i < totalEntries; i++) { - Position p = ledger.addEntry("entry".getBytes()); + Position p = ledger1.addEntry("entry".getBytes()); if (i % 2 == 0) { - cursor.delete(p); + cursor1.delete(p); } } + log.info("ack ranges: {}", cursor1.getIndividuallyDeletedMessagesSet().size()); - LongPairRangeSet unackMessagesBefore = cursor.getIndividuallyDeletedMessagesSet(); + // reopen and recover cursor + ledger1.close(); + ManagedLedger ledger2 = factory.open(mlName, config2); + ManagedCursorImpl cursor2 = (ManagedCursorImpl) ledger2.openCursor(cursorName); - ledger.close(); + log.info("before: {}", cursor1.getIndividuallyDeletedMessagesSet().asRanges()); + log.info("after : {}", cursor2.getIndividuallyDeletedMessagesSet().asRanges()); + assertEquals(cursor1.getIndividuallyDeletedMessagesSet().asRanges(), cursor2.getIndividuallyDeletedMessagesSet().asRanges()); + assertEquals(cursor1.markDeletePosition, cursor2.markDeletePosition); - // open and recover cursor - ledger = factory.open("my_test_unack_messages", config); - cursor = (ManagedCursorImpl) ledger.openCursor("c1"); - - LongPairRangeSet unackMessagesAfter = cursor.getIndividuallyDeletedMessagesSet(); - assertTrue(unackMessagesBefore.equals(unackMessagesAfter)); - - ledger.close(); + ledger2.close(); factory.shutdown(); } } From e5e3873a36d5de0dbd1118e5eccbb61f4eb52895 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 19 Dec 2024 21:35:33 +0800 Subject: [PATCH 2/3] checkstyle --- .../org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java | 2 -- .../org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java | 1 - 2 files changed, 3 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java index 89772ed2a7736..76ac3e1be726c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java @@ -18,14 +18,12 @@ */ package org.apache.bookkeeper.mledger.impl; -import static java.util.Objects.requireNonNull; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Range; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; -import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.pulsar.common.util.collections.LongPairRangeSet; import org.apache.pulsar.common.util.collections.OpenLongPairRangeSet; import org.roaringbitmap.RoaringBitSet; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java index 710454e1f1ff7..037eccf947135 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java @@ -55,7 +55,6 @@ import org.apache.bookkeeper.mledger.util.ThrowableToStringUtil; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats; -import org.apache.pulsar.common.util.collections.LongPairRangeSet; import org.awaitility.Awaitility; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; From f2a24413392a29c53ffecd4c2a8e22ed600a535e Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 25 Dec 2024 12:50:32 +0800 Subject: [PATCH 3/3] add config: persistIndividualAckAsLongArray --- .../mledger/ManagedLedgerConfig.java | 7 ++ .../mledger/impl/ManagedCursorImpl.java | 5 +- .../mledger/impl/ManagedLedgerBkTest.java | 59 ++++++++++++++++ .../pulsar/broker/ServiceConfiguration.java | 4 ++ .../pulsar/broker/service/BrokerService.java | 2 + .../service/ManagedLedgerConfigTest.java | 70 +++++++++++++++++++ 6 files changed, 145 insertions(+), 2 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ManagedLedgerConfigTest.java diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index a1e1deb503e20..6fc39170e851a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -93,6 +93,8 @@ public class ManagedLedgerConfig { @Getter @Setter private String shadowSourceName; + @Getter + private boolean persistIndividualAckAsLongArray; public boolean isCreateIfMissing() { return createIfMissing; @@ -103,6 +105,11 @@ public ManagedLedgerConfig setCreateIfMissing(boolean createIfMissing) { return this; } + public ManagedLedgerConfig setPersistIndividualAckAsLongArray(boolean persistIndividualAckAsLongArray) { + this.persistIndividualAckAsLongArray = persistIndividualAckAsLongArray; + return this; + } + /** * @return the lazyCursorRecovery */ diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 9b609bcb83863..217cbd6bd3dbb 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -178,7 +178,8 @@ public class ManagedCursorImpl implements ManagedCursor { protected volatile long messagesConsumedCounter; // Current ledger used to append the mark-delete position - private volatile LedgerHandle cursorLedger; + @VisibleForTesting + volatile LedgerHandle cursorLedger; // Wether the current cursorLedger is read-only or writable private boolean isCursorLedgerReadOnly = true; @@ -3253,7 +3254,7 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin * Do not enable the feature that https://github.com/apache/pulsar/pull/9292 introduced, to avoid serialization * and deserialization error. */ - if (getConfig().isUnackedRangesOpenCacheSetEnabled()) { + if (getConfig().isUnackedRangesOpenCacheSetEnabled() && getConfig().isPersistIndividualAckAsLongArray()) { try { internalRanges = individualDeletedMessages.toRanges(getConfig().getMaxUnackedRangesToPersist()); } catch (Exception e) { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java index 037eccf947135..e23937afea2c9 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java @@ -20,6 +20,7 @@ import static org.apache.pulsar.common.util.PortManager.releaseLockedPort; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -39,6 +40,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BookKeeperTestClient; +import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.client.api.DigestType; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback; @@ -52,6 +54,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager; +import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.bookkeeper.mledger.util.ThrowableToStringUtil; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats; @@ -650,4 +653,60 @@ public void testUnackmessagesAndRecoveryCompatibility(boolean enabled1, boolean ledger2.close(); factory.shutdown(); } + + @DataProvider(name = "booleans") + public Object[][] booleans() { + return new Object[][] { + {true}, + {false}, + }; + } + + @Test(dataProvider = "booleans") + public void testConfigPersistIndividualAckAsLongArray(boolean enable) throws Exception { + final String mlName = "ml" + UUID.randomUUID().toString().replaceAll("-", ""); + final String cursorName = "c1"; + ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig(); + ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf); + final ManagedLedgerConfig config = new ManagedLedgerConfig() + .setEnsembleSize(1).setWriteQuorumSize(1).setAckQuorumSize(1) + .setMetadataEnsembleSize(1).setMetadataWriteQuorumSize(1).setMetadataAckQuorumSize(1) + .setMaxUnackedRangesToPersistInMetadataStore(1) + .setUnackedRangesOpenCacheSetEnabled(true).setPersistIndividualAckAsLongArray(enable); + + ManagedLedger ledger1 = factory.open(mlName, config); + ManagedCursorImpl cursor1 = (ManagedCursorImpl) ledger1.openCursor(cursorName); + + // Write entries. + int totalEntries = 100; + List entries = new ArrayList<>(); + for (int i = 0; i < totalEntries; i++) { + Position p = ledger1.addEntry("entry".getBytes()); + entries.add(p); + } + // Make ack holes and trigger a mark deletion. + for (int i = totalEntries - 1; i >=0 ; i--) { + if (i % 2 == 0) { + cursor1.delete(entries.get(i)); + } + } + cursor1.markDelete(entries.get(9)); + Awaitility.await().untilAsserted(() -> { + assertEquals(cursor1.pendingMarkDeleteOps.size(), 0); + }); + + // Verify: the config affects. + long cursorLedgerLac = cursor1.cursorLedger.getLastAddConfirmed(); + LedgerEntry ledgerEntry = cursor1.cursorLedger.readEntries(cursorLedgerLac, cursorLedgerLac).nextElement(); + MLDataFormats.PositionInfo positionInfo = MLDataFormats.PositionInfo.parseFrom(ledgerEntry.getEntry()); + if (enable) { + assertNotEquals(positionInfo.getIndividualDeletedMessageRangesList().size(), 0); + } else { + assertEquals(positionInfo.getIndividualDeletedMessageRangesList().size(), 0); + } + + // cleanup + ledger1.close(); + factory.shutdown(); + } } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 0f7ae00713dce..488af679a3a88 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2243,6 +2243,10 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece + " will only be tracked in memory and messages will be redelivered in case of" + " crashes.") private int managedLedgerMaxUnackedRangesToPersist = 10000; + @FieldContext( + category = CATEGORY_STORAGE_ML, + doc = "Whether persist cursor ack stats as long arrays, which will compress the data and reduce GC rate") + private boolean managedLedgerPersistIndividualAckAsLongArray = false; @FieldContext( category = CATEGORY_STORAGE_ML, doc = "If enabled, the maximum \"acknowledgment holes\" will not be limited and \"acknowledgment holes\" " diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 6afa1ae32fbcb..49f64e368f18e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2000,6 +2000,8 @@ private CompletableFuture getManagedLedgerConfig(@Nonnull T managedLedgerConfig .setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist()); + managedLedgerConfig + .setPersistIndividualAckAsLongArray(serviceConfig.isManagedLedgerPersistIndividualAckAsLongArray()); managedLedgerConfig.setPersistentUnackedRangesWithMultipleEntriesEnabled( serviceConfig.isPersistentUnackedRangesWithMultipleEntriesEnabled()); managedLedgerConfig.setMaxUnackedRangesToPersistInMetadataStore( diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ManagedLedgerConfigTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ManagedLedgerConfigTest.java new file mode 100644 index 0000000000000..f9ce0d5019495 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ManagedLedgerConfigTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static org.testng.Assert.assertEquals; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class ManagedLedgerConfigTest extends ProducerConsumerBase { + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @DataProvider(name = "booleans") + public Object[][] booleans() { + return new Object[][] { + {true}, + {false}, + }; + } + + @Test(dataProvider = "booleans") + public void testConfigPersistIndividualAckAsLongArray(boolean enabled) throws Exception { + pulsar.getConfiguration().setManagedLedgerPersistIndividualAckAsLongArray(enabled); + final String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + admin.topics().createNonPartitionedTopic(tpName); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(tpName, true).get().get(); + ManagedLedgerConfig mlConf = topic.getManagedLedger().getConfig(); + assertEquals(mlConf.isPersistIndividualAckAsLongArray(), enabled); + + // cleanup. + admin.topics().delete(tpName); + } +} +