Skip to content

Commit

Permalink
[improve][ml] Filter out deleted entries before read entries from led…
Browse files Browse the repository at this point in the history
…ger. (apache#21739)

(cherry picked from commit c66167b)
  • Loading branch information
dao-jun authored and codelipenghui committed Feb 8, 2024
1 parent 5bb2264 commit 84ed73e
Show file tree
Hide file tree
Showing 4 changed files with 227 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,8 @@ public void asyncReadEntriesWithSkip(int numberOfEntriesToRead, long maxSizeByte
int numOfEntriesToRead = applyMaxSizeCap(numberOfEntriesToRead, maxSizeBytes);

PENDING_READ_OPS_UPDATER.incrementAndGet(this);
// Skip deleted entries.
skipCondition = skipCondition == null ? this::isMessageDeleted : skipCondition.or(this::isMessageDeleted);
OpReadEntry op =
OpReadEntry.create(this, readPosition, numOfEntriesToRead, callback, ctx, maxPosition, skipCondition);
ledger.asyncReadEntries(op);
Expand Down Expand Up @@ -940,6 +942,8 @@ public void asyncReadEntriesWithSkipOrWait(int maxEntries, long maxSizeBytes, Re
asyncReadEntriesWithSkip(numberOfEntriesToRead, NO_MAX_SIZE_LIMIT, callback, ctx,
maxPosition, skipCondition);
} else {
// Skip deleted entries.
skipCondition = skipCondition == null ? this::isMessageDeleted : skipCondition.or(this::isMessageDeleted);
OpReadEntry op = OpReadEntry.create(this, readPosition, numberOfEntriesToRead, callback,
ctx, maxPosition, skipCondition);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4484,4 +4484,4 @@ public Position getTheSlowestNonDurationReadPosition() {
}
return theSlowestNonDurableReadPosition;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.ReadOnlyCursor;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
Expand Down Expand Up @@ -70,4 +71,9 @@ public MLDataFormats.ManagedLedgerInfo.LedgerInfo getCurrentLedgerInfo() {
public long getNumberOfEntries(Range<PositionImpl> range) {
return this.ledger.getNumberOfEntries(range);
}

@Override
public boolean isMessageDeleted(Position position) {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -65,13 +66,15 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Cleanup;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
Expand Down Expand Up @@ -766,7 +769,7 @@ void testResetCursor() throws Exception {
@Test(timeOut = 20000)
void testResetCursor1() throws Exception {
ManagedLedger ledger = factory.open("my_test_move_cursor_ledger",
new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
ManagedCursor cursor = ledger.openCursor("trc1");
PositionImpl actualEarliest = (PositionImpl) ledger.addEntry("dummy-entry-1".getBytes(Encoding));
ledger.addEntry("dummy-entry-2".getBytes(Encoding));
Expand Down Expand Up @@ -2286,7 +2289,7 @@ void testFindNewestMatchingEdgeCase1() throws Exception {

ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
assertNull(c1.findNewestMatching(
entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding))));
entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding))));
}

@Test(timeOut = 20000)
Expand Down Expand Up @@ -2595,7 +2598,7 @@ public void findEntryComplete(Position position, Object ctx) {

@Override
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition,
Object ctx) {
Object ctx) {
result.exception = exception;
counter.countDown();
}
Expand All @@ -2621,7 +2624,7 @@ public void findEntryFailed(ManagedLedgerException exception, Optional<Position>
}

void internalTestFindNewestMatchingAllEntries(final String name, final int entriesPerLedger,
final int expectedEntryId) throws Exception {
final int expectedEntryId) throws Exception {
final String ledgerAndCursorName = name;
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setRetentionSizeInMB(10);
Expand Down Expand Up @@ -2715,7 +2718,7 @@ void testReplayEntries() throws Exception {
assertTrue((Arrays.equals(entries.get(0).getData(), "entry1".getBytes(Encoding))
&& Arrays.equals(entries.get(1).getData(), "entry3".getBytes(Encoding)))
|| (Arrays.equals(entries.get(0).getData(), "entry3".getBytes(Encoding))
&& Arrays.equals(entries.get(1).getData(), "entry1".getBytes(Encoding))));
&& Arrays.equals(entries.get(1).getData(), "entry1".getBytes(Encoding))));
entries.forEach(Entry::release);

// 3. Fail on reading non-existing position
Expand Down Expand Up @@ -3142,7 +3145,7 @@ public void operationFailed(ManagedLedgerException exception) {

try {
bkc.openLedgerNoRecovery(ledgerId, DigestType.fromApiDigestType(mlConfig.getDigestType()),
mlConfig.getPassword());
mlConfig.getPassword());
fail("ledger should have deleted due to update-cursor failure");
} catch (BKException e) {
// ok
Expand Down Expand Up @@ -3761,17 +3764,17 @@ private void deleteBatchIndex(ManagedCursor cursor, Position position, int batch
pos.ackSet = bitSet.toLongArray();

cursor.asyncDelete(pos,
new DeleteCallback() {
@Override
public void deleteComplete(Object ctx) {
latch.countDown();
}
new DeleteCallback() {
@Override
public void deleteComplete(Object ctx) {
latch.countDown();
}

@Override
public void deleteFailed(ManagedLedgerException exception, Object ctx) {
latch.countDown();
}
}, null);
@Override
public void deleteFailed(ManagedLedgerException exception, Object ctx) {
latch.countDown();
}
}, null);
latch.await();
pos.ackSet = null;
}
Expand Down Expand Up @@ -4484,5 +4487,202 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
ledger.close();
}


@Test
public void testReadEntriesWithSkipDeletedEntries() throws Exception {
@Cleanup
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testReadEntriesWithSkipDeletedEntries");
ledger = Mockito.spy(ledger);
List<Long> actualReadEntryIds = new ArrayList<>();
Mockito.doAnswer(inv -> {
long start = inv.getArgument(1);
long end = inv.getArgument(2);
for (long i = start; i <= end; i++) {
actualReadEntryIds.add(i);
}
return inv.callRealMethod();
})
.when(ledger)
.asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.anyLong(), Mockito.anyLong(), Mockito.any(), Mockito.any());
@Cleanup
ManagedCursor cursor = ledger.openCursor("c");

int entries = 20;
Position maxReadPosition = null;
Map<Integer, Position> map = new HashMap<>();
for (int i = 0; i < entries; i++) {
maxReadPosition = ledger.addEntry(new byte[1024]);
map.put(i, maxReadPosition);
}


Set<Position> deletedPositions = new HashSet<>();
deletedPositions.add(map.get(1));
deletedPositions.add(map.get(4));
deletedPositions.add(map.get(5));
deletedPositions.add(map.get(8));
deletedPositions.add(map.get(9));
deletedPositions.add(map.get(10));
deletedPositions.add(map.get(15));
deletedPositions.add(map.get(17));
deletedPositions.add(map.get(19));
cursor.delete(deletedPositions);

CompletableFuture<Void> f0 = new CompletableFuture<>();
List<Entry> readEntries = new ArrayList<>();
cursor.asyncReadEntries(5, -1L, new ReadEntriesCallback() {
@Override
public void readEntriesComplete(List<Entry> entries, Object ctx) {
readEntries.addAll(entries);
f0.complete(null);
}

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
f0.completeExceptionally(exception);
}
}, null, PositionImpl.get(maxReadPosition.getLedgerId(), maxReadPosition.getEntryId()).getNext());

