Skip to content

Commit

Permalink
When accumulating acks, update the batch index in batchDeletedIndexes…
Browse files Browse the repository at this point in the history
… and check whether it is greater than the batch index of the previous ack (#18042)

Co-authored-by: leolinchen <[email protected]>
  • Loading branch information
lordcheng10 and leolinchen authored Oct 21, 2022
1 parent ae7722b commit fa328a4
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1868,7 +1868,27 @@ public void asyncMarkDelete(final Position position, Map<String, Long> propertie

if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
if (newPosition.ackSet != null) {
batchDeletedIndexes.put(newPosition, BitSetRecyclable.create().resetWords(newPosition.ackSet));
AtomicReference<BitSetRecyclable> bitSetRecyclable = new AtomicReference<>();
BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(newPosition.ackSet);
// In order to prevent the batch index recorded in batchDeletedIndexes from rolling back,
// only update batchDeletedIndexes when the submitted batch index is greater
// than the recorded index.
batchDeletedIndexes.compute(newPosition,
(k, v) -> {
if (v == null) {
return givenBitSet;
}
if (givenBitSet.nextSetBit(0) > v.nextSetBit(0)) {
bitSetRecyclable.set(v);
return givenBitSet;
} else {
bitSetRecyclable.set(givenBitSet);
return v;
}
});
if (bitSetRecyclable.get() != null) {
bitSetRecyclable.get().recycle();
}
newPosition = ledger.getPreviousPosition(newPosition);
}
Map<PositionImpl, BitSetRecyclable> subMap = batchDeletedIndexes.subMap(PositionImpl.EARLIEST, newPosition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.common.api.proto.IntRange;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Stat;
Expand Down Expand Up @@ -3337,6 +3338,37 @@ public void deleteMessagesCheckhMarkDelete() throws Exception {
assertEquals(c1.getReadPosition(), positions[markDelete + 1]);
}

@Test
public void testBatchIndexMarkdelete() throws ManagedLedgerException, InterruptedException {
ManagedLedger ledger = factory.open("test_batch_index_delete");
ManagedCursor cursor = ledger.openCursor("c1");

final int totalEntries = 100;
final Position[] positions = new Position[totalEntries];
for (int i = 0; i < totalEntries; i++) {
// add entry
positions[i] = ledger.addEntry(("entry-" + i).getBytes(Encoding));
}
assertEquals(cursor.getNumberOfEntries(), totalEntries);
markDeleteBatchIndex(cursor, positions[0], 10, 3);
List<IntRange> deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[0]), 10);
Assert.assertEquals(1, deletedIndexes.size());
Assert.assertEquals(0, deletedIndexes.get(0).getStart());
Assert.assertEquals(3, deletedIndexes.get(0).getEnd());

markDeleteBatchIndex(cursor, positions[0], 10, 4);
deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[0]), 10);
Assert.assertEquals(1, deletedIndexes.size());
Assert.assertEquals(0, deletedIndexes.get(0).getStart());
Assert.assertEquals(4, deletedIndexes.get(0).getEnd());

markDeleteBatchIndex(cursor, positions[0], 10, 2);
deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[0]), 10);
Assert.assertEquals(1, deletedIndexes.size());
Assert.assertEquals(0, deletedIndexes.get(0).getStart());
Assert.assertEquals(4, deletedIndexes.get(0).getEnd());
}

@Test
public void testBatchIndexDelete() throws ManagedLedgerException, InterruptedException {
ManagedLedger ledger = factory.open("test_batch_index_delete");
Expand Down Expand Up @@ -3468,6 +3500,31 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) {
pos.ackSet = null;
}

private void markDeleteBatchIndex(ManagedCursor cursor, Position position, int batchSize, int batchIndex
) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
PositionImpl pos = (PositionImpl) position;
BitSetRecyclable bitSet = new BitSetRecyclable();
bitSet.set(0, batchSize);
bitSet.clear(0, batchIndex + 1);

pos.ackSet = bitSet.toLongArray();

cursor.asyncMarkDelete(pos, new MarkDeleteCallback() {
@Override
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
latch.countDown();
}

@Override
public void markDeleteComplete(Object ctx) {
latch.countDown();
}
}, null);
latch.await();
pos.ackSet = null;
}

private List<IntRange> getAckedIndexRange(long[] bitSetLongArray, int batchSize) {
if (bitSetLongArray == null) {
return null;
Expand Down

0 comments on commit fa328a4

Please sign in to comment.