diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 060138f491e2f..38b142aca372e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -791,6 +791,8 @@ public void asyncReadEntriesWithSkip(int numberOfEntriesToRead, long maxSizeByte int numOfEntriesToRead = applyMaxSizeCap(numberOfEntriesToRead, maxSizeBytes); PENDING_READ_OPS_UPDATER.incrementAndGet(this); + // Skip deleted entries. + skipCondition = skipCondition == null ? this::isMessageDeleted : skipCondition.or(this::isMessageDeleted); OpReadEntry op = OpReadEntry.create(this, readPosition, numOfEntriesToRead, callback, ctx, maxPosition, skipCondition); ledger.asyncReadEntries(op); @@ -949,6 +951,8 @@ public void asyncReadEntriesWithSkipOrWait(int maxEntries, long maxSizeBytes, Re asyncReadEntriesWithSkip(numberOfEntriesToRead, NO_MAX_SIZE_LIMIT, callback, ctx, maxPosition, skipCondition); } else { + // Skip deleted entries. + skipCondition = skipCondition == null ? this::isMessageDeleted : skipCondition.or(this::isMessageDeleted); OpReadEntry op = OpReadEntry.create(this, readPosition, numberOfEntriesToRead, callback, ctx, maxPosition, skipCondition); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 569776edccf47..c839ee6f77c6f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -4539,4 +4539,4 @@ public Position getTheSlowestNonDurationReadPosition() { } return theSlowestNonDurableReadPosition; } -} +} \ No newline at end of file diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java index 9102339b2904e..1661613f07d7d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java @@ -23,6 +23,7 @@ import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.ReadOnlyCursor; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound; import org.apache.bookkeeper.mledger.proto.MLDataFormats; @@ -70,4 +71,9 @@ public MLDataFormats.ManagedLedgerInfo.LedgerInfo getCurrentLedgerInfo() { public long getNumberOfEntries(Range range) { return this.ledger.getNumberOfEntries(range); } + + @Override + public boolean isMessageDeleted(Position position) { + return false; + } } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 627ae73d928bd..644f53c3a522d 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -43,6 +43,7 @@ import java.util.Arrays; import java.util.BitSet; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -65,6 +66,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; import lombok.Cleanup; @@ -72,6 +74,7 @@ import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.LedgerEntry; +import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback; @@ -766,7 +769,7 @@ void testResetCursor() throws Exception { @Test(timeOut = 20000) void testResetCursor1() throws Exception { ManagedLedger ledger = factory.open("my_test_move_cursor_ledger", - new ManagedLedgerConfig().setMaxEntriesPerLedger(2)); + new ManagedLedgerConfig().setMaxEntriesPerLedger(2)); ManagedCursor cursor = ledger.openCursor("trc1"); PositionImpl actualEarliest = (PositionImpl) ledger.addEntry("dummy-entry-1".getBytes(Encoding)); ledger.addEntry("dummy-entry-2".getBytes(Encoding)); @@ -2286,7 +2289,7 @@ void testFindNewestMatchingEdgeCase1() throws Exception { ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1"); assertNull(c1.findNewestMatching( - entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding)))); + entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding)))); } @Test(timeOut = 20000) @@ -2595,7 +2598,7 @@ public void findEntryComplete(Position position, Object ctx) { @Override public void findEntryFailed(ManagedLedgerException exception, Optional failedReadPosition, - Object ctx) { + Object ctx) { result.exception = exception; counter.countDown(); } @@ -2621,7 +2624,7 @@ public void findEntryFailed(ManagedLedgerException exception, Optional } void internalTestFindNewestMatchingAllEntries(final String name, final int entriesPerLedger, - final int expectedEntryId) throws Exception { + final int expectedEntryId) throws Exception { final String ledgerAndCursorName = name; ManagedLedgerConfig config = new ManagedLedgerConfig(); config.setRetentionSizeInMB(10); @@ -2715,7 +2718,7 @@ void testReplayEntries() throws Exception { assertTrue((Arrays.equals(entries.get(0).getData(), "entry1".getBytes(Encoding)) && Arrays.equals(entries.get(1).getData(), "entry3".getBytes(Encoding))) || (Arrays.equals(entries.get(0).getData(), "entry3".getBytes(Encoding)) - && Arrays.equals(entries.get(1).getData(), "entry1".getBytes(Encoding)))); + && Arrays.equals(entries.get(1).getData(), "entry1".getBytes(Encoding)))); entries.forEach(Entry::release); // 3. Fail on reading non-existing position @@ -3142,7 +3145,7 @@ public void operationFailed(ManagedLedgerException exception) { try { bkc.openLedgerNoRecovery(ledgerId, DigestType.fromApiDigestType(mlConfig.getDigestType()), - mlConfig.getPassword()); + mlConfig.getPassword()); fail("ledger should have deleted due to update-cursor failure"); } catch (BKException e) { // ok @@ -3761,17 +3764,17 @@ private void deleteBatchIndex(ManagedCursor cursor, Position position, int batch pos.ackSet = bitSet.toLongArray(); cursor.asyncDelete(pos, - new DeleteCallback() { - @Override - public void deleteComplete(Object ctx) { - latch.countDown(); - } + new DeleteCallback() { + @Override + public void deleteComplete(Object ctx) { + latch.countDown(); + } - @Override - public void deleteFailed(ManagedLedgerException exception, Object ctx) { - latch.countDown(); - } - }, null); + @Override + public void deleteFailed(ManagedLedgerException exception, Object ctx) { + latch.countDown(); + } + }, null); latch.await(); pos.ackSet = null; } @@ -4484,5 +4487,202 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { ledger.close(); } + + @Test + public void testReadEntriesWithSkipDeletedEntries() throws Exception { + @Cleanup + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testReadEntriesWithSkipDeletedEntries"); + ledger = Mockito.spy(ledger); + List actualReadEntryIds = new ArrayList<>(); + Mockito.doAnswer(inv -> { + long start = inv.getArgument(1); + long end = inv.getArgument(2); + for (long i = start; i <= end; i++) { + actualReadEntryIds.add(i); + } + return inv.callRealMethod(); + }) + .when(ledger) + .asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.anyLong(), Mockito.anyLong(), Mockito.any(), Mockito.any()); + @Cleanup + ManagedCursor cursor = ledger.openCursor("c"); + + int entries = 20; + Position maxReadPosition = null; + Map map = new HashMap<>(); + for (int i = 0; i < entries; i++) { + maxReadPosition = ledger.addEntry(new byte[1024]); + map.put(i, maxReadPosition); + } + + + Set deletedPositions = new HashSet<>(); + deletedPositions.add(map.get(1)); + deletedPositions.add(map.get(4)); + deletedPositions.add(map.get(5)); + deletedPositions.add(map.get(8)); + deletedPositions.add(map.get(9)); + deletedPositions.add(map.get(10)); + deletedPositions.add(map.get(15)); + deletedPositions.add(map.get(17)); + deletedPositions.add(map.get(19)); + cursor.delete(deletedPositions); + + CompletableFuture f0 = new CompletableFuture<>(); + List readEntries = new ArrayList<>(); + cursor.asyncReadEntries(5, -1L, new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries, Object ctx) { + readEntries.addAll(entries); + f0.complete(null); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + f0.completeExceptionally(exception); + } + }, null, PositionImpl.get(maxReadPosition.getLedgerId(), maxReadPosition.getEntryId()).getNext()); + + f0.get(); + + CompletableFuture f1 = new CompletableFuture<>(); + cursor.asyncReadEntries(5, -1L, new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries, Object ctx) { + readEntries.addAll(entries); + f1.complete(null); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + f1.completeExceptionally(exception); + } + }, null, PositionImpl.get(maxReadPosition.getLedgerId(), maxReadPosition.getEntryId()).getNext()); + + + f1.get(); + CompletableFuture f2 = new CompletableFuture<>(); + cursor.asyncReadEntries(100, -1L, new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries, Object ctx) { + readEntries.addAll(entries); + f2.complete(null); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + f2.completeExceptionally(exception); + } + }, null, PositionImpl.get(maxReadPosition.getLedgerId(), maxReadPosition.getEntryId()).getNext()); + + f2.get(); + + Position cursorReadPosition = cursor.getReadPosition(); + Position expectReadPosition = maxReadPosition.getNext(); + assertTrue(cursorReadPosition.getLedgerId() == expectReadPosition.getLedgerId() + && cursorReadPosition.getEntryId() == expectReadPosition.getEntryId()); + + assertEquals(readEntries.size(), actualReadEntryIds.size()); + assertEquals(entries - deletedPositions.size(), actualReadEntryIds.size()); + for (Entry entry : readEntries) { + long entryId = entry.getEntryId(); + assertTrue(actualReadEntryIds.contains(entryId)); + } + } + + + @Test + public void testReadEntriesWithSkipDeletedEntriesAndWithSkipConditions() throws Exception { + @Cleanup + ManagedLedgerImpl ledger = (ManagedLedgerImpl) + factory.open("testReadEntriesWithSkipDeletedEntriesAndWithSkipConditions"); + ledger = Mockito.spy(ledger); + + List actualReadEntryIds = new ArrayList<>(); + Mockito.doAnswer(inv -> { + long start = inv.getArgument(1); + long end = inv.getArgument(2); + for (long i = start; i <= end; i++) { + actualReadEntryIds.add(i); + } + return inv.callRealMethod(); + }) + .when(ledger) + .asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.anyLong(), Mockito.anyLong(), Mockito.any(), Mockito.any()); + @Cleanup + ManagedCursor cursor = ledger.openCursor("c"); + + int entries = 20; + Position maxReadPosition0 = null; + Map map = new HashMap<>(); + for (int i = 0; i < entries; i++) { + maxReadPosition0 = ledger.addEntry(new byte[1024]); + map.put(i, maxReadPosition0); + } + + PositionImpl maxReadPosition = + PositionImpl.get(maxReadPosition0.getLedgerId(), maxReadPosition0.getEntryId()).getNext(); + + Set deletedPositions = new HashSet<>(); + deletedPositions.add(map.get(1)); + deletedPositions.add(map.get(3)); + deletedPositions.add(map.get(5)); + cursor.delete(deletedPositions); + + Set skippedPositions = new HashSet<>(); + skippedPositions.add(map.get(6).getEntryId()); + skippedPositions.add(map.get(7).getEntryId()); + skippedPositions.add(map.get(8).getEntryId()); + skippedPositions.add(map.get(11).getEntryId()); + skippedPositions.add(map.get(15).getEntryId()); + skippedPositions.add(map.get(16).getEntryId()); + + Predicate skipCondition = position -> skippedPositions.contains(position.getEntryId()); + List readEntries = new ArrayList<>(); + + CompletableFuture f0 = new CompletableFuture<>(); + cursor.asyncReadEntriesWithSkip(10, -1L, new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries, Object ctx) { + readEntries.addAll(entries); + f0.complete(null); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + f0.completeExceptionally(exception); + } + }, null, maxReadPosition, skipCondition); + + f0.get(); + CompletableFuture f1 = new CompletableFuture<>(); + cursor.asyncReadEntriesWithSkip(100, -1L, new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries, Object ctx) { + readEntries.addAll(entries); + f1.complete(null); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + f1.completeExceptionally(exception); + } + }, null, maxReadPosition, skipCondition); + f1.get(); + + + assertEquals(actualReadEntryIds.size(), readEntries.size()); + assertEquals(entries - deletedPositions.size() - skippedPositions.size(), actualReadEntryIds.size()); + for (Entry entry : readEntries) { + long entryId = entry.getEntryId(); + assertTrue(actualReadEntryIds.contains(entryId)); + } + + Position cursorReadPosition = cursor.getReadPosition(); + Position expectReadPosition = maxReadPosition; + assertTrue(cursorReadPosition.getLedgerId() == expectReadPosition.getLedgerId() + && cursorReadPosition.getEntryId() == expectReadPosition.getEntryId()); + } + private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class); }