Skip to content

Commit

Permalink
[fix] [ml] Fix cursor metadata compatability issue when switching the…
Browse files Browse the repository at this point in the history
… config unackedRangesOpenCacheSetEnabled
  • Loading branch information
poorbarcode committed Dec 19, 2024
1 parent 9a7269a commit a368ae9
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,22 @@ public void recoverIndividualDeletedMessages(PositionInfo positionInfo) {
try {
Map<Long, long[]> rangeMap = rangeList.stream().collect(Collectors.toMap(LongListMap::getKey,
list -> list.getValuesList().stream().mapToLong(i -> i).toArray()));
individualDeletedMessages.build(rangeMap);
// Guarantee compatability for the config "unackedRangesOpenCacheSetEnabled".
if (getConfig().isUnackedRangesOpenCacheSetEnabled()) {
individualDeletedMessages.build(rangeMap);
} else {
RangeSetWrapper<Position> rangeSetWrapperV2 = new RangeSetWrapper<>(positionRangeConverter,
positionRangeReverseConverter, true,
getConfig().isPersistentUnackedRangesWithMultipleEntriesEnabled());
rangeSetWrapperV2.build(rangeMap);
rangeSetWrapperV2.forEach(range -> {
individualDeletedMessages.addOpenClosed(range.lowerEndpoint().getLedgerId(),
range.lowerEndpoint().getEntryId(), range.upperEndpoint().getLedgerId(),
range.upperEndpoint().getEntryId());
return true;
});
rangeSetWrapperV2.clear();
}
} catch (Exception e) {
log.warn("[{}]-{} Failed to recover individualDeletedMessages from serialized data", ledger.getName(),
name, e);
Expand Down Expand Up @@ -2367,7 +2382,14 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
}
// Add a range (prev, pos] to the set. Adding the previous entry as an open limit to the range will
// make the RangeSet recognize the "continuity" between adjacent Positions.
Position previousPosition = ledger.getPreviousPosition(position);
// Before https://github.com/apache/pulsar/pull/21105 is merged, the range does not support crossing
// multi ledgers, so the first position's entryId maybe "-1".
Position previousPosition;
if (position.getEntryId() == 0) {
previousPosition = PositionFactory.create(position.getLedgerId(), -1);
} else {
previousPosition = ledger.getPreviousPosition(position);
}
individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(),
previousPosition.getEntryId(), position.getLedgerId(), position.getEntryId());
MSG_CONSUMED_COUNTER_UPDATER.incrementAndGet(this);
Expand Down Expand Up @@ -3222,10 +3244,21 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin
.addAllProperties(buildPropertiesMap(mdEntry.properties));

