From 68c437450db7354fbe509fb8349ca9e9b58c332a Mon Sep 17 00:00:00 2001 From: dao-jun Date: Sun, 17 Dec 2023 05:42:27 +0800 Subject: [PATCH 01/16] Filter out deleted entries before read entries from bookie. --- .../mledger/impl/ManagedCursorImpl.java | 22 +++ .../mledger/impl/ManagedLedgerImpl.java | 89 ++++++--- .../bookkeeper/mledger/impl/OpReadEntry.java | 3 +- .../mledger/impl/ManagedCursorTest.java | 186 ++++++++++++++++++ 4 files changed, 276 insertions(+), 24 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 4b65d62f0eee8..65235fa7bc2d8 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -791,6 +791,28 @@ public void asyncReadEntriesWithSkip(int numberOfEntriesToRead, long maxSizeByte ledger.asyncReadEntries(op); } + /** + * Is the position deleted. + * + * @param position entry position. + * @return + */ + protected boolean deleted(PositionImpl position) { + if (position.compareTo(this.markDeletePosition) <= 0) { + return true; + } + + this.lock.readLock().lock(); + try { + if (this.individualDeletedMessages.isEmpty()) { + return false; + } + return this.individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId()); + } finally { + this.lock.readLock().unlock(); + } + } + @Override public Entry getNthEntry(int n, IndividualDeletedEntries deletedEntries) throws InterruptedException, ManagedLedgerException { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 8ce2a6924ebed..b9310a5ce03c7 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -35,6 +35,7 @@ import java.time.Clock; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -46,6 +47,7 @@ import java.util.Queue; import java.util.Random; import java.util.Set; +import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -2090,35 +2092,25 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) long lastEntry = min(firstEntry + opReadEntry.getNumberOfEntriesToRead() - 1, lastEntryInLedger); - // Filer out and skip unnecessary read entry - if (opReadEntry.skipCondition != null) { - long firstValidEntry = -1L; - long lastValidEntry = -1L; - long entryId = firstEntry; - for (; entryId <= lastEntry; entryId++) { - if (opReadEntry.skipCondition.test(PositionImpl.get(ledger.getId(), entryId))) { - if (firstValidEntry != -1L) { - break; - } - } else { - if (firstValidEntry == -1L) { - firstValidEntry = entryId; - } - - lastValidEntry = entryId; + // Do skip condition. + Predicate skipCondition = opReadEntry.skipCondition; + if (skipCondition != null) { + Set entryIds = new TreeSet<>(); + for (long i = firstEntry; i <= lastEntry; i++) { + if (!skipCondition.test(PositionImpl.get(ledger.getId(), i))) { + entryIds.add(i); } } - - // If all messages in [firstEntry...lastEntry] are filter out, - // then manual call internalReadEntriesComplete to advance read position. - if (firstValidEntry == -1L) { + if (entryIds.isEmpty()) { opReadEntry.internalReadEntriesComplete(Collections.emptyList(), opReadEntry.ctx, PositionImpl.get(ledger.getId(), lastEntry)); return; } - - firstEntry = firstValidEntry; - lastEntry = lastValidEntry; + if (log.isDebugEnabled()) { + log.debug("[{}] Reading entries from ledger {} - entryIds {}", name, ledger.getId(), entryIds); + } + asyncReadEntry(ledger, entryIds, opReadEntry, opReadEntry.ctx); + return; } if (log.isDebugEnabled()) { @@ -2128,6 +2120,57 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry, opReadEntry.ctx); } + // No batch-read in bookie, this method will not slower than `read(firstEntry, lastEntry)`. + @VisibleForTesting + public void asyncReadEntry(ReadHandle ledger, Set entryIds, OpReadEntry opReadEntry, + Object ctx) { + if (entryIds.isEmpty()) { + opReadEntry.readEntriesComplete(Collections.emptyList(), ctx); + return; + } + + // Maybe concurrent modification happens. + Set newSet = new TreeSet<>(entryIds); + AsyncCallbacks.ReadEntryCallback callback0 = new BatchReadEntryCallback(newSet, opReadEntry); + long ledgerId = ledger.getId(); + for (long entryId : entryIds) { + asyncReadEntry(ledger, PositionImpl.get(ledgerId, entryId), callback0, ctx); + } + } + + static class BatchReadEntryCallback implements AsyncCallbacks.ReadEntryCallback { + private final Set entryIds; + private final List entries; + private final AsyncCallbacks.ReadEntriesCallback callback; + + BatchReadEntryCallback(Set entryIds, AsyncCallbacks.ReadEntriesCallback callback) { + // Normally, there should be no competition here, but to prevent unforeseen circumstances, + // concurrent containers should still be used + this.entryIds = Collections.synchronizedSet(entryIds); + this.entries = Collections.synchronizedList(new ArrayList<>(entryIds.size())); + this.callback = callback; + } + + @Override + public void readEntryComplete(Entry entry, Object ctx) { + long entryId = entry.getEntryId(); + this.entries.add(entry); + this.entryIds.remove(entryId); + if (this.entryIds.isEmpty()) { + // Need to sort entries, so that `readPosition` of `Cursor` can be moved correctly. + this.entries.sort(Comparator.comparingLong(Entry::getEntryId)); + this.callback.readEntriesComplete(this.entries, ctx); + } + } + + @Override + public void readEntryFailed(ManagedLedgerException exception, Object ctx) { + this.entryIds.clear(); + this.entries.clear(); + this.callback.readEntriesFailed(exception, ctx); + } + } + protected void asyncReadEntry(ReadHandle ledger, PositionImpl position, ReadEntryCallback callback, Object ctx) { if (config.getReadEntryTimeoutSeconds() > 0) { // set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java index 7b59c3903d5bc..bd2453090e13b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java @@ -60,7 +60,8 @@ public static OpReadEntry create(ManagedCursorImpl cursor, PositionImpl readPosi maxPosition = PositionImpl.LATEST; } op.maxPosition = maxPosition; - op.skipCondition = skipCondition; + Predicate skipCondition0 = cursor::deleted; + op.skipCondition = skipCondition == null ? skipCondition0 : skipCondition.or(skipCondition0); op.ctx = ctx; op.nextReadPosition = PositionImpl.get(op.readPosition); return op; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 627ae73d928bd..75f959a62c4cf 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -43,6 +43,7 @@ import java.util.Arrays; import java.util.BitSet; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -65,6 +66,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; import lombok.Cleanup; @@ -72,6 +74,7 @@ import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.LedgerEntry; +import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback; @@ -4484,5 +4487,188 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { ledger.close(); } + + @Test + public void testReadEntriesWithSkipDeletedEntries() throws Exception { + @Cleanup + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testReadEntriesWithSkipDeletedEntries"); + ledger = Mockito.spy(ledger); + List actualReadEntryIds = new ArrayList<>(); + Mockito.doAnswer(inv -> { + Set ids = inv.getArgument(1); + actualReadEntryIds.addAll(ids); + return inv.callRealMethod(); + }).when(ledger).asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.any(Set.class), Mockito.any(), Mockito.any()); + @Cleanup + ManagedCursor cursor = ledger.openCursor("c"); + + int entries = 20; + Position maxReadPosition = null; + Map map = new HashMap<>(); + for (int i = 0; i < entries; i++) { + maxReadPosition = ledger.addEntry(new byte[1024]); + map.put(i, maxReadPosition); + } + + + Set deletedPositions = new HashSet<>(); + deletedPositions.add(map.get(1)); + deletedPositions.add(map.get(4)); + deletedPositions.add(map.get(5)); + deletedPositions.add(map.get(8)); + deletedPositions.add(map.get(9)); + deletedPositions.add(map.get(10)); + deletedPositions.add(map.get(15)); + deletedPositions.add(map.get(17)); + deletedPositions.add(map.get(19)); + cursor.delete(deletedPositions); + + CompletableFuture f0 = new CompletableFuture<>(); + List readEntries = new ArrayList<>(); + cursor.asyncReadEntries(5, -1L, new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries, Object ctx) { + readEntries.addAll(entries); + f0.complete(null); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + f0.completeExceptionally(exception); + } + }, null, PositionImpl.get(maxReadPosition.getLedgerId(), maxReadPosition.getEntryId()).getNext()); + + f0.get(); + + CompletableFuture f1 = new CompletableFuture<>(); + cursor.asyncReadEntries(5, -1L, new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries, Object ctx) { + readEntries.addAll(entries); + f1.complete(null); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + f1.completeExceptionally(exception); + } + }, null, PositionImpl.get(maxReadPosition.getLedgerId(), maxReadPosition.getEntryId()).getNext()); + + + f1.get(); + CompletableFuture f2 = new CompletableFuture<>(); + cursor.asyncReadEntries(100, -1L, new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries, Object ctx) { + readEntries.addAll(entries); + f2.complete(null); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + f2.completeExceptionally(exception); + } + }, null, PositionImpl.get(maxReadPosition.getLedgerId(), maxReadPosition.getEntryId()).getNext()); + + f2.get(); + + Position cursorReadPosition = cursor.getReadPosition(); + Position expectReadPosition = maxReadPosition.getNext(); + assertTrue(cursorReadPosition.getLedgerId() == expectReadPosition.getLedgerId() + && cursorReadPosition.getEntryId() == expectReadPosition.getEntryId()); + + assertEquals(readEntries.size(), actualReadEntryIds.size()); + assertEquals(entries - deletedPositions.size(), actualReadEntryIds.size()); + for (Entry entry : readEntries) { + long entryId = entry.getEntryId(); + assertTrue(actualReadEntryIds.contains(entryId)); + } + } + + + @Test + public void testReadEntriesWithSkipDeletedEntriesAndWithSkipConditions() throws Exception { + @Cleanup + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testReadEntriesWithSkipDeletedEntries"); + ledger = Mockito.spy(ledger); + + List actualReadEntryIds = new ArrayList<>(); + Mockito.doAnswer(inv -> { + Set ids = inv.getArgument(1); + actualReadEntryIds.addAll(ids); + return inv.callRealMethod(); + }).when(ledger).asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.any(Set.class), Mockito.any(), Mockito.any()); + @Cleanup + ManagedCursor cursor = ledger.openCursor("c"); + + int entries = 20; + Position maxReadPosition0 = null; + Map map = new HashMap<>(); + for (int i = 0; i < entries; i++) { + maxReadPosition0 = ledger.addEntry(new byte[1024]); + map.put(i, maxReadPosition0); + } + + PositionImpl maxReadPosition = + PositionImpl.get(maxReadPosition0.getLedgerId(), maxReadPosition0.getEntryId()).getNext(); + + Set deletedPositions = new HashSet<>(); + deletedPositions.add(map.get(1)); + deletedPositions.add(map.get(3)); + deletedPositions.add(map.get(5)); + cursor.delete(deletedPositions); + + Set skippedPositions = new HashSet<>(); + skippedPositions.add(map.get(6).getEntryId()); + skippedPositions.add(map.get(7).getEntryId()); + skippedPositions.add(map.get(8).getEntryId()); + skippedPositions.add(map.get(11).getEntryId()); + skippedPositions.add(map.get(15).getEntryId()); + skippedPositions.add(map.get(16).getEntryId()); + + Predicate skipCondition = position -> skippedPositions.contains(position.getEntryId()); + List readEntries = new ArrayList<>(); + + CompletableFuture f0 = new CompletableFuture<>(); + cursor.asyncReadEntriesWithSkip(10, -1L, new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries, Object ctx) { + readEntries.addAll(entries); + f0.complete(null); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + f0.completeExceptionally(exception); + } + }, null, maxReadPosition, skipCondition); + + f0.get(); + CompletableFuture f1 = new CompletableFuture<>(); + cursor.asyncReadEntriesWithSkip(100, -1L, new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries, Object ctx) { + readEntries.addAll(entries); + f1.complete(null); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + f1.completeExceptionally(exception); + } + }, null, maxReadPosition, skipCondition); + f1.get(); + + + assertEquals(actualReadEntryIds.size(), readEntries.size()); + assertEquals(entries - deletedPositions.size() - skippedPositions.size(), actualReadEntryIds.size()); + for (Entry entry : readEntries) { + long entryId = entry.getEntryId(); + assertTrue(actualReadEntryIds.contains(entryId)); + } + } + + + private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class); } From 4d42fd5c4cf656a177c7626ab24098624f3c20d7 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Sun, 17 Dec 2023 05:45:12 +0800 Subject: [PATCH 02/16] Filter out deleted entries before read entries from bookie. --- .../apache/bookkeeper/mledger/impl/ManagedCursorTest.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 75f959a62c4cf..ff29090087572 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -4666,6 +4666,11 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { long entryId = entry.getEntryId(); assertTrue(actualReadEntryIds.contains(entryId)); } + + Position cursorReadPosition = cursor.getReadPosition(); + Position expectReadPosition = maxReadPosition; + assertTrue(cursorReadPosition.getLedgerId() == expectReadPosition.getLedgerId() + && cursorReadPosition.getEntryId() == expectReadPosition.getEntryId()); } From 81de6c1fc6bf703d0a5c6f28f6a95f8538d82638 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Sun, 17 Dec 2023 06:57:24 +0800 Subject: [PATCH 03/16] Filter out deleted entries before read entries from bookie. --- .../mledger/impl/ManagedLedgerImpl.java | 7 +- .../mledger/impl/ManagedCursorTest.java | 144 ++++++++++++------ 2 files changed, 107 insertions(+), 44 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index b9310a5ce03c7..20c94732dadac 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2125,7 +2125,12 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) public void asyncReadEntry(ReadHandle ledger, Set entryIds, OpReadEntry opReadEntry, Object ctx) { if (entryIds.isEmpty()) { - opReadEntry.readEntriesComplete(Collections.emptyList(), ctx); + // If the entryIds is empty, should not move the `readPosition` of `cursor`. + // OpReadEntry#internalReadEntriesComplete will move the `readPosition` of `cursor` + // to the next position of `lastEntry`, so here uses the previous position of `readPosition` + // to offset the impact of OpReadEntry#internalReadEntriesComplete. + PositionImpl previous = this.getPreviousPosition(opReadEntry.readPosition); + opReadEntry.internalReadEntriesComplete(Collections.emptyList(), ctx, previous); return; } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index ff29090087572..728463aba7fa8 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -31,10 +31,12 @@ import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; + import com.google.common.collect.Lists; import com.google.common.collect.Range; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; + import java.lang.reflect.Field; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; @@ -69,6 +71,7 @@ import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; + import lombok.Cleanup; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; @@ -123,7 +126,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { @DataProvider(name = "useOpenRangeSet") public static Object[][] useOpenRangeSet() { - return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } }; + return new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}}; } @@ -142,7 +145,7 @@ public void testCloseCursor() throws Exception { ledger.addEntry(new byte[]{5}); // Persistent cursor info to ledger. c1.delete(PositionImpl.get(c1.getReadPosition().getLedgerId(), c1.getReadPosition().getEntryId())); - Awaitility.await().until(() ->c1.getStats().getPersistLedgerSucceed() > 0); + Awaitility.await().until(() -> c1.getStats().getPersistLedgerSucceed() > 0); // Make cursor ledger can not work. closeCursorLedger(c1); c1.delete(PositionImpl.get(c1.getReadPosition().getLedgerId(), c1.getReadPosition().getEntryId() + 2)); @@ -302,7 +305,7 @@ void testPersistentMarkDeleteIfSwitchCursorLedgerFailed() throws Exception { bkc.failNow(BKException.Code.NoBookieAvailableException); // Verify the cursor status will be persistent to ZK even if the cursor ledger creation always fails. // This time ZK will be written due to catch up. - Position lastEntry = positions.get(entryCount -1); + Position lastEntry = positions.get(entryCount - 1); cursor.markDelete(lastEntry); long persistZookeeperSucceed2 = cursor.getStats().getPersistZookeeperSucceed(); assertTrue(persistZookeeperSucceed2 > persistZookeeperSucceed1); @@ -769,7 +772,7 @@ void testResetCursor() throws Exception { @Test(timeOut = 20000) void testResetCursor1() throws Exception { ManagedLedger ledger = factory.open("my_test_move_cursor_ledger", - new ManagedLedgerConfig().setMaxEntriesPerLedger(2)); + new ManagedLedgerConfig().setMaxEntriesPerLedger(2)); ManagedCursor cursor = ledger.openCursor("trc1"); PositionImpl actualEarliest = (PositionImpl) ledger.addEntry("dummy-entry-1".getBytes(Encoding)); ledger.addEntry("dummy-entry-2".getBytes(Encoding)); @@ -1601,12 +1604,17 @@ void testFilteringReadEntries() throws Exception { ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(3)); ManagedCursor cursor = ledger.openCursor("c1"); - /* Position p1 = */ledger.addEntry("entry1".getBytes()); - /* Position p2 = */ledger.addEntry("entry2".getBytes()); - /* Position p3 = */ledger.addEntry("entry3".getBytes()); - /* Position p4 = */ledger.addEntry("entry4".getBytes()); + /* Position p1 = */ + ledger.addEntry("entry1".getBytes()); + /* Position p2 = */ + ledger.addEntry("entry2".getBytes()); + /* Position p3 = */ + ledger.addEntry("entry3".getBytes()); + /* Position p4 = */ + ledger.addEntry("entry4".getBytes()); Position p5 = ledger.addEntry("entry5".getBytes()); - /* Position p6 = */ledger.addEntry("entry6".getBytes()); + /* Position p6 = */ + ledger.addEntry("entry6".getBytes()); assertEquals(cursor.getNumberOfEntries(), 6); assertEquals(cursor.getNumberOfEntriesInBacklog(false), 6); @@ -1660,9 +1668,12 @@ void testCountingWithDeletedEntries() throws Exception { ManagedCursor cursor = ledger.openCursor("c1"); Position p1 = ledger.addEntry("entry1".getBytes()); - /* Position p2 = */ledger.addEntry("entry2".getBytes()); - /* Position p3 = */ledger.addEntry("entry3".getBytes()); - /* Position p4 = */ledger.addEntry("entry4".getBytes()); + /* Position p2 = */ + ledger.addEntry("entry2".getBytes()); + /* Position p3 = */ + ledger.addEntry("entry3".getBytes()); + /* Position p4 = */ + ledger.addEntry("entry4".getBytes()); Position p5 = ledger.addEntry("entry5".getBytes()); Position p6 = ledger.addEntry("entry6".getBytes()); Position p7 = ledger.addEntry("entry7".getBytes()); @@ -2017,19 +2028,19 @@ void testFindNewestMatching() throws Exception { @DataProvider(name = "testScanValues") public static Object[][] testScanValues() { - return new Object[][] { - { 10, 1 }, // single entry - { 10, 3 }, // batches with remainder - { 10, 5 }, // batches, half - { 10, 1000 }, // big batch size, scan whole ledger in one round - { 0, 10 } // empty ledger + return new Object[][]{ + {10, 1}, // single entry + {10, 3}, // batches with remainder + {10, 5}, // batches, half + {10, 1000}, // big batch size, scan whole ledger in one round + {0, 10} // empty ledger }; } @Test(dataProvider = "testScanValues", timeOut = 30000) void testScan(int numEntries, int batchSize) throws Exception { ManagedLedger ledger = factory.open("my_test_ledger_scan_" + numEntries - + "_" +batchSize); + + "_" + batchSize); ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1"); List positions = new ArrayList<>(); @@ -2130,7 +2141,7 @@ void testScan(int numEntries, int batchSize) throws Exception { positionsFinal.add(entry.getPosition()); return true; }), batchSize, Long.MAX_VALUE, Long.MAX_VALUE).get()); - assertEquals(0,positionsFinal.size()); + assertEquals(0, positionsFinal.size()); } @@ -2289,7 +2300,7 @@ void testFindNewestMatchingEdgeCase1() throws Exception { ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1"); assertNull(c1.findNewestMatching( - entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding)))); + entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding)))); } @Test(timeOut = 20000) @@ -2598,7 +2609,7 @@ public void findEntryComplete(Position position, Object ctx) { @Override public void findEntryFailed(ManagedLedgerException exception, Optional failedReadPosition, - Object ctx) { + Object ctx) { result.exception = exception; counter.countDown(); } @@ -2624,7 +2635,7 @@ public void findEntryFailed(ManagedLedgerException exception, Optional } void internalTestFindNewestMatchingAllEntries(final String name, final int entriesPerLedger, - final int expectedEntryId) throws Exception { + final int expectedEntryId) throws Exception { final String ledgerAndCursorName = name; ManagedLedgerConfig config = new ManagedLedgerConfig(); config.setRetentionSizeInMB(10); @@ -2718,7 +2729,7 @@ void testReplayEntries() throws Exception { assertTrue((Arrays.equals(entries.get(0).getData(), "entry1".getBytes(Encoding)) && Arrays.equals(entries.get(1).getData(), "entry3".getBytes(Encoding))) || (Arrays.equals(entries.get(0).getData(), "entry3".getBytes(Encoding)) - && Arrays.equals(entries.get(1).getData(), "entry1".getBytes(Encoding)))); + && Arrays.equals(entries.get(1).getData(), "entry1".getBytes(Encoding)))); entries.forEach(Entry::release); // 3. Fail on reading non-existing position @@ -2750,7 +2761,7 @@ void testGetLastIndividualDeletedRange() throws Exception { ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1"); PositionImpl markDeletedPosition = (PositionImpl) c1.getMarkDeletedPosition(); - for(int i = 0; i < 10; i++) { + for (int i = 0; i < 10; i++) { ledger.addEntry(("entry" + i).getBytes(Encoding)); } PositionImpl p1 = PositionImpl.get(markDeletedPosition.getLedgerId(), markDeletedPosition.getEntryId() + 1); @@ -2777,7 +2788,7 @@ void testTrimDeletedEntries() throws ManagedLedgerException, InterruptedExceptio ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1"); PositionImpl markDeletedPosition = (PositionImpl) c1.getMarkDeletedPosition(); - for(int i = 0; i < 10; i++) { + for (int i = 0; i < 10; i++) { ledger.addEntry(("entry" + i).getBytes(Encoding)); } PositionImpl p1 = PositionImpl.get(markDeletedPosition.getLedgerId(), markDeletedPosition.getEntryId() + 1); @@ -3097,9 +3108,10 @@ public void testOutOfOrderDeletePersistenceAfterCrash() throws Exception { * Verifies that {@link ManagedCursorImpl#createNewMetadataLedger()} cleans up orphan ledgers if fails to switch new * ledger * + * * @throws Exception */ - @Test(timeOut=5000) + @Test(timeOut = 5000) public void testLeakFailedLedgerOfManageCursor() throws Exception { ManagedLedgerConfig mlConfig = new ManagedLedgerConfig(); @@ -3145,7 +3157,7 @@ public void operationFailed(ManagedLedgerException exception) { try { bkc.openLedgerNoRecovery(ledgerId, DigestType.fromApiDigestType(mlConfig.getDigestType()), - mlConfig.getPassword()); + mlConfig.getPassword()); fail("ledger should have deleted due to update-cursor failure"); } catch (BKException e) { // ok @@ -3368,9 +3380,13 @@ public void testEstimatedUnackedSize() throws Exception { byte[] entryData = new byte[5]; // write 15 entries, saving position of 5th - for (int i = 0; i < 4; i++) { ledger.addEntry(entryData); } + for (int i = 0; i < 4; i++) { + ledger.addEntry(entryData); + } Position deleteAt = ledger.addEntry(entryData); - for (int i = 0; i < 10; i++) { ledger.addEntry(entryData); } + for (int i = 0; i < 10; i++) { + ledger.addEntry(entryData); + } assertEquals(cursor.getEstimatedSizeSinceMarkDeletePosition(), 15 * entryData.length); @@ -3552,6 +3568,7 @@ public void operationFailed(ManagedLedgerException exception) { assertEquals(c.getReadPosition(), readPositionBeforeRecover); assertEquals(c.getNumberOfEntries(), 2L); } + @Test void testAlwaysInactive() throws Exception { ManagedLedger ml = factory.open("testAlwaysInactive"); @@ -3764,17 +3781,17 @@ private void deleteBatchIndex(ManagedCursor cursor, Position position, int batch pos.ackSet = bitSet.toLongArray(); cursor.asyncDelete(pos, - new DeleteCallback() { - @Override - public void deleteComplete(Object ctx) { - latch.countDown(); - } + new DeleteCallback() { + @Override + public void deleteComplete(Object ctx) { + latch.countDown(); + } - @Override - public void deleteFailed(ManagedLedgerException exception, Object ctx) { - latch.countDown(); - } - }, null); + @Override + public void deleteFailed(ManagedLedgerException exception, Object ctx) { + latch.countDown(); + } + }, null); latch.await(); pos.ackSet = null; } @@ -4008,7 +4025,6 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) { } - @Test public void testFlushCursorAfterError() throws Exception { ManagedLedgerConfig config = new ManagedLedgerConfig(); @@ -4081,7 +4097,7 @@ public void testConsistencyOfIndividualMessages() throws Exception { } assertEquals(c1.getTotalNonContiguousDeletedMessagesRange(), 0); - assertEquals(c1.getMarkDeletedPosition(), positions.get(positions.size() -1)); + assertEquals(c1.getMarkDeletedPosition(), positions.get(positions.size() - 1)); } @Test @@ -4589,7 +4605,8 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { @Test public void testReadEntriesWithSkipDeletedEntriesAndWithSkipConditions() throws Exception { @Cleanup - ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testReadEntriesWithSkipDeletedEntries"); + ManagedLedgerImpl ledger = (ManagedLedgerImpl) + factory.open("testReadEntriesWithSkipDeletedEntriesAndWithSkipConditions"); ledger = Mockito.spy(ledger); List actualReadEntryIds = new ArrayList<>(); @@ -4674,6 +4691,47 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { } + @Test + public void testReadEmptyEntryIds() throws Exception { + @Cleanup + ManagedLedgerImpl ledger = (ManagedLedgerImpl) + factory.open("testReadEntriesWithSkipDeletedEntriesAndWithSkipConditions"); + @Cleanup + ManagedCursor cursor = ledger.openCursor("c"); + + int entries = 20; + Position maxPosition = PositionImpl.EARLIEST; + for (int i = 0; i < entries; i++) { + maxPosition = ledger.addEntry(new byte[1024]); + } + + CompletableFuture future = new CompletableFuture<>(); + Position cursorPosition = cursor.getReadPosition(); + ReadHandle handle = ledger.getLedgerHandle(maxPosition.getLedgerId()).get(); + OpReadEntry opReadEntry = OpReadEntry.create( + (ManagedCursorImpl) cursor, + PositionImpl.get(cursorPosition.getLedgerId(), cursorPosition.getEntryId()), + 0, // Set count = 0. + new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries, Object ctx) { + future.complete(null); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + }, null, PositionImpl.get(maxPosition.getLedgerId(), maxPosition.getEntryId()), null); + ledger.asyncReadEntry(handle, Collections.emptySet(), opReadEntry, null); + future.get(); + + // `readPosition` should not be moved. + Position newCursorReadPosition = cursor.getReadPosition(); + assertTrue(newCursorReadPosition.getLedgerId() == cursorPosition.getLedgerId() + && newCursorReadPosition.getEntryId() == cursorPosition.getEntryId()); + } + private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class); } From c3e1ad4cd36f6b87ec7bf21d41cf46c1adbdc082 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Sun, 17 Dec 2023 22:25:24 +0800 Subject: [PATCH 04/16] Fix ManagedCursorTest code format --- .../mledger/impl/ManagedCursorTest.java | 69 +++++++------------ 1 file changed, 26 insertions(+), 43 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 728463aba7fa8..586006233e062 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -31,12 +31,10 @@ import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; - import com.google.common.collect.Lists; import com.google.common.collect.Range; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; - import java.lang.reflect.Field; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; @@ -71,7 +69,6 @@ import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; - import lombok.Cleanup; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; @@ -126,7 +123,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { @DataProvider(name = "useOpenRangeSet") public static Object[][] useOpenRangeSet() { - return new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}}; + return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } }; } @@ -145,7 +142,7 @@ public void testCloseCursor() throws Exception { ledger.addEntry(new byte[]{5}); // Persistent cursor info to ledger. c1.delete(PositionImpl.get(c1.getReadPosition().getLedgerId(), c1.getReadPosition().getEntryId())); - Awaitility.await().until(() -> c1.getStats().getPersistLedgerSucceed() > 0); + Awaitility.await().until(() ->c1.getStats().getPersistLedgerSucceed() > 0); // Make cursor ledger can not work. closeCursorLedger(c1); c1.delete(PositionImpl.get(c1.getReadPosition().getLedgerId(), c1.getReadPosition().getEntryId() + 2)); @@ -305,7 +302,7 @@ void testPersistentMarkDeleteIfSwitchCursorLedgerFailed() throws Exception { bkc.failNow(BKException.Code.NoBookieAvailableException); // Verify the cursor status will be persistent to ZK even if the cursor ledger creation always fails. // This time ZK will be written due to catch up. - Position lastEntry = positions.get(entryCount - 1); + Position lastEntry = positions.get(entryCount -1); cursor.markDelete(lastEntry); long persistZookeeperSucceed2 = cursor.getStats().getPersistZookeeperSucceed(); assertTrue(persistZookeeperSucceed2 > persistZookeeperSucceed1); @@ -1604,17 +1601,12 @@ void testFilteringReadEntries() throws Exception { ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(3)); ManagedCursor cursor = ledger.openCursor("c1"); - /* Position p1 = */ - ledger.addEntry("entry1".getBytes()); - /* Position p2 = */ - ledger.addEntry("entry2".getBytes()); - /* Position p3 = */ - ledger.addEntry("entry3".getBytes()); - /* Position p4 = */ - ledger.addEntry("entry4".getBytes()); + /* Position p1 = */ledger.addEntry("entry1".getBytes()); + /* Position p2 = */ledger.addEntry("entry2".getBytes()); + /* Position p3 = */ledger.addEntry("entry3".getBytes()); + /* Position p4 = */ledger.addEntry("entry4".getBytes()); Position p5 = ledger.addEntry("entry5".getBytes()); - /* Position p6 = */ - ledger.addEntry("entry6".getBytes()); + /* Position p6 = */ledger.addEntry("entry6".getBytes()); assertEquals(cursor.getNumberOfEntries(), 6); assertEquals(cursor.getNumberOfEntriesInBacklog(false), 6); @@ -1668,12 +1660,9 @@ void testCountingWithDeletedEntries() throws Exception { ManagedCursor cursor = ledger.openCursor("c1"); Position p1 = ledger.addEntry("entry1".getBytes()); - /* Position p2 = */ - ledger.addEntry("entry2".getBytes()); - /* Position p3 = */ - ledger.addEntry("entry3".getBytes()); - /* Position p4 = */ - ledger.addEntry("entry4".getBytes()); + /* Position p2 = */ledger.addEntry("entry2".getBytes()); + /* Position p3 = */ledger.addEntry("entry3".getBytes()); + /* Position p4 = */ledger.addEntry("entry4".getBytes()); Position p5 = ledger.addEntry("entry5".getBytes()); Position p6 = ledger.addEntry("entry6".getBytes()); Position p7 = ledger.addEntry("entry7".getBytes()); @@ -2028,19 +2017,19 @@ void testFindNewestMatching() throws Exception { @DataProvider(name = "testScanValues") public static Object[][] testScanValues() { - return new Object[][]{ - {10, 1}, // single entry - {10, 3}, // batches with remainder - {10, 5}, // batches, half - {10, 1000}, // big batch size, scan whole ledger in one round - {0, 10} // empty ledger + return new Object[][] { + { 10, 1 }, // single entry + { 10, 3 }, // batches with remainder + { 10, 5 }, // batches, half + { 10, 1000 }, // big batch size, scan whole ledger in one round + { 0, 10 } // empty ledger }; } @Test(dataProvider = "testScanValues", timeOut = 30000) void testScan(int numEntries, int batchSize) throws Exception { ManagedLedger ledger = factory.open("my_test_ledger_scan_" + numEntries - + "_" + batchSize); + + "_" +batchSize); ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1"); List positions = new ArrayList<>(); @@ -2141,7 +2130,7 @@ void testScan(int numEntries, int batchSize) throws Exception { positionsFinal.add(entry.getPosition()); return true; }), batchSize, Long.MAX_VALUE, Long.MAX_VALUE).get()); - assertEquals(0, positionsFinal.size()); + assertEquals(0,positionsFinal.size()); } @@ -2761,7 +2750,7 @@ void testGetLastIndividualDeletedRange() throws Exception { ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1"); PositionImpl markDeletedPosition = (PositionImpl) c1.getMarkDeletedPosition(); - for (int i = 0; i < 10; i++) { + for(int i = 0; i < 10; i++) { ledger.addEntry(("entry" + i).getBytes(Encoding)); } PositionImpl p1 = PositionImpl.get(markDeletedPosition.getLedgerId(), markDeletedPosition.getEntryId() + 1); @@ -2788,7 +2777,7 @@ void testTrimDeletedEntries() throws ManagedLedgerException, InterruptedExceptio ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1"); PositionImpl markDeletedPosition = (PositionImpl) c1.getMarkDeletedPosition(); - for (int i = 0; i < 10; i++) { + for(int i = 0; i < 10; i++) { ledger.addEntry(("entry" + i).getBytes(Encoding)); } PositionImpl p1 = PositionImpl.get(markDeletedPosition.getLedgerId(), markDeletedPosition.getEntryId() + 1); @@ -3108,10 +3097,9 @@ public void testOutOfOrderDeletePersistenceAfterCrash() throws Exception { * Verifies that {@link ManagedCursorImpl#createNewMetadataLedger()} cleans up orphan ledgers if fails to switch new * ledger * - * * @throws Exception */ - @Test(timeOut = 5000) + @Test(timeOut=5000) public void testLeakFailedLedgerOfManageCursor() throws Exception { ManagedLedgerConfig mlConfig = new ManagedLedgerConfig(); @@ -3380,13 +3368,9 @@ public void testEstimatedUnackedSize() throws Exception { byte[] entryData = new byte[5]; // write 15 entries, saving position of 5th - for (int i = 0; i < 4; i++) { - ledger.addEntry(entryData); - } + for (int i = 0; i < 4; i++) { ledger.addEntry(entryData); } Position deleteAt = ledger.addEntry(entryData); - for (int i = 0; i < 10; i++) { - ledger.addEntry(entryData); - } + for (int i = 0; i < 10; i++) { ledger.addEntry(entryData); } assertEquals(cursor.getEstimatedSizeSinceMarkDeletePosition(), 15 * entryData.length); @@ -3568,7 +3552,6 @@ public void operationFailed(ManagedLedgerException exception) { assertEquals(c.getReadPosition(), readPositionBeforeRecover); assertEquals(c.getNumberOfEntries(), 2L); } - @Test void testAlwaysInactive() throws Exception { ManagedLedger ml = factory.open("testAlwaysInactive"); @@ -4025,6 +4008,7 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) { } + @Test public void testFlushCursorAfterError() throws Exception { ManagedLedgerConfig config = new ManagedLedgerConfig(); @@ -4097,7 +4081,7 @@ public void testConsistencyOfIndividualMessages() throws Exception { } assertEquals(c1.getTotalNonContiguousDeletedMessagesRange(), 0); - assertEquals(c1.getMarkDeletedPosition(), positions.get(positions.size() - 1)); + assertEquals(c1.getMarkDeletedPosition(), positions.get(positions.size() -1)); } @Test @@ -4732,6 +4716,5 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { && newCursorReadPosition.getEntryId() == cursorPosition.getEntryId()); } - private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class); } From 2f4a17eccdf463ee96d1fbe10a7a4c3fb28ee0b6 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Sun, 17 Dec 2023 23:30:29 +0800 Subject: [PATCH 05/16] Fix ReadonlyCursor NPE --- .../mledger/impl/ManagedCursorImpl.java | 29 +++++-------------- .../bookkeeper/mledger/impl/OpReadEntry.java | 2 +- 2 files changed, 8 insertions(+), 23 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 65235fa7bc2d8..dedfdd41199fb 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -791,28 +791,6 @@ public void asyncReadEntriesWithSkip(int numberOfEntriesToRead, long maxSizeByte ledger.asyncReadEntries(op); } - /** - * Is the position deleted. - * - * @param position entry position. - * @return - */ - protected boolean deleted(PositionImpl position) { - if (position.compareTo(this.markDeletePosition) <= 0) { - return true; - } - - this.lock.readLock().lock(); - try { - if (this.individualDeletedMessages.isEmpty()) { - return false; - } - return this.individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId()); - } finally { - this.lock.readLock().unlock(); - } - } - @Override public Entry getNthEntry(int n, IndividualDeletedEntries deletedEntries) throws InterruptedException, ManagedLedgerException { @@ -3424,6 +3402,13 @@ public LongPairRangeSet getIndividuallyDeletedMessagesSet() { public boolean isMessageDeleted(Position position) { checkArgument(position instanceof PositionImpl); + // The `markDeletePosition` of `ReadonlyCursor` maybe null. + // if (this instanceof ReadOnlyCursor) { + // return false; + // } + if (markDeletePosition == null) { + return false; + } return ((PositionImpl) position).compareTo(markDeletePosition) <= 0 || individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId()); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java index bd2453090e13b..f74e56989ac8b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java @@ -60,7 +60,7 @@ public static OpReadEntry create(ManagedCursorImpl cursor, PositionImpl readPosi maxPosition = PositionImpl.LATEST; } op.maxPosition = maxPosition; - Predicate skipCondition0 = cursor::deleted; + Predicate skipCondition0 = cursor::isMessageDeleted; op.skipCondition = skipCondition == null ? skipCondition0 : skipCondition.or(skipCondition0); op.ctx = ctx; op.nextReadPosition = PositionImpl.get(op.readPosition); From d5a8362638645c95224125ba5e4ffdd30b039597 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Mon, 18 Dec 2023 00:21:38 +0800 Subject: [PATCH 06/16] Don't filter out deleted entries if the `cursor` is `ReadonlyCursor` --- .../mledger/impl/ManagedCursorImpl.java | 7 ------- .../bookkeeper/mledger/impl/OpReadEntry.java | 15 +++++++++++++-- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index dedfdd41199fb..4b65d62f0eee8 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -3402,13 +3402,6 @@ public LongPairRangeSet getIndividuallyDeletedMessagesSet() { public boolean isMessageDeleted(Position position) { checkArgument(position instanceof PositionImpl); - // The `markDeletePosition` of `ReadonlyCursor` maybe null. - // if (this instanceof ReadOnlyCursor) { - // return false; - // } - if (markDeletePosition == null) { - return false; - } return ((PositionImpl) position).compareTo(markDeletePosition) <= 0 || individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId()); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java index f74e56989ac8b..bafda22a43053 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java @@ -30,6 +30,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.ReadOnlyCursor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,8 +61,18 @@ public static OpReadEntry create(ManagedCursorImpl cursor, PositionImpl readPosi maxPosition = PositionImpl.LATEST; } op.maxPosition = maxPosition; - Predicate skipCondition0 = cursor::isMessageDeleted; - op.skipCondition = skipCondition == null ? skipCondition0 : skipCondition.or(skipCondition0); + + Predicate skipCondition0 = cursor instanceof ReadOnlyCursor ? null : cursor::isMessageDeleted; + if (skipCondition == null) { + op.skipCondition = skipCondition0; + } else { + if (skipCondition0 == null) { + op.skipCondition = skipCondition; + } else { + op.skipCondition = skipCondition.or(skipCondition0); + } + } + op.ctx = ctx; op.nextReadPosition = PositionImpl.get(op.readPosition); return op; From 093a0c14d492ac1432a7cd8a410a034f6b75aca3 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Mon, 18 Dec 2023 05:37:21 +0800 Subject: [PATCH 07/16] Fix code. --- .../mledger/impl/ManagedLedgerImpl.java | 188 ++++++++++-------- .../mledger/impl/ManagedCursorTest.java | 108 +++++----- .../mledger/impl/ManagedLedgerTest.java | 2 +- 3 files changed, 156 insertions(+), 142 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 20c94732dadac..b41686f6bec28 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -23,32 +23,14 @@ import static java.lang.Math.min; import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.BoundType; -import com.google.common.collect.Lists; -import com.google.common.collect.Queues; -import com.google.common.collect.Range; +import com.google.common.collect.*; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import java.io.IOException; import java.time.Clock; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.Objects; -import java.util.Optional; -import java.util.Queue; -import java.util.Random; -import java.util.Set; -import java.util.TreeSet; -import java.util.UUID; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; @@ -61,10 +43,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.atomic.*; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -2090,40 +2069,36 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) return; } - long lastEntry = min(firstEntry + opReadEntry.getNumberOfEntriesToRead() - 1, lastEntryInLedger); - - // Do skip condition. - Predicate skipCondition = opReadEntry.skipCondition; - if (skipCondition != null) { - Set entryIds = new TreeSet<>(); - for (long i = firstEntry; i <= lastEntry; i++) { - if (!skipCondition.test(PositionImpl.get(ledger.getId(), i))) { - entryIds.add(i); - } - } - if (entryIds.isEmpty()) { - opReadEntry.internalReadEntriesComplete(Collections.emptyList(), opReadEntry.ctx, - PositionImpl.get(ledger.getId(), lastEntry)); - return; - } + Predicate skipCond = opReadEntry.skipCondition; + if (skipCond == null) { + long lastEntry = min(firstEntry + opReadEntry.getNumberOfEntriesToRead() - 1, lastEntryInLedger); if (log.isDebugEnabled()) { - log.debug("[{}] Reading entries from ledger {} - entryIds {}", name, ledger.getId(), entryIds); + log.debug("[{}] Reading entries from ledger {} - first={} last={}", name, ledger.getId(), firstEntry, + lastEntry); } - asyncReadEntry(ledger, entryIds, opReadEntry, opReadEntry.ctx); + asyncReadEntry( + ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(), opReadEntry, opReadEntry.ctx); return; } - if (log.isDebugEnabled()) { - log.debug("[{}] Reading entries from ledger {} - first={} last={}", name, ledger.getId(), firstEntry, - lastEntry); + // Try to read entries in the current ledger what we need through a single `entryIdSet` as much as possible. + long entryId = firstEntry; + int count = 0; + SortedSet entryIds = new TreeSet<>(); + int entriesToRead = opReadEntry.getNumberOfEntriesToRead(); + while (entryId <= lastEntryInLedger || count <= entriesToRead) { + PositionImpl position = PositionImpl.get(ledger.getId(), entryId); + if (!skipCond.test(position)) { + entryIds.add(entryId); + count++; + } + entryId++; } - asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry, opReadEntry.ctx); + asyncReadEntry(ledger, entryIds, opReadEntry, opReadEntry.ctx); } - // No batch-read in bookie, this method will not slower than `read(firstEntry, lastEntry)`. - @VisibleForTesting - public void asyncReadEntry(ReadHandle ledger, Set entryIds, OpReadEntry opReadEntry, - Object ctx) { + + private void asyncReadEntry(ReadHandle ledger, SortedSet entryIds, OpReadEntry opReadEntry, Object ctx) { if (entryIds.isEmpty()) { // If the entryIds is empty, should not move the `readPosition` of `cursor`. // OpReadEntry#internalReadEntriesComplete will move the `readPosition` of `cursor` @@ -2134,45 +2109,91 @@ public void asyncReadEntry(ReadHandle ledger, Set entryIds, OpReadEntry op return; } - // Maybe concurrent modification happens. - Set newSet = new TreeSet<>(entryIds); - AsyncCallbacks.ReadEntryCallback callback0 = new BatchReadEntryCallback(newSet, opReadEntry); - long ledgerId = ledger.getId(); - for (long entryId : entryIds) { - asyncReadEntry(ledger, PositionImpl.get(ledgerId, entryId), callback0, ctx); + Set> ranges = toRanges(entryIds); + ReadEntriesCallback callback = new BatchReadEntriesCallback(entryIds, opReadEntry); + for (Range range : ranges) { + long start = range.lowerEndpoint(); + long end = range.upperEndpoint(); + // TODO: should handle `lastReadCallback` timeout check??? + asyncReadEntry(ledger, start, end, opReadEntry.cursor.isCacheReadEntry(), callback, ctx); } } - static class BatchReadEntryCallback implements AsyncCallbacks.ReadEntryCallback { - private final Set entryIds; - private final List entries; - private final AsyncCallbacks.ReadEntriesCallback callback; + // Parse entryIds into ranges. + @VisibleForTesting + public static Set> toRanges(SortedSet entryIds) { + RangeSet set = TreeRangeSet.create(); + long start = entryIds.first(); + long end = start; + for (long entryId : entryIds) { + if (entryId - end > 1) { + set.add(Range.closed(start, end)); + start = entryId; + end = start; + } else { + end = entryId; + } + } + set.add(Range.closed(start, end)); + return set.asRanges(); + } - BatchReadEntryCallback(Set entryIds, AsyncCallbacks.ReadEntriesCallback callback) { - // Normally, there should be no competition here, but to prevent unforeseen circumstances, - // concurrent containers should still be used - this.entryIds = Collections.synchronizedSet(entryIds); - this.entries = Collections.synchronizedList(new ArrayList<>(entryIds.size())); + @VisibleForTesting + public static class BatchReadEntriesCallback implements ReadEntriesCallback { + private final Set entryIdSet; + private final SortedSet entrySet; + private final OpReadEntry callback; + private final AtomicBoolean failed = new AtomicBoolean(false); + + @VisibleForTesting + public BatchReadEntriesCallback(Set entryIdSet, OpReadEntry callback) { + this.entryIdSet = entryIdSet; + this.entrySet = new TreeSet<>(Comparator.comparing(Entry::getEntryId)); this.callback = callback; } @Override - public void readEntryComplete(Entry entry, Object ctx) { - long entryId = entry.getEntryId(); - this.entries.add(entry); - this.entryIds.remove(entryId); - if (this.entryIds.isEmpty()) { - // Need to sort entries, so that `readPosition` of `Cursor` can be moved correctly. - this.entries.sort(Comparator.comparingLong(Entry::getEntryId)); - this.callback.readEntriesComplete(this.entries, ctx); + public synchronized void readEntriesComplete(List entries0, Object ctx) { + for (Entry entry : entries0) { + entrySet.add(entry); + if (entrySet.size() != entryIdSet.size()) { + return; + } + callback.readEntriesComplete(entrySet.stream().toList(), ctx); } } @Override - public void readEntryFailed(ManagedLedgerException exception, Object ctx) { - this.entryIds.clear(); - this.entries.clear(); - this.callback.readEntriesFailed(exception, ctx); + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + // Should fail AT_MOST ONCE + if (!failed.compareAndSet(false, true)) { + return; + } + // If there are entries been read success, try to let the read operation success as possible. + List entries = filterEntries(); + if (entries.isEmpty()) { + callback.readEntriesFailed(exception, ctx); + } else { + callback.readEntriesComplete(entries, ctx); + } + } + + private List filterEntries() { + if (entrySet.isEmpty()) { + return Collections.emptyList(); + } + // Make sure the `readPosition` of `cursor` could be moved correctly. + List entries = new ArrayList<>(); + for (long entryId : entryIdSet) { + Entry entry = entrySet.first(); + if (entry.getEntryId() == entryId) { + entries.add(entry); + entrySet.remove(entry); + } else { + break; + } + } + return entries; } } @@ -2190,20 +2211,19 @@ protected void asyncReadEntry(ReadHandle ledger, PositionImpl position, ReadEntr } } - protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, OpReadEntry opReadEntry, - Object ctx) { + @VisibleForTesting + public void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, boolean shouldCacheEntries, + ReadEntriesCallback callback, Object ctx) { if (config.getReadEntryTimeoutSeconds() > 0) { // set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this); long createdTime = System.nanoTime(); ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry, - opReadEntry, readOpCount, createdTime, ctx); + callback, readOpCount, createdTime, ctx); lastReadCallback = readCallback; - entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(), - readCallback, readOpCount); + entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, shouldCacheEntries, readCallback, readOpCount); } else { - entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(), opReadEntry, - ctx); + entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, shouldCacheEntries, callback, ctx); } } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 586006233e062..f2887a64392b4 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -33,23 +33,15 @@ import static org.testng.Assert.fail; import com.google.common.collect.Lists; import com.google.common.collect.Range; +import com.google.common.collect.RangeSet; +import com.google.common.collect.TreeRangeSet; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import java.lang.reflect.Field; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.BitSet; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; +import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -4495,10 +4487,16 @@ public void testReadEntriesWithSkipDeletedEntries() throws Exception { ledger = Mockito.spy(ledger); List actualReadEntryIds = new ArrayList<>(); Mockito.doAnswer(inv -> { - Set ids = inv.getArgument(1); - actualReadEntryIds.addAll(ids); - return inv.callRealMethod(); - }).when(ledger).asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.any(Set.class), Mockito.any(), Mockito.any()); + long start = inv.getArgument(1); + long end = inv.getArgument(2); + for (long i = start; i <= end; i++) { + actualReadEntryIds.add(i); + } + return inv.callRealMethod(); + }) + .when(ledger) + .asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.anyLong(), Mockito.anyLong(), + Mockito.anyBoolean(), Mockito.any(), Mockito.any()); @Cleanup ManagedCursor cursor = ledger.openCursor("c"); @@ -4595,10 +4593,16 @@ public void testReadEntriesWithSkipDeletedEntriesAndWithSkipConditions() throws List actualReadEntryIds = new ArrayList<>(); Mockito.doAnswer(inv -> { - Set ids = inv.getArgument(1); - actualReadEntryIds.addAll(ids); - return inv.callRealMethod(); - }).when(ledger).asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.any(Set.class), Mockito.any(), Mockito.any()); + long start = inv.getArgument(1); + long end = inv.getArgument(2); + for (long i = start; i <= end; i++) { + actualReadEntryIds.add(i); + } + return inv.callRealMethod(); + }) + .when(ledger) + .asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.anyLong(), Mockito.anyLong(), + Mockito.anyBoolean(), Mockito.any(), Mockito.any()); @Cleanup ManagedCursor cursor = ledger.openCursor("c"); @@ -4676,44 +4680,34 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { @Test - public void testReadEmptyEntryIds() throws Exception { - @Cleanup - ManagedLedgerImpl ledger = (ManagedLedgerImpl) - factory.open("testReadEntriesWithSkipDeletedEntriesAndWithSkipConditions"); - @Cleanup - ManagedCursor cursor = ledger.openCursor("c"); - - int entries = 20; - Position maxPosition = PositionImpl.EARLIEST; - for (int i = 0; i < entries; i++) { - maxPosition = ledger.addEntry(new byte[1024]); - } - - CompletableFuture future = new CompletableFuture<>(); - Position cursorPosition = cursor.getReadPosition(); - ReadHandle handle = ledger.getLedgerHandle(maxPosition.getLedgerId()).get(); - OpReadEntry opReadEntry = OpReadEntry.create( - (ManagedCursorImpl) cursor, - PositionImpl.get(cursorPosition.getLedgerId(), cursorPosition.getEntryId()), - 0, // Set count = 0. - new ReadEntriesCallback() { - @Override - public void readEntriesComplete(List entries, Object ctx) { - future.complete(null); - } - - @Override - public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { - future.completeExceptionally(exception); - } - }, null, PositionImpl.get(maxPosition.getLedgerId(), maxPosition.getEntryId()), null); - ledger.asyncReadEntry(handle, Collections.emptySet(), opReadEntry, null); - future.get(); - - // `readPosition` should not be moved. - Position newCursorReadPosition = cursor.getReadPosition(); - assertTrue(newCursorReadPosition.getLedgerId() == cursorPosition.getLedgerId() - && newCursorReadPosition.getEntryId() == cursorPosition.getEntryId()); + @SuppressWarnings("unchecked") + public void testRangeSet() { + SortedSet ids = new TreeSet<>(); + ids.add(1L); + ids.add(7L); + ids.add(8L); + ids.add(9L); + ids.add(15L); + ids.add(17L); + ids.add(19L); + ids.add(20L); + ids.add(21L); + + Set> ranges = ManagedLedgerImpl.toRanges(ids); + assertEquals(ranges.size(), 5); + Object[] rangeArr = ranges.toArray(); + + Range range0 = (Range) rangeArr[0]; + Range range1 = (Range) rangeArr[1]; + Range range2 = (Range) rangeArr[2]; + Range range3 = (Range) rangeArr[3]; + Range range4 = (Range) rangeArr[4]; + + assertTrue(range0.lowerEndpoint() == 1L && range0.upperEndpoint() == 1L); + assertTrue(range1.lowerEndpoint() == 7L && range1.upperEndpoint() == 9L); + assertTrue(range2.lowerEndpoint() == 15L && range2.upperEndpoint() == 15L); + assertTrue(range3.lowerEndpoint() == 17L && range3.upperEndpoint() == 17L); + assertTrue(range4.lowerEndpoint() == 19L && range4.upperEndpoint() == 21L); } private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 8430afb4e4f82..79bece552c6a8 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -3159,7 +3159,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { }, null, PositionImpl.LATEST, null); ledger.asyncReadEntry(ledgerHandle, PositionImpl.EARLIEST.getEntryId(), PositionImpl.EARLIEST.getEntryId(), - opReadEntry, ctxStr); + opReadEntry.cursor.isCacheReadEntry(), opReadEntry, ctxStr); retryStrategically((test) -> { return responseException2.get() != null; }, 5, 1000); From 1566114423ee9bfa8b558456ea690745325074a0 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Mon, 18 Dec 2023 16:00:05 +0800 Subject: [PATCH 08/16] Fix imports --- .../mledger/impl/ManagedLedgerImpl.java | 31 +++++++++++++++++-- .../mledger/impl/ManagedCursorTest.java | 2 -- 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index b41686f6bec28..06a2c618e0e15 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -23,14 +23,35 @@ import static java.lang.Math.min; import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.*; +import com.google.common.collect.BoundType; +import com.google.common.collect.Lists; +import com.google.common.collect.Queues; +import com.google.common.collect.Range; +import com.google.common.collect.RangeSet; +import com.google.common.collect.TreeRangeSet; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import java.io.IOException; import java.time.Clock; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Objects; +import java.util.Optional; +import java.util.Queue; +import java.util.Random; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; @@ -43,7 +64,11 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index f2887a64392b4..d4ef048551d04 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -33,8 +33,6 @@ import static org.testng.Assert.fail; import com.google.common.collect.Lists; import com.google.common.collect.Range; -import com.google.common.collect.RangeSet; -import com.google.common.collect.TreeRangeSet; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import java.lang.reflect.Field; From 27d2e0e75b0fbde9c657a1c43d05d64948a7b82a Mon Sep 17 00:00:00 2001 From: dao-jun Date: Mon, 18 Dec 2023 18:43:42 +0800 Subject: [PATCH 09/16] Fix code --- .../mledger/impl/ManagedLedgerImpl.java | 29 ++++++++----------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 06a2c618e0e15..7e427e2958ca8 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2111,7 +2111,7 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) int count = 0; SortedSet entryIds = new TreeSet<>(); int entriesToRead = opReadEntry.getNumberOfEntriesToRead(); - while (entryId <= lastEntryInLedger || count <= entriesToRead) { + while (entryId <= lastEntryInLedger && count <= entriesToRead) { PositionImpl position = PositionImpl.get(ledger.getId(), entryId); if (!skipCond.test(position)) { entryIds.add(entryId); @@ -2119,21 +2119,18 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) } entryId++; } + if (entryIds.isEmpty()) { + // Move `readPosition` of `cursor`. + PositionImpl position = PositionImpl.get(ledger.getId(), entryId - 1); + opReadEntry.internalReadEntriesComplete(Collections.emptyList(), opReadEntry.ctx, position); + return; + } asyncReadEntry(ledger, entryIds, opReadEntry, opReadEntry.ctx); } private void asyncReadEntry(ReadHandle ledger, SortedSet entryIds, OpReadEntry opReadEntry, Object ctx) { - if (entryIds.isEmpty()) { - // If the entryIds is empty, should not move the `readPosition` of `cursor`. - // OpReadEntry#internalReadEntriesComplete will move the `readPosition` of `cursor` - // to the next position of `lastEntry`, so here uses the previous position of `readPosition` - // to offset the impact of OpReadEntry#internalReadEntriesComplete. - PositionImpl previous = this.getPreviousPosition(opReadEntry.readPosition); - opReadEntry.internalReadEntriesComplete(Collections.emptyList(), ctx, previous); - return; - } - + checkArgument(!entryIds.isEmpty()); Set> ranges = toRanges(entryIds); ReadEntriesCallback callback = new BatchReadEntriesCallback(entryIds, opReadEntry); for (Range range : ranges) { @@ -2179,13 +2176,11 @@ public BatchReadEntriesCallback(Set entryIdSet, OpReadEntry callback) { @Override public synchronized void readEntriesComplete(List entries0, Object ctx) { - for (Entry entry : entries0) { - entrySet.add(entry); - if (entrySet.size() != entryIdSet.size()) { - return; - } - callback.readEntriesComplete(entrySet.stream().toList(), ctx); + entrySet.addAll(entries0); + if (entrySet.size() < entryIdSet.size()) { + return; } + callback.readEntriesComplete(entrySet.stream().toList(), ctx); } @Override From 850f8b5b4c43d3a2b24db33f4c05e59163dcdf3b Mon Sep 17 00:00:00 2001 From: dao-jun Date: Mon, 18 Dec 2023 18:49:39 +0800 Subject: [PATCH 10/16] Fix imports --- .../bookkeeper/mledger/impl/ManagedCursorTest.java | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index d4ef048551d04..aaa7a383d88da 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -39,7 +39,19 @@ import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.BitSet; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; From 5dd57fd638e67a1c1a91c0e36eced948c7a455dd Mon Sep 17 00:00:00 2001 From: dao-jun Date: Mon, 18 Dec 2023 19:32:08 +0800 Subject: [PATCH 11/16] Fix code --- .../org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 7e427e2958ca8..6088e30bd1922 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2111,7 +2111,7 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) int count = 0; SortedSet entryIds = new TreeSet<>(); int entriesToRead = opReadEntry.getNumberOfEntriesToRead(); - while (entryId <= lastEntryInLedger && count <= entriesToRead) { + while (entryId <= lastEntryInLedger && count < entriesToRead) { PositionImpl position = PositionImpl.get(ledger.getId(), entryId); if (!skipCond.test(position)) { entryIds.add(entryId); From 1e4a27e5d1496a5c4786008eb9fc04390d36e704 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Mon, 25 Dec 2023 17:07:46 +0800 Subject: [PATCH 12/16] Review fix --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 8 ++++++++ .../bookkeeper/mledger/impl/OpReadEntry.java | 14 +------------- 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 4b65d62f0eee8..6e51bb92d1130 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -84,6 +84,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException; import org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadException; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.ReadOnlyCursor; import org.apache.bookkeeper.mledger.ScanOutcome; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound; import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; @@ -786,6 +787,8 @@ public void asyncReadEntriesWithSkip(int numberOfEntriesToRead, long maxSizeByte int numOfEntriesToRead = applyMaxSizeCap(numberOfEntriesToRead, maxSizeBytes); PENDING_READ_OPS_UPDATER.incrementAndGet(this); + // Skip deleted entries. + skipCondition = skipCondition == null ? this::isMessageDeleted : skipCondition.or(this::isMessageDeleted); OpReadEntry op = OpReadEntry.create(this, readPosition, numOfEntriesToRead, callback, ctx, maxPosition, skipCondition); ledger.asyncReadEntries(op); @@ -944,6 +947,8 @@ public void asyncReadEntriesWithSkipOrWait(int maxEntries, long maxSizeBytes, Re asyncReadEntriesWithSkip(numberOfEntriesToRead, NO_MAX_SIZE_LIMIT, callback, ctx, maxPosition, skipCondition); } else { + // Skip deleted entries. + skipCondition = skipCondition == null ? this::isMessageDeleted : skipCondition.or(this::isMessageDeleted); OpReadEntry op = OpReadEntry.create(this, readPosition, numberOfEntriesToRead, callback, ctx, maxPosition, skipCondition); @@ -3401,6 +3406,9 @@ public LongPairRangeSet getIndividuallyDeletedMessagesSet() { } public boolean isMessageDeleted(Position position) { + if (this instanceof ReadOnlyCursor) { + return false; + } checkArgument(position instanceof PositionImpl); return ((PositionImpl) position).compareTo(markDeletePosition) <= 0 || individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId()); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java index bafda22a43053..7b59c3903d5bc 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java @@ -30,7 +30,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; import org.apache.bookkeeper.mledger.Position; -import org.apache.bookkeeper.mledger.ReadOnlyCursor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,18 +60,7 @@ public static OpReadEntry create(ManagedCursorImpl cursor, PositionImpl readPosi maxPosition = PositionImpl.LATEST; } op.maxPosition = maxPosition; - - Predicate skipCondition0 = cursor instanceof ReadOnlyCursor ? null : cursor::isMessageDeleted; - if (skipCondition == null) { - op.skipCondition = skipCondition0; - } else { - if (skipCondition0 == null) { - op.skipCondition = skipCondition; - } else { - op.skipCondition = skipCondition.or(skipCondition0); - } - } - + op.skipCondition = skipCondition; op.ctx = ctx; op.nextReadPosition = PositionImpl.get(op.readPosition); return op; From e1af879ed2031f13f502df9e39e1d4449b034c04 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Thu, 4 Jan 2024 21:47:38 +0800 Subject: [PATCH 13/16] Rollback ManagedLedgerImpl changes. --- .../mledger/impl/ManagedLedgerImpl.java | 162 ++++-------------- .../mledger/impl/ManagedCursorTest.java | 40 +---- 2 files changed, 39 insertions(+), 163 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 6088e30bd1922..66bd414707480 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -27,8 +27,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Queues; import com.google.common.collect.Range; -import com.google.common.collect.RangeSet; -import com.google.common.collect.TreeRangeSet; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.util.Recycler; @@ -37,7 +35,6 @@ import java.time.Clock; import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -49,8 +46,6 @@ import java.util.Queue; import java.util.Random; import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -64,7 +59,6 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReference; @@ -1271,7 +1265,7 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) { @Override public String toString() { - return String.format("ML [{}] get earliest message publish time of pos", + return String.format("ML [%s] get earliest message publish time of pos", ManagedLedgerImpl.this.name); } }, null); @@ -2094,127 +2088,44 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) return; } - Predicate skipCond = opReadEntry.skipCondition; - if (skipCond == null) { - long lastEntry = min(firstEntry + opReadEntry.getNumberOfEntriesToRead() - 1, lastEntryInLedger); - if (log.isDebugEnabled()) { - log.debug("[{}] Reading entries from ledger {} - first={} last={}", name, ledger.getId(), firstEntry, - lastEntry); - } - asyncReadEntry( - ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(), opReadEntry, opReadEntry.ctx); - return; - } - - // Try to read entries in the current ledger what we need through a single `entryIdSet` as much as possible. - long entryId = firstEntry; - int count = 0; - SortedSet entryIds = new TreeSet<>(); - int entriesToRead = opReadEntry.getNumberOfEntriesToRead(); - while (entryId <= lastEntryInLedger && count < entriesToRead) { - PositionImpl position = PositionImpl.get(ledger.getId(), entryId); - if (!skipCond.test(position)) { - entryIds.add(entryId); - count++; - } - entryId++; - } - if (entryIds.isEmpty()) { - // Move `readPosition` of `cursor`. - PositionImpl position = PositionImpl.get(ledger.getId(), entryId - 1); - opReadEntry.internalReadEntriesComplete(Collections.emptyList(), opReadEntry.ctx, position); - return; - } - asyncReadEntry(ledger, entryIds, opReadEntry, opReadEntry.ctx); - } - + long lastEntry = min(firstEntry + opReadEntry.getNumberOfEntriesToRead() - 1, lastEntryInLedger); - private void asyncReadEntry(ReadHandle ledger, SortedSet entryIds, OpReadEntry opReadEntry, Object ctx) { - checkArgument(!entryIds.isEmpty()); - Set> ranges = toRanges(entryIds); - ReadEntriesCallback callback = new BatchReadEntriesCallback(entryIds, opReadEntry); - for (Range range : ranges) { - long start = range.lowerEndpoint(); - long end = range.upperEndpoint(); - // TODO: should handle `lastReadCallback` timeout check??? - asyncReadEntry(ledger, start, end, opReadEntry.cursor.isCacheReadEntry(), callback, ctx); - } - } + // Filer out and skip unnecessary read entry + if (opReadEntry.skipCondition != null) { + long firstValidEntry = -1L; + long lastValidEntry = -1L; + long entryId = firstEntry; + for (; entryId <= lastEntry; entryId++) { + if (opReadEntry.skipCondition.test(PositionImpl.get(ledger.getId(), entryId))) { + if (firstValidEntry != -1L) { + break; + } + } else { + if (firstValidEntry == -1L) { + firstValidEntry = entryId; + } - // Parse entryIds into ranges. - @VisibleForTesting - public static Set> toRanges(SortedSet entryIds) { - RangeSet set = TreeRangeSet.create(); - long start = entryIds.first(); - long end = start; - for (long entryId : entryIds) { - if (entryId - end > 1) { - set.add(Range.closed(start, end)); - start = entryId; - end = start; - } else { - end = entryId; + lastValidEntry = entryId; + } } - } - set.add(Range.closed(start, end)); - return set.asRanges(); - } - - @VisibleForTesting - public static class BatchReadEntriesCallback implements ReadEntriesCallback { - private final Set entryIdSet; - private final SortedSet entrySet; - private final OpReadEntry callback; - private final AtomicBoolean failed = new AtomicBoolean(false); - - @VisibleForTesting - public BatchReadEntriesCallback(Set entryIdSet, OpReadEntry callback) { - this.entryIdSet = entryIdSet; - this.entrySet = new TreeSet<>(Comparator.comparing(Entry::getEntryId)); - this.callback = callback; - } - @Override - public synchronized void readEntriesComplete(List entries0, Object ctx) { - entrySet.addAll(entries0); - if (entrySet.size() < entryIdSet.size()) { + // If all messages in [firstEntry...lastEntry] are filter out, + // then manual call internalReadEntriesComplete to advance read position. + if (firstValidEntry == -1L) { + opReadEntry.internalReadEntriesComplete(Collections.emptyList(), opReadEntry.ctx, + PositionImpl.get(ledger.getId(), lastEntry)); return; } - callback.readEntriesComplete(entrySet.stream().toList(), ctx); - } - @Override - public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { - // Should fail AT_MOST ONCE - if (!failed.compareAndSet(false, true)) { - return; - } - // If there are entries been read success, try to let the read operation success as possible. - List entries = filterEntries(); - if (entries.isEmpty()) { - callback.readEntriesFailed(exception, ctx); - } else { - callback.readEntriesComplete(entries, ctx); - } + firstEntry = firstValidEntry; + lastEntry = lastValidEntry; } - private List filterEntries() { - if (entrySet.isEmpty()) { - return Collections.emptyList(); - } - // Make sure the `readPosition` of `cursor` could be moved correctly. - List entries = new ArrayList<>(); - for (long entryId : entryIdSet) { - Entry entry = entrySet.first(); - if (entry.getEntryId() == entryId) { - entries.add(entry); - entrySet.remove(entry); - } else { - break; - } - } - return entries; + if (log.isDebugEnabled()) { + log.debug("[{}] Reading entries from ledger {} - first={} last={}", name, ledger.getId(), firstEntry, + lastEntry); } + asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry, opReadEntry.ctx); } protected void asyncReadEntry(ReadHandle ledger, PositionImpl position, ReadEntryCallback callback, Object ctx) { @@ -2231,19 +2142,20 @@ protected void asyncReadEntry(ReadHandle ledger, PositionImpl position, ReadEntr } } - @VisibleForTesting - public void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, boolean shouldCacheEntries, - ReadEntriesCallback callback, Object ctx) { + protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, OpReadEntry opReadEntry, + Object ctx) { if (config.getReadEntryTimeoutSeconds() > 0) { // set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this); long createdTime = System.nanoTime(); ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry, - callback, readOpCount, createdTime, ctx); + opReadEntry, readOpCount, createdTime, ctx); lastReadCallback = readCallback; - entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, shouldCacheEntries, readCallback, readOpCount); + entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(), + readCallback, readOpCount); } else { - entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, shouldCacheEntries, callback, ctx); + entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(), opReadEntry, + ctx); } } @@ -4625,4 +4537,4 @@ public Position getTheSlowestNonDurationReadPosition() { } return theSlowestNonDurableReadPosition; } -} +} \ No newline at end of file diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index aaa7a383d88da..644f53c3a522d 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -49,8 +49,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; @@ -4505,8 +4503,7 @@ public void testReadEntriesWithSkipDeletedEntries() throws Exception { return inv.callRealMethod(); }) .when(ledger) - .asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.anyLong(), Mockito.anyLong(), - Mockito.anyBoolean(), Mockito.any(), Mockito.any()); + .asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.anyLong(), Mockito.anyLong(), Mockito.any(), Mockito.any()); @Cleanup ManagedCursor cursor = ledger.openCursor("c"); @@ -4611,8 +4608,7 @@ public void testReadEntriesWithSkipDeletedEntriesAndWithSkipConditions() throws return inv.callRealMethod(); }) .when(ledger) - .asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.anyLong(), Mockito.anyLong(), - Mockito.anyBoolean(), Mockito.any(), Mockito.any()); + .asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.anyLong(), Mockito.anyLong(), Mockito.any(), Mockito.any()); @Cleanup ManagedCursor cursor = ledger.openCursor("c"); @@ -4688,37 +4684,5 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { && cursorReadPosition.getEntryId() == expectReadPosition.getEntryId()); } - - @Test - @SuppressWarnings("unchecked") - public void testRangeSet() { - SortedSet ids = new TreeSet<>(); - ids.add(1L); - ids.add(7L); - ids.add(8L); - ids.add(9L); - ids.add(15L); - ids.add(17L); - ids.add(19L); - ids.add(20L); - ids.add(21L); - - Set> ranges = ManagedLedgerImpl.toRanges(ids); - assertEquals(ranges.size(), 5); - Object[] rangeArr = ranges.toArray(); - - Range range0 = (Range) rangeArr[0]; - Range range1 = (Range) rangeArr[1]; - Range range2 = (Range) rangeArr[2]; - Range range3 = (Range) rangeArr[3]; - Range range4 = (Range) rangeArr[4]; - - assertTrue(range0.lowerEndpoint() == 1L && range0.upperEndpoint() == 1L); - assertTrue(range1.lowerEndpoint() == 7L && range1.upperEndpoint() == 9L); - assertTrue(range2.lowerEndpoint() == 15L && range2.upperEndpoint() == 15L); - assertTrue(range3.lowerEndpoint() == 17L && range3.upperEndpoint() == 17L); - assertTrue(range4.lowerEndpoint() == 19L && range4.upperEndpoint() == 21L); - } - private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class); } From b6c91ed05fbee78d653c619911912f7e550ae1b6 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Thu, 4 Jan 2024 21:50:35 +0800 Subject: [PATCH 14/16] Rollback ManagedLedgerImpl changes. --- .../org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 66bd414707480..89f5c716a4cf0 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1265,7 +1265,7 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) { @Override public String toString() { - return String.format("ML [%s] get earliest message publish time of pos", + return String.format("ML [{}] get earliest message publish time of pos", ManagedLedgerImpl.this.name); } }, null); From 30a4a1f817a2b3b4c32ec9d8d72061a8dbef3b37 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Thu, 4 Jan 2024 21:52:24 +0800 Subject: [PATCH 15/16] Rollback ManagedLedgerImpl changes. --- .../org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 79bece552c6a8..8430afb4e4f82 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -3159,7 +3159,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { }, null, PositionImpl.LATEST, null); ledger.asyncReadEntry(ledgerHandle, PositionImpl.EARLIEST.getEntryId(), PositionImpl.EARLIEST.getEntryId(), - opReadEntry.cursor.isCacheReadEntry(), opReadEntry, ctxStr); + opReadEntry, ctxStr); retryStrategically((test) -> { return responseException2.get() != null; }, 5, 1000); From 276358b6848f6687c5a42e1b9242e945e2f274ef Mon Sep 17 00:00:00 2001 From: dao-jun Date: Thu, 18 Jan 2024 17:59:42 +0800 Subject: [PATCH 16/16] address comment. --- .../apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 4 ---- .../apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java | 6 ++++++ 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 2f43e0ec97768..38b142aca372e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -84,7 +84,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException; import org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadException; import org.apache.bookkeeper.mledger.Position; -import org.apache.bookkeeper.mledger.ReadOnlyCursor; import org.apache.bookkeeper.mledger.ScanOutcome; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound; import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; @@ -3411,9 +3410,6 @@ public LongPairRangeSet getIndividuallyDeletedMessagesSet() { } public boolean isMessageDeleted(Position position) { - if (this instanceof ReadOnlyCursor) { - return false; - } checkArgument(position instanceof PositionImpl); return ((PositionImpl) position).compareTo(markDeletePosition) <= 0 || individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId()); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java index 9102339b2904e..1661613f07d7d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java @@ -23,6 +23,7 @@ import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.ReadOnlyCursor; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound; import org.apache.bookkeeper.mledger.proto.MLDataFormats; @@ -70,4 +71,9 @@ public MLDataFormats.ManagedLedgerInfo.LedgerInfo getCurrentLedgerInfo() { public long getNumberOfEntries(Range range) { return this.ledger.getNumberOfEntries(range); } + + @Override + public boolean isMessageDeleted(Position position) { + return false; + } }