Skip to content

Commit

Permalink
Use unsaved in-memory commits during key list re-construction (#3486)
Browse files Browse the repository at this point in the history
* Use unsaved in-memory commits during key list re-construction

* Allow the unsaved in-memory commits built during merge / transplant
  operations to be found during key list re-construction.

* Skip the in-memory lookup in the common case of non-merge call
  paths to reduce the amount of transient lists (parameters and
  results).

* Parameterize merge / transplant test cases for full coverage
  of related code.

Fixes #3466

* Rename fetchPageFromCommitLog -> fetchMultipleFromCommitLog
  • Loading branch information
dimas-b authored Mar 1, 2022
1 parent 9f0a360 commit b57ac21
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.projectnessie.versioned.persist.adapter.spi.DatabaseAdapterUtil.takeUntilIncludeLast;
import static org.projectnessie.versioned.persist.adapter.spi.Traced.trace;

import com.google.common.base.Preconditions;
import com.google.common.hash.Hasher;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
Expand Down Expand Up @@ -110,6 +111,8 @@
public abstract class AbstractDatabaseAdapter<OP_CONTEXT, CONFIG extends DatabaseAdapterConfig>
implements DatabaseAdapter {

private static final Function<Hash, CommitLogEntry> NO_IN_MEMORY_COMMITS = hash -> null;

protected static final String TAG_HASH = "hash";
protected static final String TAG_COUNT = "count";
protected final CONFIG config;
Expand Down Expand Up @@ -197,7 +200,8 @@ protected CommitLogEntry commitAttempt(
commitAttempt.getPuts(),
commitAttempt.getDeletes(),
currentBranchEntry != null ? currentBranchEntry.getKeyListDistance() : 0,
newKeyLists);
newKeyLists,
NO_IN_MEMORY_COMMITS);
writeIndividualCommit(ctx, newBranchCommit);
return newBranchCommit;
}
Expand Down Expand Up @@ -642,38 +646,100 @@ protected final CommitLogEntry fetchFromCommitLog(OP_CONTEXT ctx, Hash hash) {
* must have exactly as many elements as in the parameter {@code hashes}. Non-existing hashes are
* returned as {@code null}.
*/
protected final List<CommitLogEntry> fetchPageFromCommitLog(OP_CONTEXT ctx, List<Hash> hashes) {
private List<CommitLogEntry> fetchMultipleFromCommitLog(OP_CONTEXT ctx, List<Hash> hashes) {
if (hashes.isEmpty()) {
return Collections.emptyList();
}
try (Traced ignore =
trace("fetchPageFromCommitLog")
.tag(TAG_HASH, hashes.get(0).asString())
.tag(TAG_COUNT, hashes.size())) {
return doFetchPageFromCommitLog(ctx, hashes);
return doFetchMultipleFromCommitLog(ctx, hashes);
}
}

/**
 * Fetches the commit-log entries for {@code hashes}, preferring unsaved in-memory commits over
 * storage reads.
 *
 * <p>Each hash is first resolved via {@code inMemoryCommits}; only the hashes not found there are
 * fetched from storage in a single batched call. The returned list has exactly as many elements as
 * {@code hashes}, in the same order; hashes found neither in memory nor in storage map to
 * {@code null} elements.
 *
 * @param ctx database-adapter operation context
 * @param hashes commit hashes to resolve, in result order
 * @param inMemoryCommits lookup for unsaved in-memory commits; returns {@code null} for unknown
 *     hashes (never {@code null} itself)
 * @return one {@link CommitLogEntry} (or {@code null}) per element of {@code hashes}
 */
private List<CommitLogEntry> fetchMultipleFromCommitLog(
    OP_CONTEXT ctx, List<Hash> hashes, @Nonnull Function<Hash, CommitLogEntry> inMemoryCommits) {
  List<CommitLogEntry> result = new ArrayList<>(hashes.size());
  List<Hash> remainingHashes = new ArrayList<>(hashes.size());
  List<Integer> remainingIndexes = new ArrayList<>(hashes.size());

  // Prefetch commits already available in memory. Record indexes for the missing commits to
  // enable placing them in the correct positions later, when they are fetched from storage.
  int idx = 0;
  for (Hash hash : hashes) {
    CommitLogEntry found = inMemoryCommits.apply(hash);
    if (found != null) {
      result.add(found);
    } else {
      result.add(null); // to be replaced with storage result below
      remainingHashes.add(hash);
      remainingIndexes.add(idx);
    }
    idx++;
  }

  if (!remainingHashes.isEmpty()) {
    List<CommitLogEntry> fromStorage = fetchMultipleFromCommitLog(ctx, remainingHashes);
    // Fill the gaps in the final result list. Note that the two-argument
    // fetchMultipleFromCommitLog overload must return a list of the same size as its
    // `remainingHashes` parameter.
    idx = 0;
    for (CommitLogEntry entry : fromStorage) {
      int i = remainingIndexes.get(idx++);
      result.set(i, entry);
    }
  }

  return result;
}

protected abstract List<CommitLogEntry> doFetchPageFromCommitLog(
protected abstract List<CommitLogEntry> doFetchMultipleFromCommitLog(
OP_CONTEXT ctx, List<Hash> hashes);

/** Reads from the commit-log starting at the given commit-log-hash. */
protected Stream<CommitLogEntry> readCommitLogStream(OP_CONTEXT ctx, Hash initialHash)
throws ReferenceNotFoundException {
Spliterator<CommitLogEntry> split = readCommitLog(ctx, initialHash);
Spliterator<CommitLogEntry> split = readCommitLog(ctx, initialHash, NO_IN_MEMORY_COMMITS);
return StreamSupport.stream(split, false);
}

protected Stream<CommitLogEntry> readCommitLogStream(
OP_CONTEXT ctx, Hash initialHash, @Nonnull Function<Hash, CommitLogEntry> inMemoryCommits)
throws ReferenceNotFoundException {
Spliterator<CommitLogEntry> split = readCommitLog(ctx, initialHash, inMemoryCommits);
return StreamSupport.stream(split, false);
}

protected Spliterator<CommitLogEntry> readCommitLog(OP_CONTEXT ctx, Hash initialHash)
protected Spliterator<CommitLogEntry> readCommitLog(
OP_CONTEXT ctx, Hash initialHash, @Nonnull Function<Hash, CommitLogEntry> inMemoryCommits)
throws ReferenceNotFoundException {
Preconditions.checkNotNull(inMemoryCommits, "in-memory commits cannot be null");

if (NO_ANCESTOR.equals(initialHash)) {
return Spliterators.emptySpliterator();
}
CommitLogEntry initial = fetchFromCommitLog(ctx, initialHash);

CommitLogEntry initial = inMemoryCommits.apply(initialHash);
if (initial == null) {
initial = fetchFromCommitLog(ctx, initialHash);
}

if (initial == null) {
throw referenceNotFound(initialHash);
}
return logFetcher(ctx, initial, this::fetchPageFromCommitLog, CommitLogEntry::getParents);

BiFunction<OP_CONTEXT, List<Hash>, List<CommitLogEntry>> fetcher;
// Avoid creating unnecessary transient lists for the common case of fetching old commits from
// storage.
// Note: == comparison is fine in this situation because the "in memory" function is local to
// this class both for the empty and in the non-empty cases.
if (inMemoryCommits == NO_IN_MEMORY_COMMITS) {
fetcher = this::fetchMultipleFromCommitLog;
} else {
fetcher = (c, hashes) -> fetchMultipleFromCommitLog(c, hashes, inMemoryCommits);
}

return logFetcher(ctx, initial, fetcher, CommitLogEntry::getParents);
}

/**
Expand Down Expand Up @@ -775,7 +841,8 @@ protected CommitLogEntry buildIndividualCommit(
List<KeyWithBytes> puts,
List<Key> deletes,
int currentKeyListDistance,
Consumer<Hash> newKeyLists)
Consumer<Hash> newKeyLists,
@Nonnull Function<Hash, CommitLogEntry> inMemoryCommits)
throws ReferenceNotFoundException {
Hash commitHash = individualCommitHash(parentHashes, commitMeta, puts, deletes);

Expand All @@ -795,7 +862,7 @@ protected CommitLogEntry buildIndividualCommit(
Collections.emptyList());

if (keyListDistance >= config.getKeyListDistance()) {
entry = buildKeyList(ctx, entry, newKeyLists);
entry = buildKeyList(ctx, entry, newKeyLists, inMemoryCommits);
}
return entry;
}
Expand All @@ -818,7 +885,7 @@ protected Hash individualCommitHash(
return Hash.of(UnsafeByteOperations.unsafeWrap(hasher.hash().asBytes()));
}

/** Helper object for {@link #buildKeyList(Object, CommitLogEntry, Consumer)}. */
/** Helper object for {@link #buildKeyList(Object, CommitLogEntry, Consumer, Function)}. */
private static class KeyListBuildState {
final ImmutableCommitLogEntry.Builder newCommitEntry;
/** Builder for {@link CommitLogEntry#getKeyList()}. */
Expand Down Expand Up @@ -881,7 +948,10 @@ void addToEmbedded(KeyWithType keyWithType, int keyTypeSize) {
* to both read and re-write those rows for {@link KeyListEntity}.
*/
protected CommitLogEntry buildKeyList(
OP_CONTEXT ctx, CommitLogEntry unwrittenEntry, Consumer<Hash> newKeyLists)
OP_CONTEXT ctx,
CommitLogEntry unwrittenEntry,
Consumer<Hash> newKeyLists,
@Nonnull Function<Hash, CommitLogEntry> inMemoryCommits)
throws ReferenceNotFoundException {
// Read commit-log until the previous persisted key-list

Expand All @@ -894,7 +964,7 @@ protected CommitLogEntry buildKeyList(
KeyListBuildState buildState =
new KeyListBuildState(entitySize(unwrittenEntry), newCommitEntry);

keysForCommitEntry(ctx, startHash)
keysForCommitEntry(ctx, startHash, inMemoryCommits)
.forEach(
keyWithType -> {
int keyTypeSize = entitySize(keyWithType);
Expand Down Expand Up @@ -981,17 +1051,18 @@ protected void checkForModifiedKeysBetweenExpectedAndCurrentCommit(
/** Retrieve the content-keys and their types for the commit-log-entry with the given hash. */
protected Stream<KeyWithType> keysForCommitEntry(
OP_CONTEXT ctx, Hash hash, KeyFilterPredicate keyFilter) throws ReferenceNotFoundException {
return keysForCommitEntry(ctx, hash)
return keysForCommitEntry(ctx, hash, NO_IN_MEMORY_COMMITS)
.filter(kt -> keyFilter.check(kt.getKey(), kt.getContentId(), kt.getType()));
}

/** Retrieve the content-keys and their types for the commit-log-entry with the given hash. */
protected Stream<KeyWithType> keysForCommitEntry(OP_CONTEXT ctx, Hash hash)
protected Stream<KeyWithType> keysForCommitEntry(
OP_CONTEXT ctx, Hash hash, @Nonnull Function<Hash, CommitLogEntry> inMemoryCommits)
throws ReferenceNotFoundException {
// walk the commit-logs in reverse order - starting with the last persisted key-list

Set<Key> seen = new HashSet<>();
Stream<CommitLogEntry> log = readCommitLogStream(ctx, hash);
Stream<CommitLogEntry> log = readCommitLogStream(ctx, hash, inMemoryCommits);
log = takeUntilIncludeLast(log, e -> e.getKeyList() != null);
return log.flatMap(
e -> {
Expand Down Expand Up @@ -1352,6 +1423,8 @@ protected Hash copyCommits(

int keyListDistance = targetHeadCommit != null ? targetHeadCommit.getKeyListDistance() : 0;

Map<Hash, CommitLogEntry> unwrittenCommits = new HashMap<>();

// Rewrite commits to transplant and store those in 'commitsToTransplantReverse'
for (int i = commitsChronological.size() - 1; i >= 0; i--, commitSeq++) {
CommitLogEntry sourceCommit = commitsChronological.get(i);
Expand All @@ -1377,9 +1450,12 @@ protected Hash copyCommits(
sourceCommit.getPuts(),
sourceCommit.getDeletes(),
keyListDistance,
newKeyLists);
newKeyLists,
unwrittenCommits::get);
keyListDistance = newEntry.getKeyListDistance();

unwrittenCommits.put(newEntry.getHash(), newEntry);

if (!newEntry.getHash().equals(sourceCommit.getHash())) {
commitsChronological.set(i, newEntry);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ protected GlobalStateLogEntry doFetchFromGlobalLog(
}

@Override
protected List<CommitLogEntry> doFetchPageFromCommitLog(
protected List<CommitLogEntry> doFetchMultipleFromCommitLog(
NonTransactionalOperationContext ctx, List<Hash> hashes) {
return fetchPageResult(TABLE_COMMIT_LOG, hashes, ProtoSerialization::protoToCommitLogEntry);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ protected CommitLogEntry doFetchFromCommitLog(NonTransactionalOperationContext c
}

@Override
protected List<CommitLogEntry> doFetchPageFromCommitLog(
protected List<CommitLogEntry> doFetchMultipleFromCommitLog(
NonTransactionalOperationContext ctx, List<Hash> hashes) {
return hashes.stream()
.map(this::dbKey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ private static byte[] data(Document doc) {
}

@Override
protected List<CommitLogEntry> doFetchPageFromCommitLog(
protected List<CommitLogEntry> doFetchMultipleFromCommitLog(
NonTransactionalOperationContext ctx, List<Hash> hashes) {
return fetchPage(client.getCommitLog(), hashes, ProtoSerialization::protoToCommitLogEntry);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ protected CommitLogEntry doFetchFromCommitLog(NonTransactionalOperationContext c
}

@Override
protected List<CommitLogEntry> doFetchPageFromCommitLog(
protected List<CommitLogEntry> doFetchMultipleFromCommitLog(
NonTransactionalOperationContext ctx, List<Hash> hashes) {
return fetchPage(
dbInstance.getCfCommitLog(), hashes, ProtoSerialization::protoToCommitLogEntry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.projectnessie.versioned.persist.adapter.DatabaseAdapterConfig.DEFAULT_KEY_LIST_DISTANCE;

import com.google.protobuf.ByteString;
import java.util.Arrays;
Expand All @@ -27,7 +28,8 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.projectnessie.versioned.BranchName;
import org.projectnessie.versioned.Hash;
import org.projectnessie.versioned.Key;
Expand All @@ -47,8 +49,16 @@ protected AbstractMergeTransplant(DatabaseAdapter databaseAdapter) {
this.databaseAdapter = databaseAdapter;
}

@Test
void merge() throws Exception {
@ParameterizedTest
@ValueSource(
ints = {
3,
10,
DEFAULT_KEY_LIST_DISTANCE,
DEFAULT_KEY_LIST_DISTANCE + 1,
100,
})
void merge(int numCommits) throws Exception {
AtomicInteger unifier = new AtomicInteger();
Function<ByteString, ByteString> metadataUpdater =
commitMeta ->
Expand All @@ -57,6 +67,7 @@ void merge() throws Exception {

Hash[] commits =
mergeTransplant(
numCommits,
(target, expectedHead, branch, commitHashes, i) ->
databaseAdapter.merge(commitHashes[i], target, expectedHead, metadataUpdater));

Expand All @@ -74,8 +85,16 @@ void merge() throws Exception {
.hasMessageStartingWith("No hashes to merge from '");
}

@Test
void transplant() throws Exception {
@ParameterizedTest
@ValueSource(
ints = {
3,
10,
DEFAULT_KEY_LIST_DISTANCE,
DEFAULT_KEY_LIST_DISTANCE + 1,
100,
})
void transplant(int numCommits) throws Exception {
AtomicInteger unifier = new AtomicInteger();
Function<ByteString, ByteString> metadataUpdater =
commitMeta ->
Expand All @@ -84,6 +103,7 @@ void transplant() throws Exception {

Hash[] commits =
mergeTransplant(
numCommits,
(target, expectedHead, branch, commitHashes, i) ->
databaseAdapter.transplant(
target,
Expand Down Expand Up @@ -148,14 +168,15 @@ void apply(
throws Exception;
}

private Hash[] mergeTransplant(MergeOrTransplant mergeOrTransplant) throws Exception {
private Hash[] mergeTransplant(int numCommits, MergeOrTransplant mergeOrTransplant)
throws Exception {
BranchName main = BranchName.of("main");
BranchName branch = BranchName.of("branch");
BranchName conflict = BranchName.of("conflict");

databaseAdapter.create(branch, databaseAdapter.hashOnReference(main, Optional.empty()));

Hash[] commits = new Hash[3];
Hash[] commits = new Hash[numCommits];
for (int i = 0; i < commits.length; i++) {
ImmutableCommitAttempt.Builder commit =
ImmutableCommitAttempt.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1169,7 +1169,7 @@ protected CommitLogEntry doFetchFromCommitLog(Connection c, Hash hash) {
}

@Override
protected List<CommitLogEntry> doFetchPageFromCommitLog(Connection c, List<Hash> hashes) {
protected List<CommitLogEntry> doFetchMultipleFromCommitLog(Connection c, List<Hash> hashes) {
String sql = sqlForManyPlaceholders(SqlStatements.SELECT_COMMIT_LOG_MANY, hashes.size());

try (PreparedStatement ps = c.prepareStatement(sql)) {
Expand Down

0 comments on commit b57ac21

Please sign in to comment.