Map<Long, long[]> internalRanges = null;
try {
internalRanges = individualDeletedMessages.toRanges(getConfig().getMaxUnackedRangesToPersist());
} catch (Exception e) {
log.warn("[{}]-{} Failed to serialize individualDeletedMessages", ledger.getName(), name, e);
/**
* Cursor will create the {@link #individualDeletedMessages} typed {@link LongPairRangeSet.DefaultRangeSet} if
* disabled the config {@link ManagedLedgerConfig#unackedRangesOpenCacheSetEnabled}.
* {@link LongPairRangeSet.DefaultRangeSet} never implemented the methods below:
* - {@link LongPairRangeSet#toRanges(int)}, which is used to serialize cursor metadata.
* - {@link LongPairRangeSet#build(Map)}, which is used to deserialize cursor metadata.
* Do not enable the feature that https://github.com/apache/pulsar/pull/9292 introduced, to avoid serialization
* and deserialization error.
*/
if (getConfig().isUnackedRangesOpenCacheSetEnabled()) {
try {
internalRanges = individualDeletedMessages.toRanges(getConfig().getMaxUnackedRangesToPersist());
} catch (Exception e) {
log.warn("[{}]-{} Failed to serialize individualDeletedMessages", ledger.getName(), name, e);
}
}
if (internalRanges != null && !internalRanges.isEmpty()) {
piBuilder.addAllIndividualDeletedMessageRanges(buildLongPropertiesMap(internalRanges));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public class RangeSetWrapper<T extends Comparable<T>> implements LongPairRangeSe

private final LongPairRangeSet<T> rangeSet;
private final LongPairConsumer<T> rangeConverter;
private final ManagedLedgerConfig config;
private final boolean enableMultiEntry;

/**
Expand All @@ -53,13 +52,19 @@ public class RangeSetWrapper<T extends Comparable<T>> implements LongPairRangeSe
public RangeSetWrapper(LongPairConsumer<T> rangeConverter,
RangeBoundConsumer<T> rangeBoundConsumer,
ManagedCursorImpl managedCursor) {
requireNonNull(managedCursor);
this.config = managedCursor.getManagedLedger().getConfig();
this(rangeConverter, rangeBoundConsumer, managedCursor.getConfig().isUnackedRangesOpenCacheSetEnabled(),
managedCursor.getConfig().isPersistentUnackedRangesWithMultipleEntriesEnabled());
}

public RangeSetWrapper(LongPairConsumer<T> rangeConverter,
RangeBoundConsumer<T> rangeBoundConsumer,
boolean unackedRangesOpenCacheSetEnabled,
boolean persistentUnackedRangesWithMultipleEntriesEnabled) {
this.rangeConverter = rangeConverter;
this.rangeSet = config.isUnackedRangesOpenCacheSetEnabled()
this.rangeSet = unackedRangesOpenCacheSetEnabled
? new OpenLongPairRangeSet<>(rangeConverter, RoaringBitSet::new)
: new LongPairRangeSet.DefaultRangeSet<>(rangeConverter, rangeBoundConsumer);
this.enableMultiEntry = config.isPersistentUnackedRangesWithMultipleEntriesEnabled();
this.enableMultiEntry = persistentUnackedRangesWithMultipleEntriesEnabled;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,19 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperTestClient;
import org.apache.bookkeeper.client.api.DigestType;
Expand All @@ -54,13 +57,17 @@
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.awaitility.Awaitility;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import io.netty.buffer.ByteBuf;
import lombok.Cleanup;

@Slf4j
public class ManagedLedgerBkTest extends BookKeeperClusterTestCase {

private final ObjectMapper jackson = new ObjectMapper();

public ManagedLedgerBkTest() {
super(2);
}
Expand Down Expand Up @@ -590,44 +597,58 @@ public void testPeriodicRollover() throws Exception {
Awaitility.await().until(() -> cursorImpl.getCursorLedger() != currentLedgerId);
}

@DataProvider(name = "unackedRangesOpenCacheSetEnabledPair")
public Object[][] unackedRangesOpenCacheSetEnabledPair() {
return new Object[][]{
{false, true},
{true, false},
{true, true},
{false, false}
};
}

/**
* This test validates that cursor serializes and deserializes individual-ack list from the bk-ledger.
*
* @throws Exception
*/
@Test
public void testUnackmessagesAndRecovery() throws Exception {
@Test(dataProvider = "unackedRangesOpenCacheSetEnabledPair")
public void testUnackmessagesAndRecoveryCompatibility(boolean enabled1, boolean enabled2) throws Exception {
final String mlName = "ml" + UUID.randomUUID().toString().replaceAll("-", "");
final String cursorName = "c1";
ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
factoryConf.setMaxCacheSize(0);

ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);

ManagedLedgerConfig config = new ManagedLedgerConfig().setEnsembleSize(1).setWriteQuorumSize(1)
final ManagedLedgerConfig config1 = new ManagedLedgerConfig().setEnsembleSize(1).setWriteQuorumSize(1)
.setAckQuorumSize(1).setMetadataEnsembleSize(1).setMetadataWriteQuorumSize(1)
.setMaxUnackedRangesToPersistInMetadataStore(1).setMaxEntriesPerLedger(5).setMetadataAckQuorumSize(1);
ManagedLedger ledger = factory.open("my_test_unack_messages", config);
ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1");
.setMaxUnackedRangesToPersistInMetadataStore(1).setMaxEntriesPerLedger(5).setMetadataAckQuorumSize(1)
.setUnackedRangesOpenCacheSetEnabled(enabled1);
final ManagedLedgerConfig config2 = new ManagedLedgerConfig().setEnsembleSize(1).setWriteQuorumSize(1)
.setAckQuorumSize(1).setMetadataEnsembleSize(1).setMetadataWriteQuorumSize(1)
.setMaxUnackedRangesToPersistInMetadataStore(1).setMaxEntriesPerLedger(5).setMetadataAckQuorumSize(1)
.setUnackedRangesOpenCacheSetEnabled(enabled2);

ManagedLedger ledger1 = factory.open(mlName, config1);
ManagedCursorImpl cursor1 = (ManagedCursorImpl) ledger1.openCursor(cursorName);

int totalEntries = 100;
for (int i = 0; i < totalEntries; i++) {
Position p = ledger.addEntry("entry".getBytes());
Position p = ledger1.addEntry("entry".getBytes());
if (i % 2 == 0) {
cursor.delete(p);
cursor1.delete(p);
}
}
log.info("ack ranges: {}", cursor1.getIndividuallyDeletedMessagesSet().size());

LongPairRangeSet<Position> unackMessagesBefore = cursor.getIndividuallyDeletedMessagesSet();
// reopen and recover cursor
ledger1.close();
ManagedLedger ledger2 = factory.open(mlName, config2);
ManagedCursorImpl cursor2 = (ManagedCursorImpl) ledger2.openCursor(cursorName);

ledger.close();
log.info("before: {}", cursor1.getIndividuallyDeletedMessagesSet().asRanges());
log.info("after : {}", cursor2.getIndividuallyDeletedMessagesSet().asRanges());
assertEquals(cursor1.getIndividuallyDeletedMessagesSet().asRanges(), cursor2.getIndividuallyDeletedMessagesSet().asRanges());
assertEquals(cursor1.markDeletePosition, cursor2.markDeletePosition);

// open and recover cursor
ledger = factory.open("my_test_unack_messages", config);
cursor = (ManagedCursorImpl) ledger.openCursor("c1");

LongPairRangeSet<Position> unackMessagesAfter = cursor.getIndividuallyDeletedMessagesSet();
assertTrue(unackMessagesBefore.equals(unackMessagesAfter));

ledger.close();
ledger2.close();
factory.shutdown();
}
}

0 comments on commit a368ae9

Please sign in to comment.