Use unsaved in-memory commits during key list re-construction #3486

Merged · 2 commits · Mar 1, 2022
Changes from all commits
@@ -26,6 +26,7 @@
import static org.projectnessie.versioned.persist.adapter.spi.DatabaseAdapterUtil.takeUntilIncludeLast;
import static org.projectnessie.versioned.persist.adapter.spi.Traced.trace;

import com.google.common.base.Preconditions;
import com.google.common.hash.Hasher;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
@@ -110,6 +111,8 @@
public abstract class AbstractDatabaseAdapter<OP_CONTEXT, CONFIG extends DatabaseAdapterConfig>
implements DatabaseAdapter {

private static final Function<Hash, CommitLogEntry> NO_IN_MEMORY_COMMITS = hash -> null;

protected static final String TAG_HASH = "hash";
protected static final String TAG_COUNT = "count";
protected final CONFIG config;
@@ -197,7 +200,8 @@ protected CommitLogEntry commitAttempt(
commitAttempt.getPuts(),
commitAttempt.getDeletes(),
currentBranchEntry != null ? currentBranchEntry.getKeyListDistance() : 0,
newKeyLists);
newKeyLists,
NO_IN_MEMORY_COMMITS);
writeIndividualCommit(ctx, newBranchCommit);
return newBranchCommit;
}
@@ -642,38 +646,100 @@ protected final CommitLogEntry fetchFromCommitLog(OP_CONTEXT ctx, Hash hash) {
* must have exactly as many elements as in the parameter {@code hashes}. Non-existing hashes are
* returned as {@code null}.
*/
protected final List<CommitLogEntry> fetchPageFromCommitLog(OP_CONTEXT ctx, List<Hash> hashes) {
private List<CommitLogEntry> fetchMultipleFromCommitLog(OP_CONTEXT ctx, List<Hash> hashes) {
if (hashes.isEmpty()) {
return Collections.emptyList();
}
try (Traced ignore =
trace("fetchPageFromCommitLog")
.tag(TAG_HASH, hashes.get(0).asString())
.tag(TAG_COUNT, hashes.size())) {
return doFetchPageFromCommitLog(ctx, hashes);
return doFetchMultipleFromCommitLog(ctx, hashes);
}
}

private List<CommitLogEntry> fetchMultipleFromCommitLog(
OP_CONTEXT ctx, List<Hash> hashes, @Nonnull Function<Hash, CommitLogEntry> inMemoryCommits) {
List<CommitLogEntry> result = new ArrayList<>(hashes.size());
List<Hash> remainingHashes = new ArrayList<>(hashes.size());
List<Integer> remainingIndexes = new ArrayList<>(hashes.size());

// Prefetch commits already available in memory. Record indexes for the missing commits to
// enable placing them in the correct positions later, when they are fetched from storage.
int idx = 0;
for (Hash hash : hashes) {
CommitLogEntry found = inMemoryCommits.apply(hash);
if (found != null) {
result.add(found);
} else {
result.add(null); // to be replaced with storage result below
remainingHashes.add(hash);
remainingIndexes.add(idx);
}
idx++;
}

if (!remainingHashes.isEmpty()) {
List<CommitLogEntry> fromStorage = fetchMultipleFromCommitLog(ctx, remainingHashes);
// Fill the gaps in the final result list. Note that fetchMultipleFromCommitLog must return a
// list of the same size as its `remainingHashes` argument.
idx = 0;
for (CommitLogEntry entry : fromStorage) {
int i = remainingIndexes.get(idx++);
result.set(i, entry);
}
}

return result;
}

protected abstract List<CommitLogEntry> doFetchPageFromCommitLog(
protected abstract List<CommitLogEntry> doFetchMultipleFromCommitLog(
OP_CONTEXT ctx, List<Hash> hashes);

/** Reads from the commit-log starting at the given commit-log-hash. */
protected Stream<CommitLogEntry> readCommitLogStream(OP_CONTEXT ctx, Hash initialHash)
throws ReferenceNotFoundException {
Spliterator<CommitLogEntry> split = readCommitLog(ctx, initialHash);
Spliterator<CommitLogEntry> split = readCommitLog(ctx, initialHash, NO_IN_MEMORY_COMMITS);
return StreamSupport.stream(split, false);
}

protected Stream<CommitLogEntry> readCommitLogStream(
OP_CONTEXT ctx, Hash initialHash, @Nonnull Function<Hash, CommitLogEntry> inMemoryCommits)
throws ReferenceNotFoundException {
Spliterator<CommitLogEntry> split = readCommitLog(ctx, initialHash, inMemoryCommits);
return StreamSupport.stream(split, false);
}

protected Spliterator<CommitLogEntry> readCommitLog(OP_CONTEXT ctx, Hash initialHash)
protected Spliterator<CommitLogEntry> readCommitLog(
OP_CONTEXT ctx, Hash initialHash, @Nonnull Function<Hash, CommitLogEntry> inMemoryCommits)
throws ReferenceNotFoundException {
Preconditions.checkNotNull(inMemoryCommits, "in-memory commits cannot be null");

if (NO_ANCESTOR.equals(initialHash)) {
return Spliterators.emptySpliterator();
}
CommitLogEntry initial = fetchFromCommitLog(ctx, initialHash);

CommitLogEntry initial = inMemoryCommits.apply(initialHash);
if (initial == null) {
initial = fetchFromCommitLog(ctx, initialHash);
}

if (initial == null) {
throw referenceNotFound(initialHash);
}
return logFetcher(ctx, initial, this::fetchPageFromCommitLog, CommitLogEntry::getParents);

BiFunction<OP_CONTEXT, List<Hash>, List<CommitLogEntry>> fetcher;
// Avoid creating unnecessary transient lists for the common case of fetching old commits from
// storage.
// Note: an == comparison is fine here because the "in memory" function is local to this class
// in both the empty and the non-empty cases.
if (inMemoryCommits == NO_IN_MEMORY_COMMITS) {
fetcher = this::fetchMultipleFromCommitLog;
} else {
fetcher = (c, hashes) -> fetchMultipleFromCommitLog(c, hashes, inMemoryCommits);
}

return logFetcher(ctx, initial, fetcher, CommitLogEntry::getParents);
}

/**
@@ -775,7 +841,8 @@ protected CommitLogEntry buildIndividualCommit(
List<KeyWithBytes> puts,
List<Key> deletes,
int currentKeyListDistance,
Consumer<Hash> newKeyLists)
Consumer<Hash> newKeyLists,
@Nonnull Function<Hash, CommitLogEntry> inMemoryCommits)
throws ReferenceNotFoundException {
Hash commitHash = individualCommitHash(parentHashes, commitMeta, puts, deletes);

@@ -795,7 +862,7 @@ protected CommitLogEntry buildIndividualCommit(
Collections.emptyList());

if (keyListDistance >= config.getKeyListDistance()) {
entry = buildKeyList(ctx, entry, newKeyLists);
entry = buildKeyList(ctx, entry, newKeyLists, inMemoryCommits);
}
return entry;
}
@@ -818,7 +885,7 @@ protected Hash individualCommitHash(
return Hash.of(UnsafeByteOperations.unsafeWrap(hasher.hash().asBytes()));
}

/** Helper object for {@link #buildKeyList(Object, CommitLogEntry, Consumer)}. */
/** Helper object for {@link #buildKeyList(Object, CommitLogEntry, Consumer, Function)}. */
private static class KeyListBuildState {
final ImmutableCommitLogEntry.Builder newCommitEntry;
/** Builder for {@link CommitLogEntry#getKeyList()}. */
@@ -881,7 +948,10 @@ void addToEmbedded(KeyWithType keyWithType, int keyTypeSize) {
* to both read and re-write those rows for {@link KeyListEntity}.
*/
protected CommitLogEntry buildKeyList(
OP_CONTEXT ctx, CommitLogEntry unwrittenEntry, Consumer<Hash> newKeyLists)
OP_CONTEXT ctx,
CommitLogEntry unwrittenEntry,
Consumer<Hash> newKeyLists,
@Nonnull Function<Hash, CommitLogEntry> inMemoryCommits)
throws ReferenceNotFoundException {
// Read commit-log until the previous persisted key-list

@@ -894,7 +964,7 @@ protected CommitLogEntry buildKeyList(
KeyListBuildState buildState =
new KeyListBuildState(entitySize(unwrittenEntry), newCommitEntry);

keysForCommitEntry(ctx, startHash)
keysForCommitEntry(ctx, startHash, inMemoryCommits)
.forEach(
keyWithType -> {
int keyTypeSize = entitySize(keyWithType);
@@ -981,17 +1051,18 @@ protected void checkForModifiedKeysBetweenExpectedAndCurrentCommit(
/** Retrieve the content-keys and their types for the commit-log-entry with the given hash. */
protected Stream<KeyWithType> keysForCommitEntry(
OP_CONTEXT ctx, Hash hash, KeyFilterPredicate keyFilter) throws ReferenceNotFoundException {
return keysForCommitEntry(ctx, hash)
return keysForCommitEntry(ctx, hash, NO_IN_MEMORY_COMMITS)
.filter(kt -> keyFilter.check(kt.getKey(), kt.getContentId(), kt.getType()));
}

/** Retrieve the content-keys and their types for the commit-log-entry with the given hash. */
protected Stream<KeyWithType> keysForCommitEntry(OP_CONTEXT ctx, Hash hash)
protected Stream<KeyWithType> keysForCommitEntry(
OP_CONTEXT ctx, Hash hash, @Nonnull Function<Hash, CommitLogEntry> inMemoryCommits)
throws ReferenceNotFoundException {
// walk the commit-logs in reverse order - starting with the last persisted key-list

Set<Key> seen = new HashSet<>();
Stream<CommitLogEntry> log = readCommitLogStream(ctx, hash);
Stream<CommitLogEntry> log = readCommitLogStream(ctx, hash, inMemoryCommits);
log = takeUntilIncludeLast(log, e -> e.getKeyList() != null);
return log.flatMap(
e -> {
@@ -1352,6 +1423,8 @@ protected Hash copyCommits(

int keyListDistance = targetHeadCommit != null ? targetHeadCommit.getKeyListDistance() : 0;

Map<Hash, CommitLogEntry> unwrittenCommits = new HashMap<>();

// Rewrite commits to transplant and store those in 'commitsToTransplantReverse'
for (int i = commitsChronological.size() - 1; i >= 0; i--, commitSeq++) {
CommitLogEntry sourceCommit = commitsChronological.get(i);
@@ -1377,9 +1450,12 @@
sourceCommit.getPuts(),
sourceCommit.getDeletes(),
keyListDistance,
newKeyLists);
newKeyLists,
unwrittenCommits::get);
keyListDistance = newEntry.getKeyListDistance();

unwrittenCommits.put(newEntry.getHash(), newEntry);

if (!newEntry.getHash().equals(sourceCommit.getHash())) {
commitsChronological.set(i, newEntry);
} else {
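The core of the change is in the hunks above: fetchMultipleFromCommitLog and readCommitLog first consult a caller-supplied Function<Hash, CommitLogEntry> for commits that exist only in memory, and fall back to a batched storage fetch for the rest, while preserving the positional contract of doFetchMultipleFromCommitLog (one result per requested hash, null for unknown hashes). The following is a minimal, self-contained sketch of that look-up-in-memory-first, batch-fetch-the-rest pattern; the names and types (InMemoryFirstFetchSketch, Storage) are simplified stand-ins, not the actual Nessie classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/**
 * Sketch only: resolve entries from an in-memory lookup first, then batch-fetch
 * the misses from storage and put them back at their original positions.
 */
final class InMemoryFirstFetchSketch<H, E> {

  /** Storage contract: one result per requested key, null for unknown keys. */
  interface Storage<K, V> {
    List<V> fetchMultiple(List<K> keys);
  }

  private final Storage<H, E> storage;

  InMemoryFirstFetchSketch(Storage<H, E> storage) {
    this.storage = storage;
  }

  List<E> fetch(List<H> hashes, Function<H, E> inMemory) {
    List<E> result = new ArrayList<>(hashes.size());
    List<H> misses = new ArrayList<>();
    List<Integer> missIndexes = new ArrayList<>();

    // Resolve what we can from memory; remember the positions of the misses.
    for (int i = 0; i < hashes.size(); i++) {
      E entry = inMemory.apply(hashes.get(i));
      result.add(entry); // null placeholder if the entry is not in memory
      if (entry == null) {
        misses.add(hashes.get(i));
        missIndexes.add(i);
      }
    }

    // One batched storage read for the misses, written back into the gaps.
    if (!misses.isEmpty()) {
      List<E> fromStorage = storage.fetchMultiple(misses);
      for (int i = 0; i < fromStorage.size(); i++) {
        result.set(missIndexes.get(i), fromStorage.get(i));
      }
    }
    return result;
  }
}
```

This keeps the common path (no in-memory commits) as a single storage call, which is why the PR compares the supplied function against the NO_IN_MEMORY_COMMITS sentinel before choosing the fetcher.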
@@ -192,7 +192,7 @@ protected GlobalStateLogEntry doFetchFromGlobalLog(
}

@Override
protected List<CommitLogEntry> doFetchPageFromCommitLog(
protected List<CommitLogEntry> doFetchMultipleFromCommitLog(
NonTransactionalOperationContext ctx, List<Hash> hashes) {
return fetchPageResult(TABLE_COMMIT_LOG, hashes, ProtoSerialization::protoToCommitLogEntry);
}
@@ -174,7 +174,7 @@ protected CommitLogEntry doFetchFromCommitLog(NonTransactionalOperationContext c
}

@Override
protected List<CommitLogEntry> doFetchPageFromCommitLog(
protected List<CommitLogEntry> doFetchMultipleFromCommitLog(
NonTransactionalOperationContext ctx, List<Hash> hashes) {
return hashes.stream()
.map(this::dbKey)
@@ -298,7 +298,7 @@ private static byte[] data(Document doc) {
}

@Override
protected List<CommitLogEntry> doFetchPageFromCommitLog(
protected List<CommitLogEntry> doFetchMultipleFromCommitLog(
NonTransactionalOperationContext ctx, List<Hash> hashes) {
return fetchPage(client.getCommitLog(), hashes, ProtoSerialization::protoToCommitLogEntry);
}
@@ -274,7 +274,7 @@ protected CommitLogEntry doFetchFromCommitLog(NonTransactionalOperationContext c
}

@Override
protected List<CommitLogEntry> doFetchPageFromCommitLog(
protected List<CommitLogEntry> doFetchMultipleFromCommitLog(
NonTransactionalOperationContext ctx, List<Hash> hashes) {
return fetchPage(
dbInstance.getCfCommitLog(), hashes, ProtoSerialization::protoToCommitLogEntry);
@@ -17,6 +17,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.projectnessie.versioned.persist.adapter.DatabaseAdapterConfig.DEFAULT_KEY_LIST_DISTANCE;

import com.google.protobuf.ByteString;
import java.util.Arrays;
@@ -27,7 +28,8 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.projectnessie.versioned.BranchName;
import org.projectnessie.versioned.Hash;
import org.projectnessie.versioned.Key;
@@ -47,8 +49,16 @@ protected AbstractMergeTransplant(DatabaseAdapter databaseAdapter) {
this.databaseAdapter = databaseAdapter;
}

@Test
void merge() throws Exception {
@ParameterizedTest
@ValueSource(
ints = {
3,
10,
DEFAULT_KEY_LIST_DISTANCE,
DEFAULT_KEY_LIST_DISTANCE + 1,
100,
})
void merge(int numCommits) throws Exception {
AtomicInteger unifier = new AtomicInteger();
Function<ByteString, ByteString> metadataUpdater =
commitMeta ->
@@ -57,6 +67,7 @@ void merge() throws Exception {

Hash[] commits =
mergeTransplant(
numCommits,
(target, expectedHead, branch, commitHashes, i) ->
databaseAdapter.merge(commitHashes[i], target, expectedHead, metadataUpdater));

@@ -74,8 +85,16 @@ void merge() throws Exception {
.hasMessageStartingWith("No hashes to merge from '");
}

@Test
void transplant() throws Exception {
@ParameterizedTest
@ValueSource(
ints = {
3,
10,
DEFAULT_KEY_LIST_DISTANCE,
DEFAULT_KEY_LIST_DISTANCE + 1,
100,
})
void transplant(int numCommits) throws Exception {
AtomicInteger unifier = new AtomicInteger();
Function<ByteString, ByteString> metadataUpdater =
commitMeta ->
@@ -84,6 +103,7 @@ void transplant() throws Exception {

Hash[] commits =
mergeTransplant(
numCommits,
(target, expectedHead, branch, commitHashes, i) ->
databaseAdapter.transplant(
target,
@@ -148,14 +168,15 @@ void apply(
throws Exception;
}

private Hash[] mergeTransplant(MergeOrTransplant mergeOrTransplant) throws Exception {
private Hash[] mergeTransplant(int numCommits, MergeOrTransplant mergeOrTransplant)
throws Exception {
BranchName main = BranchName.of("main");
BranchName branch = BranchName.of("branch");
BranchName conflict = BranchName.of("conflict");

databaseAdapter.create(branch, databaseAdapter.hashOnReference(main, Optional.empty()));

Hash[] commits = new Hash[3];
Hash[] commits = new Hash[numCommits];
Inline review comments:

@ajantha-bhat (Contributor), Mar 2, 2022:
I remember that my PR for the same issue got blocked because there was no strict validation. But this PR also doesn't have any validation, yet it went ahead. I am not sure I understand the process here.

(Member):
Your change was complex and changed the algorithm/implementation used to generate the key-lists. Dmitri's change did not change that algorithm.

(Member, author):
I also checked code coverage in IntelliJ (manually) - all branches in the new if statements were covered :)

for (int i = 0; i < commits.length; i++) {
ImmutableCommitAttempt.Builder commit =
ImmutableCommitAttempt.builder()
@@ -1169,7 +1169,7 @@ protected CommitLogEntry doFetchFromCommitLog(Connection c, Hash hash) {
}

@Override
protected List<CommitLogEntry> doFetchPageFromCommitLog(Connection c, List<Hash> hashes) {
protected List<CommitLogEntry> doFetchMultipleFromCommitLog(Connection c, List<Hash> hashes) {
String sql = sqlForManyPlaceholders(SqlStatements.SELECT_COMMIT_LOG_MANY, hashes.size());

try (PreparedStatement ps = c.prepareStatement(sql)) {
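For context on how the new parameter is exercised: during copyCommits (merge/transplant), rewritten commits are only persisted after the whole chain has been rebuilt, so a key-list rebuild triggered mid-loop must be able to resolve ancestors that exist only in memory, which is what unwrittenCommits::get supplies in the hunks above. Below is a condensed, hypothetical sketch of that caller-side pattern; Commit, CommitBuilder, and CopyCommitsSketch are simplified illustrations, not the real Nessie API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Sketch only: simplified stand-ins, not the real Nessie types or signatures. */
final class CopyCommitsSketch {

  /** Trimmed-down commit entry for illustration. */
  static final class Commit {
    final String hash;
    final String parent;

    Commit(String hash, String parent) {
      this.hash = hash;
      this.parent = parent;
    }
  }

  /** Stand-in for buildIndividualCommit with its new in-memory lookup parameter. */
  interface CommitBuilder {
    Commit build(Commit source, String newParent, Function<String, Commit> inMemoryCommits);
  }

  static List<Commit> copy(List<Commit> sourceOldestFirst, String targetHead, CommitBuilder builder) {
    // Freshly rewritten commits, keyed by their new hash, before anything is persisted.
    Map<String, Commit> unwritten = new HashMap<>();
    List<Commit> rewritten = new ArrayList<>();

    String parent = targetHead;
    for (Commit source : sourceOldestFirst) {
      // A key-list rebuild triggered inside build(...) may need to read the
      // rewritten-but-not-yet-persisted ancestors - hence unwritten::get.
      Commit entry = builder.build(source, parent, unwritten::get);
      unwritten.put(entry.hash, entry);
      rewritten.add(entry);
      parent = entry.hash;
    }
    return rewritten; // the caller persists these afterwards
  }
}
```

This also explains the parameterized tests above: commit counts at and just beyond DEFAULT_KEY_LIST_DISTANCE force a key-list rebuild while the rewritten chain is still unwritten, covering both the in-memory and the storage paths.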