Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [ml] Fix cursor metadata compatability issue when switching the config unackedRangesOpenCacheSetEnabled #23759

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ public class ManagedLedgerConfig {
@Getter
@Setter
private String shadowSourceName;
@Getter
private boolean persistIndividualAckAsLongArray;

public boolean isCreateIfMissing() {
return createIfMissing;
Expand All @@ -103,6 +105,11 @@ public ManagedLedgerConfig setCreateIfMissing(boolean createIfMissing) {
return this;
}

public ManagedLedgerConfig setPersistIndividualAckAsLongArray(boolean persistIndividualAckAsLongArray) {
this.persistIndividualAckAsLongArray = persistIndividualAckAsLongArray;
return this;
}

/**
* @return the lazyCursorRecovery
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ public class ManagedCursorImpl implements ManagedCursor {
protected volatile long messagesConsumedCounter;

// Current ledger used to append the mark-delete position
private volatile LedgerHandle cursorLedger;
@VisibleForTesting
volatile LedgerHandle cursorLedger;

// Wether the current cursorLedger is read-only or writable
private boolean isCursorLedgerReadOnly = true;
Expand Down Expand Up @@ -639,7 +640,22 @@ public void recoverIndividualDeletedMessages(PositionInfo positionInfo) {
try {
Map<Long, long[]> rangeMap = rangeList.stream().collect(Collectors.toMap(LongListMap::getKey,
list -> list.getValuesList().stream().mapToLong(i -> i).toArray()));
individualDeletedMessages.build(rangeMap);
// Guarantee compatability for the config "unackedRangesOpenCacheSetEnabled".
if (getConfig().isUnackedRangesOpenCacheSetEnabled()) {
individualDeletedMessages.build(rangeMap);
} else {
RangeSetWrapper<Position> rangeSetWrapperV2 = new RangeSetWrapper<>(positionRangeConverter,
positionRangeReverseConverter, true,
getConfig().isPersistentUnackedRangesWithMultipleEntriesEnabled());
rangeSetWrapperV2.build(rangeMap);
rangeSetWrapperV2.forEach(range -> {
individualDeletedMessages.addOpenClosed(range.lowerEndpoint().getLedgerId(),
range.lowerEndpoint().getEntryId(), range.upperEndpoint().getLedgerId(),
range.upperEndpoint().getEntryId());
return true;
});
rangeSetWrapperV2.clear();
}
} catch (Exception e) {
log.warn("[{}]-{} Failed to recover individualDeletedMessages from serialized data", ledger.getName(),
name, e);
Expand Down Expand Up @@ -2367,7 +2383,14 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
}
// Add a range (prev, pos] to the set. Adding the previous entry as an open limit to the range will
// make the RangeSet recognize the "continuity" between adjacent Positions.
Position previousPosition = ledger.getPreviousPosition(position);
// Before https://github.com/apache/pulsar/pull/21105 is merged, the range does not support crossing
// multi ledgers, so the first position's entryId maybe "-1".
Position previousPosition;
if (position.getEntryId() == 0) {
previousPosition = PositionFactory.create(position.getLedgerId(), -1);
} else {
previousPosition = ledger.getPreviousPosition(position);
}
Comment on lines +2389 to +2393
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to move to the internal of the getPreviousPosition() method.

Copy link
Contributor Author

@poorbarcode poorbarcode Dec 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to move to the internal of the getPreviousPosition() method.

getPreviousPosition returns a meaningful position, in other words, it returns a position that exactly exists. And other methods rely on this feature, such as https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L3621-L3623. Only cursor individual ack range does not allow crossing ledgers so far.

if (specifiedLac.getEntryId() < 0) {
     // Calculate the meaningful LAC.
     Position actLac = getPreviousPosition(specifiedLac);
    ...
}

individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(),
previousPosition.getEntryId(), position.getLedgerId(), position.getEntryId());
MSG_CONSUMED_COUNTER_UPDATER.incrementAndGet(this);
Expand Down Expand Up @@ -3222,10 +3245,21 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin
.addAllProperties(buildPropertiesMap(mdEntry.properties));

Map<Long, long[]> internalRanges = null;
try {
internalRanges = individualDeletedMessages.toRanges(getConfig().getMaxUnackedRangesToPersist());
} catch (Exception e) {
log.warn("[{}]-{} Failed to serialize individualDeletedMessages", ledger.getName(), name, e);
/**
* Cursor will create the {@link #individualDeletedMessages} typed {@link LongPairRangeSet.DefaultRangeSet} if
* disabled the config {@link ManagedLedgerConfig#unackedRangesOpenCacheSetEnabled}.
* {@link LongPairRangeSet.DefaultRangeSet} never implemented the methods below:
* - {@link LongPairRangeSet#toRanges(int)}, which is used to serialize cursor metadata.
* - {@link LongPairRangeSet#build(Map)}, which is used to deserialize cursor metadata.
* Do not enable the feature that https://github.com/apache/pulsar/pull/9292 introduced, to avoid serialization
* and deserialization error.
*/
if (getConfig().isUnackedRangesOpenCacheSetEnabled() && getConfig().isPersistIndividualAckAsLongArray()) {
try {
internalRanges = individualDeletedMessages.toRanges(getConfig().getMaxUnackedRangesToPersist());
} catch (Exception e) {
log.warn("[{}]-{} Failed to serialize individualDeletedMessages", ledger.getName(), name, e);
}
}
if (internalRanges != null && !internalRanges.isEmpty()) {
piBuilder.addAllIndividualDeletedMessageRanges(buildLongPropertiesMap(internalRanges));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@
*/
package org.apache.bookkeeper.mledger.impl;

import static java.util.Objects.requireNonNull;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Range;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.apache.pulsar.common.util.collections.OpenLongPairRangeSet;
import org.roaringbitmap.RoaringBitSet;
Expand All @@ -40,7 +38,6 @@ public class RangeSetWrapper<T extends Comparable<T>> implements LongPairRangeSe

private final LongPairRangeSet<T> rangeSet;
private final LongPairConsumer<T> rangeConverter;
private final ManagedLedgerConfig config;
private final boolean enableMultiEntry;

/**
Expand All @@ -53,13 +50,19 @@ public class RangeSetWrapper<T extends Comparable<T>> implements LongPairRangeSe
public RangeSetWrapper(LongPairConsumer<T> rangeConverter,
RangeBoundConsumer<T> rangeBoundConsumer,
ManagedCursorImpl managedCursor) {
requireNonNull(managedCursor);
this.config = managedCursor.getManagedLedger().getConfig();
this(rangeConverter, rangeBoundConsumer, managedCursor.getConfig().isUnackedRangesOpenCacheSetEnabled(),
managedCursor.getConfig().isPersistentUnackedRangesWithMultipleEntriesEnabled());
}

public RangeSetWrapper(LongPairConsumer<T> rangeConverter,
RangeBoundConsumer<T> rangeBoundConsumer,
boolean unackedRangesOpenCacheSetEnabled,
boolean persistentUnackedRangesWithMultipleEntriesEnabled) {
this.rangeConverter = rangeConverter;
this.rangeSet = config.isUnackedRangesOpenCacheSetEnabled()
this.rangeSet = unackedRangesOpenCacheSetEnabled
? new OpenLongPairRangeSet<>(rangeConverter, RoaringBitSet::new)
: new LongPairRangeSet.DefaultRangeSet<>(rangeConverter, rangeBoundConsumer);
this.enableMultiEntry = config.isPersistentUnackedRangesWithMultipleEntriesEnabled();
this.enableMultiEntry = persistentUnackedRangesWithMultipleEntriesEnabled;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,27 @@

import static org.apache.pulsar.common.util.PortManager.releaseLockedPort;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperTestClient;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
Expand All @@ -49,18 +54,22 @@
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.util.ThrowableToStringUtil;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.awaitility.Awaitility;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import io.netty.buffer.ByteBuf;
import lombok.Cleanup;

@Slf4j
public class ManagedLedgerBkTest extends BookKeeperClusterTestCase {

private final ObjectMapper jackson = new ObjectMapper();

public ManagedLedgerBkTest() {
super(2);
}
Expand Down Expand Up @@ -590,44 +599,114 @@ public void testPeriodicRollover() throws Exception {
Awaitility.await().until(() -> cursorImpl.getCursorLedger() != currentLedgerId);
}

@DataProvider(name = "unackedRangesOpenCacheSetEnabledPair")
public Object[][] unackedRangesOpenCacheSetEnabledPair() {
return new Object[][]{
{false, true},
{true, false},
{true, true},
{false, false}
};
}

/**
* This test validates that cursor serializes and deserializes individual-ack list from the bk-ledger.
*
* @throws Exception
*/
@Test
public void testUnackmessagesAndRecovery() throws Exception {
@Test(dataProvider = "unackedRangesOpenCacheSetEnabledPair")
public void testUnackmessagesAndRecoveryCompatibility(boolean enabled1, boolean enabled2) throws Exception {
final String mlName = "ml" + UUID.randomUUID().toString().replaceAll("-", "");
final String cursorName = "c1";
ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
factoryConf.setMaxCacheSize(0);

ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);

ManagedLedgerConfig config = new ManagedLedgerConfig().setEnsembleSize(1).setWriteQuorumSize(1)
final ManagedLedgerConfig config1 = new ManagedLedgerConfig().setEnsembleSize(1).setWriteQuorumSize(1)
.setAckQuorumSize(1).setMetadataEnsembleSize(1).setMetadataWriteQuorumSize(1)
.setMaxUnackedRangesToPersistInMetadataStore(1).setMaxEntriesPerLedger(5).setMetadataAckQuorumSize(1)
.setUnackedRangesOpenCacheSetEnabled(enabled1);
final ManagedLedgerConfig config2 = new ManagedLedgerConfig().setEnsembleSize(1).setWriteQuorumSize(1)
.setAckQuorumSize(1).setMetadataEnsembleSize(1).setMetadataWriteQuorumSize(1)
.setMaxUnackedRangesToPersistInMetadataStore(1).setMaxEntriesPerLedger(5).setMetadataAckQuorumSize(1);
ManagedLedger ledger = factory.open("my_test_unack_messages", config);
ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1");
.setMaxUnackedRangesToPersistInMetadataStore(1).setMaxEntriesPerLedger(5).setMetadataAckQuorumSize(1)
.setUnackedRangesOpenCacheSetEnabled(enabled2);

ManagedLedger ledger1 = factory.open(mlName, config1);
ManagedCursorImpl cursor1 = (ManagedCursorImpl) ledger1.openCursor(cursorName);

int totalEntries = 100;
for (int i = 0; i < totalEntries; i++) {
Position p = ledger.addEntry("entry".getBytes());
Position p = ledger1.addEntry("entry".getBytes());
if (i % 2 == 0) {
cursor.delete(p);
cursor1.delete(p);
}
}
log.info("ack ranges: {}", cursor1.getIndividuallyDeletedMessagesSet().size());

LongPairRangeSet<Position> unackMessagesBefore = cursor.getIndividuallyDeletedMessagesSet();
// reopen and recover cursor
ledger1.close();
ManagedLedger ledger2 = factory.open(mlName, config2);
ManagedCursorImpl cursor2 = (ManagedCursorImpl) ledger2.openCursor(cursorName);

ledger.close();
log.info("before: {}", cursor1.getIndividuallyDeletedMessagesSet().asRanges());
log.info("after : {}", cursor2.getIndividuallyDeletedMessagesSet().asRanges());
assertEquals(cursor1.getIndividuallyDeletedMessagesSet().asRanges(), cursor2.getIndividuallyDeletedMessagesSet().asRanges());
assertEquals(cursor1.markDeletePosition, cursor2.markDeletePosition);

ledger2.close();
factory.shutdown();
}

// open and recover cursor
ledger = factory.open("my_test_unack_messages", config);
cursor = (ManagedCursorImpl) ledger.openCursor("c1");
@DataProvider(name = "booleans")
public Object[][] booleans() {
return new Object[][] {
{true},
{false},
};
}

LongPairRangeSet<Position> unackMessagesAfter = cursor.getIndividuallyDeletedMessagesSet();
assertTrue(unackMessagesBefore.equals(unackMessagesAfter));
@Test(dataProvider = "booleans")
public void testConfigPersistIndividualAckAsLongArray(boolean enable) throws Exception {
final String mlName = "ml" + UUID.randomUUID().toString().replaceAll("-", "");
final String cursorName = "c1";
ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
final ManagedLedgerConfig config = new ManagedLedgerConfig()
.setEnsembleSize(1).setWriteQuorumSize(1).setAckQuorumSize(1)
.setMetadataEnsembleSize(1).setMetadataWriteQuorumSize(1).setMetadataAckQuorumSize(1)
.setMaxUnackedRangesToPersistInMetadataStore(1)
.setUnackedRangesOpenCacheSetEnabled(true).setPersistIndividualAckAsLongArray(enable);

ledger.close();
ManagedLedger ledger1 = factory.open(mlName, config);
ManagedCursorImpl cursor1 = (ManagedCursorImpl) ledger1.openCursor(cursorName);

// Write entries.
int totalEntries = 100;
List<Position> entries = new ArrayList<>();
for (int i = 0; i < totalEntries; i++) {
Position p = ledger1.addEntry("entry".getBytes());
entries.add(p);
}
// Make ack holes and trigger a mark deletion.
for (int i = totalEntries - 1; i >=0 ; i--) {
if (i % 2 == 0) {
cursor1.delete(entries.get(i));
}
}
cursor1.markDelete(entries.get(9));
Awaitility.await().untilAsserted(() -> {
assertEquals(cursor1.pendingMarkDeleteOps.size(), 0);
});

// Verify: the config affects.
long cursorLedgerLac = cursor1.cursorLedger.getLastAddConfirmed();
LedgerEntry ledgerEntry = cursor1.cursorLedger.readEntries(cursorLedgerLac, cursorLedgerLac).nextElement();
MLDataFormats.PositionInfo positionInfo = MLDataFormats.PositionInfo.parseFrom(ledgerEntry.getEntry());
if (enable) {
assertNotEquals(positionInfo.getIndividualDeletedMessageRangesList().size(), 0);
} else {
assertEquals(positionInfo.getIndividualDeletedMessageRangesList().size(), 0);
}

// cleanup
ledger1.close();
factory.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2243,6 +2243,10 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
+ " will only be tracked in memory and messages will be redelivered in case of"
+ " crashes.")
private int managedLedgerMaxUnackedRangesToPersist = 10000;
@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "Whether persist cursor ack stats as long arrays, which will compress the data and reduce GC rate")
private boolean managedLedgerPersistIndividualAckAsLongArray = false;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does change the existing Pulsar 4.0 behavior when this defaults to false?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, will rollback the default behavior to 3.0.x, in other words, removed the default behavior changes that introduced by #9292

@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "If enabled, the maximum \"acknowledgment holes\" will not be limited and \"acknowledgment holes\" "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2000,6 +2000,8 @@ private CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@Nonnull T

managedLedgerConfig
.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
managedLedgerConfig
.setPersistIndividualAckAsLongArray(serviceConfig.isManagedLedgerPersistIndividualAckAsLongArray());
managedLedgerConfig.setPersistentUnackedRangesWithMultipleEntriesEnabled(
serviceConfig.isPersistentUnackedRangesWithMultipleEntriesEnabled());
managedLedgerConfig.setMaxUnackedRangesToPersistInMetadataStore(
Expand Down
Loading
Loading