diff --git a/versioned/persist/adapter/src/main/java/org/projectnessie/versioned/persist/adapter/spi/AbstractDatabaseAdapter.java b/versioned/persist/adapter/src/main/java/org/projectnessie/versioned/persist/adapter/spi/AbstractDatabaseAdapter.java index 6952919f6fe..d1b25b6b51c 100644 --- a/versioned/persist/adapter/src/main/java/org/projectnessie/versioned/persist/adapter/spi/AbstractDatabaseAdapter.java +++ b/versioned/persist/adapter/src/main/java/org/projectnessie/versioned/persist/adapter/spi/AbstractDatabaseAdapter.java @@ -26,6 +26,7 @@ import static org.projectnessie.versioned.persist.adapter.spi.DatabaseAdapterUtil.takeUntilIncludeLast; import static org.projectnessie.versioned.persist.adapter.spi.Traced.trace; +import com.google.common.base.Preconditions; import com.google.common.hash.Hasher; import com.google.protobuf.ByteString; import com.google.protobuf.UnsafeByteOperations; @@ -110,6 +111,8 @@ public abstract class AbstractDatabaseAdapter implements DatabaseAdapter { + private static final Function NO_IN_MEMORY_COMMITS = hash -> null; + protected static final String TAG_HASH = "hash"; protected static final String TAG_COUNT = "count"; protected final CONFIG config; @@ -197,7 +200,8 @@ protected CommitLogEntry commitAttempt( commitAttempt.getPuts(), commitAttempt.getDeletes(), currentBranchEntry != null ? currentBranchEntry.getKeyListDistance() : 0, - newKeyLists); + newKeyLists, + NO_IN_MEMORY_COMMITS); writeIndividualCommit(ctx, newBranchCommit); return newBranchCommit; } @@ -642,7 +646,7 @@ protected final CommitLogEntry fetchFromCommitLog(OP_CONTEXT ctx, Hash hash) { * must have exactly as many elements as in the parameter {@code hashes}. Non-existing hashes are * returned as {@code null}. */ - protected final List fetchPageFromCommitLog(OP_CONTEXT ctx, List hashes) { + private List fetchMultipleFromCommitLog(OP_CONTEXT ctx, List hashes) { if (hashes.isEmpty()) { return Collections.emptyList(); } @@ -650,30 +654,92 @@ protected final List fetchPageFromCommitLog(OP_CONTEXT ctx, List trace("fetchPageFromCommitLog") .tag(TAG_HASH, hashes.get(0).asString()) .tag(TAG_COUNT, hashes.size())) { - return doFetchPageFromCommitLog(ctx, hashes); + return doFetchMultipleFromCommitLog(ctx, hashes); + } + } + + private List fetchMultipleFromCommitLog( + OP_CONTEXT ctx, List hashes, @Nonnull Function inMemoryCommits) { + List result = new ArrayList<>(hashes.size()); + List remainingHashes = new ArrayList<>(hashes.size()); + List remainingIndexes = new ArrayList<>(hashes.size()); + + // Prefetch commits already available in memory. Record indexes for the missing commits to + // enable placing them in the correct positions later, when they are fetched from storage. + int idx = 0; + for (Hash hash : hashes) { + CommitLogEntry found = inMemoryCommits.apply(hash); + if (found != null) { + result.add(found); + } else { + result.add(null); // to be replaced with storage result below + remainingHashes.add(hash); + remainingIndexes.add(idx); + } + idx++; + } + + if (!remainingHashes.isEmpty()) { + List fromStorage = fetchMultipleFromCommitLog(ctx, remainingHashes); + // Fill the gaps in the final result list. Note that fetchPageFromCommitLog must return the + // list of the same size as its `remainingHashes` parameter. + idx = 0; + for (CommitLogEntry entry : fromStorage) { + int i = remainingIndexes.get(idx++); + result.set(i, entry); + } } + + return result; } - protected abstract List doFetchPageFromCommitLog( + protected abstract List doFetchMultipleFromCommitLog( OP_CONTEXT ctx, List hashes); /** Reads from the commit-log starting at the given commit-log-hash. */ protected Stream readCommitLogStream(OP_CONTEXT ctx, Hash initialHash) throws ReferenceNotFoundException { - Spliterator split = readCommitLog(ctx, initialHash); + Spliterator split = readCommitLog(ctx, initialHash, NO_IN_MEMORY_COMMITS); + return StreamSupport.stream(split, false); + } + + protected Stream readCommitLogStream( + OP_CONTEXT ctx, Hash initialHash, @Nonnull Function inMemoryCommits) + throws ReferenceNotFoundException { + Spliterator split = readCommitLog(ctx, initialHash, inMemoryCommits); return StreamSupport.stream(split, false); } - protected Spliterator readCommitLog(OP_CONTEXT ctx, Hash initialHash) + protected Spliterator readCommitLog( + OP_CONTEXT ctx, Hash initialHash, @Nonnull Function inMemoryCommits) throws ReferenceNotFoundException { + Preconditions.checkNotNull(inMemoryCommits, "in-memory commits cannot be null"); + if (NO_ANCESTOR.equals(initialHash)) { return Spliterators.emptySpliterator(); } - CommitLogEntry initial = fetchFromCommitLog(ctx, initialHash); + + CommitLogEntry initial = inMemoryCommits.apply(initialHash); + if (initial == null) { + initial = fetchFromCommitLog(ctx, initialHash); + } + if (initial == null) { throw referenceNotFound(initialHash); } - return logFetcher(ctx, initial, this::fetchPageFromCommitLog, CommitLogEntry::getParents); + + BiFunction, List> fetcher; + // Avoid creating unnecessary transient lists for the common case of fetching old commits from + // storage. + // Note: == comparison is fine in this situation because the "in memory" function is local to + // this class both for the empty and in the non-empty cases. + if (inMemoryCommits == NO_IN_MEMORY_COMMITS) { + fetcher = this::fetchMultipleFromCommitLog; + } else { + fetcher = (c, hashes) -> fetchMultipleFromCommitLog(c, hashes, inMemoryCommits); + } + + return logFetcher(ctx, initial, fetcher, CommitLogEntry::getParents); } /** @@ -775,7 +841,8 @@ protected CommitLogEntry buildIndividualCommit( List puts, List deletes, int currentKeyListDistance, - Consumer newKeyLists) + Consumer newKeyLists, + @Nonnull Function inMemoryCommits) throws ReferenceNotFoundException { Hash commitHash = individualCommitHash(parentHashes, commitMeta, puts, deletes); @@ -795,7 +862,7 @@ protected CommitLogEntry buildIndividualCommit( Collections.emptyList()); if (keyListDistance >= config.getKeyListDistance()) { - entry = buildKeyList(ctx, entry, newKeyLists); + entry = buildKeyList(ctx, entry, newKeyLists, inMemoryCommits); } return entry; } @@ -818,7 +885,7 @@ protected Hash individualCommitHash( return Hash.of(UnsafeByteOperations.unsafeWrap(hasher.hash().asBytes())); } - /** Helper object for {@link #buildKeyList(Object, CommitLogEntry, Consumer)}. */ + /** Helper object for {@link #buildKeyList(Object, CommitLogEntry, Consumer, Function)}. */ private static class KeyListBuildState { final ImmutableCommitLogEntry.Builder newCommitEntry; /** Builder for {@link CommitLogEntry#getKeyList()}. */ @@ -881,7 +948,10 @@ void addToEmbedded(KeyWithType keyWithType, int keyTypeSize) { * to both read and re-write those rows for {@link KeyListEntity}. */ protected CommitLogEntry buildKeyList( - OP_CONTEXT ctx, CommitLogEntry unwrittenEntry, Consumer newKeyLists) + OP_CONTEXT ctx, + CommitLogEntry unwrittenEntry, + Consumer newKeyLists, + @Nonnull Function inMemoryCommits) throws ReferenceNotFoundException { // Read commit-log until the previous persisted key-list @@ -894,7 +964,7 @@ protected CommitLogEntry buildKeyList( KeyListBuildState buildState = new KeyListBuildState(entitySize(unwrittenEntry), newCommitEntry); - keysForCommitEntry(ctx, startHash) + keysForCommitEntry(ctx, startHash, inMemoryCommits) .forEach( keyWithType -> { int keyTypeSize = entitySize(keyWithType); @@ -981,17 +1051,18 @@ protected void checkForModifiedKeysBetweenExpectedAndCurrentCommit( /** Retrieve the content-keys and their types for the commit-log-entry with the given hash. */ protected Stream keysForCommitEntry( OP_CONTEXT ctx, Hash hash, KeyFilterPredicate keyFilter) throws ReferenceNotFoundException { - return keysForCommitEntry(ctx, hash) + return keysForCommitEntry(ctx, hash, NO_IN_MEMORY_COMMITS) .filter(kt -> keyFilter.check(kt.getKey(), kt.getContentId(), kt.getType())); } /** Retrieve the content-keys and their types for the commit-log-entry with the given hash. */ - protected Stream keysForCommitEntry(OP_CONTEXT ctx, Hash hash) + protected Stream keysForCommitEntry( + OP_CONTEXT ctx, Hash hash, @Nonnull Function inMemoryCommits) throws ReferenceNotFoundException { // walk the commit-logs in reverse order - starting with the last persisted key-list Set seen = new HashSet<>(); - Stream log = readCommitLogStream(ctx, hash); + Stream log = readCommitLogStream(ctx, hash, inMemoryCommits); log = takeUntilIncludeLast(log, e -> e.getKeyList() != null); return log.flatMap( e -> { @@ -1352,6 +1423,8 @@ protected Hash copyCommits( int keyListDistance = targetHeadCommit != null ? targetHeadCommit.getKeyListDistance() : 0; + Map unwrittenCommits = new HashMap<>(); + // Rewrite commits to transplant and store those in 'commitsToTransplantReverse' for (int i = commitsChronological.size() - 1; i >= 0; i--, commitSeq++) { CommitLogEntry sourceCommit = commitsChronological.get(i); @@ -1377,9 +1450,12 @@ protected Hash copyCommits( sourceCommit.getPuts(), sourceCommit.getDeletes(), keyListDistance, - newKeyLists); + newKeyLists, + unwrittenCommits::get); keyListDistance = newEntry.getKeyListDistance(); + unwrittenCommits.put(newEntry.getHash(), newEntry); + if (!newEntry.getHash().equals(sourceCommit.getHash())) { commitsChronological.set(i, newEntry); } else { diff --git a/versioned/persist/dynamodb/src/main/java/org/projectnessie/versioned/persist/dynamodb/DynamoDatabaseAdapter.java b/versioned/persist/dynamodb/src/main/java/org/projectnessie/versioned/persist/dynamodb/DynamoDatabaseAdapter.java index 032bc81aece..75f608b4502 100644 --- a/versioned/persist/dynamodb/src/main/java/org/projectnessie/versioned/persist/dynamodb/DynamoDatabaseAdapter.java +++ b/versioned/persist/dynamodb/src/main/java/org/projectnessie/versioned/persist/dynamodb/DynamoDatabaseAdapter.java @@ -192,7 +192,7 @@ protected GlobalStateLogEntry doFetchFromGlobalLog( } @Override - protected List doFetchPageFromCommitLog( + protected List doFetchMultipleFromCommitLog( NonTransactionalOperationContext ctx, List hashes) { return fetchPageResult(TABLE_COMMIT_LOG, hashes, ProtoSerialization::protoToCommitLogEntry); } diff --git a/versioned/persist/inmem/src/main/java/org/projectnessie/versioned/persist/inmem/InmemoryDatabaseAdapter.java b/versioned/persist/inmem/src/main/java/org/projectnessie/versioned/persist/inmem/InmemoryDatabaseAdapter.java index 8373a16f1f7..d83c492d291 100644 --- a/versioned/persist/inmem/src/main/java/org/projectnessie/versioned/persist/inmem/InmemoryDatabaseAdapter.java +++ b/versioned/persist/inmem/src/main/java/org/projectnessie/versioned/persist/inmem/InmemoryDatabaseAdapter.java @@ -174,7 +174,7 @@ protected CommitLogEntry doFetchFromCommitLog(NonTransactionalOperationContext c } @Override - protected List doFetchPageFromCommitLog( + protected List doFetchMultipleFromCommitLog( NonTransactionalOperationContext ctx, List hashes) { return hashes.stream() .map(this::dbKey) diff --git a/versioned/persist/mongodb/src/main/java/org/projectnessie/versioned/persist/mongodb/MongoDatabaseAdapter.java b/versioned/persist/mongodb/src/main/java/org/projectnessie/versioned/persist/mongodb/MongoDatabaseAdapter.java index 275e0ea8424..d6d56ff3822 100644 --- a/versioned/persist/mongodb/src/main/java/org/projectnessie/versioned/persist/mongodb/MongoDatabaseAdapter.java +++ b/versioned/persist/mongodb/src/main/java/org/projectnessie/versioned/persist/mongodb/MongoDatabaseAdapter.java @@ -298,7 +298,7 @@ private static byte[] data(Document doc) { } @Override - protected List doFetchPageFromCommitLog( + protected List doFetchMultipleFromCommitLog( NonTransactionalOperationContext ctx, List hashes) { return fetchPage(client.getCommitLog(), hashes, ProtoSerialization::protoToCommitLogEntry); } diff --git a/versioned/persist/rocks/src/main/java/org/projectnessie/versioned/persist/rocks/RocksDatabaseAdapter.java b/versioned/persist/rocks/src/main/java/org/projectnessie/versioned/persist/rocks/RocksDatabaseAdapter.java index e26677cd8b6..3c43b7e8412 100644 --- a/versioned/persist/rocks/src/main/java/org/projectnessie/versioned/persist/rocks/RocksDatabaseAdapter.java +++ b/versioned/persist/rocks/src/main/java/org/projectnessie/versioned/persist/rocks/RocksDatabaseAdapter.java @@ -274,7 +274,7 @@ protected CommitLogEntry doFetchFromCommitLog(NonTransactionalOperationContext c } @Override - protected List doFetchPageFromCommitLog( + protected List doFetchMultipleFromCommitLog( NonTransactionalOperationContext ctx, List hashes) { return fetchPage( dbInstance.getCfCommitLog(), hashes, ProtoSerialization::protoToCommitLogEntry); diff --git a/versioned/persist/tests/src/main/java/org/projectnessie/versioned/persist/tests/AbstractMergeTransplant.java b/versioned/persist/tests/src/main/java/org/projectnessie/versioned/persist/tests/AbstractMergeTransplant.java index 61d979e6972..4dfd99882b1 100644 --- a/versioned/persist/tests/src/main/java/org/projectnessie/versioned/persist/tests/AbstractMergeTransplant.java +++ b/versioned/persist/tests/src/main/java/org/projectnessie/versioned/persist/tests/AbstractMergeTransplant.java @@ -17,6 +17,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.projectnessie.versioned.persist.adapter.DatabaseAdapterConfig.DEFAULT_KEY_LIST_DISTANCE; import com.google.protobuf.ByteString; import java.util.Arrays; @@ -27,7 +28,8 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.projectnessie.versioned.BranchName; import org.projectnessie.versioned.Hash; import org.projectnessie.versioned.Key; @@ -47,8 +49,16 @@ protected AbstractMergeTransplant(DatabaseAdapter databaseAdapter) { this.databaseAdapter = databaseAdapter; } - @Test - void merge() throws Exception { + @ParameterizedTest + @ValueSource( + ints = { + 3, + 10, + DEFAULT_KEY_LIST_DISTANCE, + DEFAULT_KEY_LIST_DISTANCE + 1, + 100, + }) + void merge(int numCommits) throws Exception { AtomicInteger unifier = new AtomicInteger(); Function metadataUpdater = commitMeta -> @@ -57,6 +67,7 @@ void merge() throws Exception { Hash[] commits = mergeTransplant( + numCommits, (target, expectedHead, branch, commitHashes, i) -> databaseAdapter.merge(commitHashes[i], target, expectedHead, metadataUpdater)); @@ -74,8 +85,16 @@ void merge() throws Exception { .hasMessageStartingWith("No hashes to merge from '"); } - @Test - void transplant() throws Exception { + @ParameterizedTest + @ValueSource( + ints = { + 3, + 10, + DEFAULT_KEY_LIST_DISTANCE, + DEFAULT_KEY_LIST_DISTANCE + 1, + 100, + }) + void transplant(int numCommits) throws Exception { AtomicInteger unifier = new AtomicInteger(); Function metadataUpdater = commitMeta -> @@ -84,6 +103,7 @@ void transplant() throws Exception { Hash[] commits = mergeTransplant( + numCommits, (target, expectedHead, branch, commitHashes, i) -> databaseAdapter.transplant( target, @@ -148,14 +168,15 @@ void apply( throws Exception; } - private Hash[] mergeTransplant(MergeOrTransplant mergeOrTransplant) throws Exception { + private Hash[] mergeTransplant(int numCommits, MergeOrTransplant mergeOrTransplant) + throws Exception { BranchName main = BranchName.of("main"); BranchName branch = BranchName.of("branch"); BranchName conflict = BranchName.of("conflict"); databaseAdapter.create(branch, databaseAdapter.hashOnReference(main, Optional.empty())); - Hash[] commits = new Hash[3]; + Hash[] commits = new Hash[numCommits]; for (int i = 0; i < commits.length; i++) { ImmutableCommitAttempt.Builder commit = ImmutableCommitAttempt.builder() diff --git a/versioned/persist/tx/src/main/java/org/projectnessie/versioned/persist/tx/TxDatabaseAdapter.java b/versioned/persist/tx/src/main/java/org/projectnessie/versioned/persist/tx/TxDatabaseAdapter.java index 15fa1abe496..5313e810c83 100644 --- a/versioned/persist/tx/src/main/java/org/projectnessie/versioned/persist/tx/TxDatabaseAdapter.java +++ b/versioned/persist/tx/src/main/java/org/projectnessie/versioned/persist/tx/TxDatabaseAdapter.java @@ -1169,7 +1169,7 @@ protected CommitLogEntry doFetchFromCommitLog(Connection c, Hash hash) { } @Override - protected List doFetchPageFromCommitLog(Connection c, List hashes) { + protected List doFetchMultipleFromCommitLog(Connection c, List hashes) { String sql = sqlForManyPlaceholders(SqlStatements.SELECT_COMMIT_LOG_MANY, hashes.size()); try (PreparedStatement ps = c.prepareStatement(sql)) {