f0.get();

CompletableFuture<Void> f1 = new CompletableFuture<>();
cursor.asyncReadEntries(5, -1L, new ReadEntriesCallback() {
@Override
public void readEntriesComplete(List<Entry> entries, Object ctx) {
readEntries.addAll(entries);
f1.complete(null);
}

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
f1.completeExceptionally(exception);
}
}, null, PositionImpl.get(maxReadPosition.getLedgerId(), maxReadPosition.getEntryId()).getNext());


f1.get();
CompletableFuture<Void> f2 = new CompletableFuture<>();
cursor.asyncReadEntries(100, -1L, new ReadEntriesCallback() {
@Override
public void readEntriesComplete(List<Entry> entries, Object ctx) {
readEntries.addAll(entries);
f2.complete(null);
}

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
f2.completeExceptionally(exception);
}
}, null, PositionImpl.get(maxReadPosition.getLedgerId(), maxReadPosition.getEntryId()).getNext());

f2.get();

Position cursorReadPosition = cursor.getReadPosition();
Position expectReadPosition = maxReadPosition.getNext();
assertTrue(cursorReadPosition.getLedgerId() == expectReadPosition.getLedgerId()
&& cursorReadPosition.getEntryId() == expectReadPosition.getEntryId());

assertEquals(readEntries.size(), actualReadEntryIds.size());
assertEquals(entries - deletedPositions.size(), actualReadEntryIds.size());
for (Entry entry : readEntries) {
long entryId = entry.getEntryId();
assertTrue(actualReadEntryIds.contains(entryId));
}
}


@Test
public void testReadEntriesWithSkipDeletedEntriesAndWithSkipConditions() throws Exception {
@Cleanup
ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("testReadEntriesWithSkipDeletedEntriesAndWithSkipConditions");
ledger = Mockito.spy(ledger);

List<Long> actualReadEntryIds = new ArrayList<>();
Mockito.doAnswer(inv -> {
long start = inv.getArgument(1);
long end = inv.getArgument(2);
for (long i = start; i <= end; i++) {
actualReadEntryIds.add(i);
}
return inv.callRealMethod();
})
.when(ledger)
.asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.anyLong(), Mockito.anyLong(), Mockito.any(), Mockito.any());
@Cleanup
ManagedCursor cursor = ledger.openCursor("c");

int entries = 20;
Position maxReadPosition0 = null;
Map<Integer, Position> map = new HashMap<>();
for (int i = 0; i < entries; i++) {
maxReadPosition0 = ledger.addEntry(new byte[1024]);
map.put(i, maxReadPosition0);
}

PositionImpl maxReadPosition =
PositionImpl.get(maxReadPosition0.getLedgerId(), maxReadPosition0.getEntryId()).getNext();

Set<Position> deletedPositions = new HashSet<>();
deletedPositions.add(map.get(1));
deletedPositions.add(map.get(3));
deletedPositions.add(map.get(5));
cursor.delete(deletedPositions);

Set<Long> skippedPositions = new HashSet<>();
skippedPositions.add(map.get(6).getEntryId());
skippedPositions.add(map.get(7).getEntryId());
skippedPositions.add(map.get(8).getEntryId());
skippedPositions.add(map.get(11).getEntryId());
skippedPositions.add(map.get(15).getEntryId());
skippedPositions.add(map.get(16).getEntryId());

Predicate<PositionImpl> skipCondition = position -> skippedPositions.contains(position.getEntryId());
List<Entry> readEntries = new ArrayList<>();

CompletableFuture<Void> f0 = new CompletableFuture<>();
cursor.asyncReadEntriesWithSkip(10, -1L, new ReadEntriesCallback() {
@Override
public void readEntriesComplete(List<Entry> entries, Object ctx) {
readEntries.addAll(entries);
f0.complete(null);
}

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
f0.completeExceptionally(exception);
}
}, null, maxReadPosition, skipCondition);

f0.get();
CompletableFuture<Void> f1 = new CompletableFuture<>();
cursor.asyncReadEntriesWithSkip(100, -1L, new ReadEntriesCallback() {
@Override
public void readEntriesComplete(List<Entry> entries, Object ctx) {
readEntries.addAll(entries);
f1.complete(null);
}

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
f1.completeExceptionally(exception);
}
}, null, maxReadPosition, skipCondition);
f1.get();


assertEquals(actualReadEntryIds.size(), readEntries.size());
assertEquals(entries - deletedPositions.size() - skippedPositions.size(), actualReadEntryIds.size());
for (Entry entry : readEntries) {
long entryId = entry.getEntryId();
assertTrue(actualReadEntryIds.contains(entryId));
}

Position cursorReadPosition = cursor.getReadPosition();
Position expectReadPosition = maxReadPosition;
assertTrue(cursorReadPosition.getLedgerId() == expectReadPosition.getLedgerId()
&& cursorReadPosition.getEntryId() == expectReadPosition.getEntryId());
}

private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
}

0 comments on commit 84ed73e

Please sign in to comment.