Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
[KeyValueSnapshotReader Phase I] Part 2: ReadValidationCommitTimestam…
Browse files Browse the repository at this point in the history
…pLoader (#7017)

* CTL

* self cr part 1

* self CR part 2

* update javadoc

* push comment out to javadoc

* Split out read validation

* ideas

* original flavouring

* no need to spam eq

* edge case

* polish

* Not Jetbrains

* CR

* new world

* sleepy stuff

* self cr

* circle not running?

* fix
  • Loading branch information
jeremyk-91 authored Mar 19, 2024
1 parent 4c976cf commit 5110f59
Show file tree
Hide file tree
Showing 7 changed files with 444 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,20 @@
/**
* Loads commit timestamps for transactions identified by their start timestamps.
*/
public interface CommitTimestampLoader {
/**
* Returns a map from start timestamp to commit timestamp. If the transaction corresponding to a start timestamp
* has neither committed nor aborted, it will be missing from the map. If configured (see
* {@code shouldWaitForCommitterToComplete}), this method will block until the transactions for these start
* timestamps are believed to no longer be running.
*
* Note that this method does not actively abort transactions - in particular, a transaction that is believed to
* no longer be running may still commit in the future.
*
* NOTE(review): this three-argument overload appears to be the pre-change signature shown by this diff view;
* confirm whether it still exists alongside the two-argument methods below.
*
* @param tableRef table the lookup relates to; may be null
* @param startTimestamps start timestamps of the transactions of interest
* @param shouldWaitForCommitterToComplete whether to block as described above
* @return a future of the start timestamp to commit timestamp map
*/
ListenableFuture<LongLongMap> getCommitTimestamps(
@Nullable TableReference tableRef, LongIterable startTimestamps, boolean shouldWaitForCommitterToComplete);
/**
* Returns a map from start timestamp to commit timestamp. If the transaction corresponding to a start timestamp
* has neither committed nor aborted, it will be missing from the map. This method will block until
* the transactions for these start timestamps are believed to no longer be running.
*
* Note that this method does not actively abort transactions - in particular, a transaction that is believed to
* no longer be running may still commit in the future.
*
* @param tableRef table the lookup relates to; may be null
* @param startTimestamps start timestamps of the transactions of interest
* @return a future of the start timestamp to commit timestamp map
*/
ListenableFuture<LongLongMap> getCommitTimestamps(@Nullable TableReference tableRef, LongIterable startTimestamps);

/**
* Performs {@link #getCommitTimestamps(TableReference, LongIterable)} as described above, but does not wait for
* transactions for these start timestamps to be believed to no longer be running. This is likely to be relevant
* in contexts such as conflict checking, where we want to check for conflicts with transactions that are still
* running while avoiding possible deadlocks between transactions reading or writing to overlapping sets of cells.
*
* @param tableRef table the lookup relates to; may be null
* @param startTimestamps start timestamps of the transactions of interest
* @return a future of the start timestamp to commit timestamp map
*/
ListenableFuture<LongLongMap> getCommitTimestampsNonBlockingForValidation(
@Nullable TableReference tableRef, LongIterable startTimestamps);
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,17 @@ public DefaultCommitTimestampLoader(

/**
 * Blocking lookup: forwards to {@code getCommitTimestampsInternal} with
 * {@code shouldWaitForCommitterToComplete} set to {@code true}.
 */
@Override
public ListenableFuture<LongLongMap> getCommitTimestamps(
        @Nullable TableReference tableRef, LongIterable startTimestamps) {
    boolean waitForCommitters = true;
    return getCommitTimestampsInternal(tableRef, startTimestamps, waitForCommitters);
}

/**
 * Non-blocking lookup: forwards to {@code getCommitTimestampsInternal} with
 * {@code shouldWaitForCommitterToComplete} set to {@code false}.
 */
@Override
public ListenableFuture<LongLongMap> getCommitTimestampsNonBlockingForValidation(
        @Nullable TableReference tableRef, LongIterable startTimestamps) {
    boolean waitForCommitters = false;
    return getCommitTimestampsInternal(tableRef, startTimestamps, waitForCommitters);
}

private ListenableFuture<LongLongMap> getCommitTimestampsInternal(
@Nullable TableReference tableRef, LongIterable startTimestamps, boolean shouldWaitForCommitterToComplete) {
if (startTimestamps.isEmpty()) {
return Futures.immediateFuture(LongLongMaps.immutable.of());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* (c) Copyright 2024 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.transaction.impl;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.palantir.atlasdb.futures.AtlasFutures;
import com.palantir.atlasdb.keyvalue.api.TableReference;
import com.palantir.atlasdb.transaction.api.CommitTimestampLoader;
import com.palantir.atlasdb.transaction.api.TransactionSerializableConflictException;
import com.palantir.atlasdb.transaction.impl.SerializableTransaction.PartitionedTimestamps;
import com.palantir.atlasdb.transaction.impl.metrics.TransactionOutcomeMetrics;
import javax.annotation.Nullable;
import org.eclipse.collections.api.LongIterable;
import org.eclipse.collections.api.factory.primitive.LongLongMaps;
import org.eclipse.collections.api.factory.primitive.LongSets;
import org.eclipse.collections.api.map.primitive.LongLongMap;
import org.eclipse.collections.api.map.primitive.MutableLongLongMap;
import org.eclipse.collections.api.set.primitive.LongSet;
import org.eclipse.collections.api.set.primitive.MutableLongSet;

/**
 * A {@link CommitTimestampLoader} for read validation of a serializable transaction. Lookups are answered against a
 * simulated state of the world in which the transaction being validated has already committed.
 *
 * <p>Requested start timestamps are split around this transaction's start timestamp: earlier ones are loaded from
 * the delegate (blocking or not, as requested), later ones are always loaded without blocking, and the result for
 * our own start timestamp is supplied by {@link PartitionedTimestamps#myCommittedTransaction()}.
 */
public final class ReadValidationCommitTimestampLoader implements CommitTimestampLoader {
    private final CommitTimestampLoader delegate;
    private final long startTs;
    private final long commitTs;
    private final TransactionOutcomeMetrics transactionOutcomeMetrics;

    /**
     * @param delegate loader that performs the actual commit timestamp lookups
     * @param startTs start timestamp of the transaction being validated
     * @param commitTs commit timestamp to simulate for the transaction being validated
     * @param transactionOutcomeMetrics metrics sink on which read-write conflicts are marked
     */
    public ReadValidationCommitTimestampLoader(
            CommitTimestampLoader delegate,
            long startTs,
            long commitTs,
            TransactionOutcomeMetrics transactionOutcomeMetrics) {
        this.delegate = delegate;
        this.startTs = startTs;
        this.commitTs = commitTs;
        this.transactionOutcomeMetrics = transactionOutcomeMetrics;
    }

    /** Variant that asks the delegate to block for transactions that started before this one. */
    @Override
    public ListenableFuture<LongLongMap> getCommitTimestamps(
            @Nullable TableReference tableRef, LongIterable startTimestamps) {
        return getCommitTimestampsInternal(tableRef, startTimestamps, true);
    }

    /** Variant that never asks the delegate to block, regardless of when the transactions started. */
    @Override
    public ListenableFuture<LongLongMap> getCommitTimestampsNonBlockingForValidation(
            @Nullable TableReference tableRef, LongIterable startTimestamps) {
        return getCommitTimestampsInternal(tableRef, startTimestamps, false);
    }

    /**
     * Splits the requested timestamps around {@code startTs}, loads each side from the delegate and merges the
     * results together with the simulated entry for this transaction.
     *
     * @param tableRef table the lookup relates to; may be null
     * @param startTimestamps start timestamps of the transactions of interest
     * @param shouldWaitForCommitterToComplete whether the pre-start lookup uses the delegate's blocking method
     * @return a future of the merged start timestamp to commit timestamp map
     */
    private ListenableFuture<LongLongMap> getCommitTimestampsInternal(
            @Nullable TableReference tableRef, LongIterable startTimestamps, boolean shouldWaitForCommitterToComplete) {
        PartitionedTimestamps partition = splitTransactionBeforeAndAfter(startTs, startTimestamps);

        // The post-start lookup is issued before the pre-start one.
        ListenableFuture<LongLongMap> startedAfterMe =
                getCommitTimestampsForTransactionsStartedAfterMe(tableRef, partition.afterStart());
        ListenableFuture<LongLongMap> startedBeforeMe = loadCommitTimestampsForTransactionsStartedBeforeMe(
                tableRef, partition.beforeStart(), shouldWaitForCommitterToComplete);

        return Futures.whenAllComplete(startedAfterMe, startedBeforeMe)
                .call(
                        () -> {
                            MutableLongLongMap merged =
                                    LongLongMaps.mutable.withAll(AtlasFutures.getDone(startedBeforeMe));
                            merged.putAll(AtlasFutures.getDone(startedAfterMe));
                            merged.putAll(partition.myCommittedTransaction());
                            return merged.toImmutable();
                        },
                        MoreExecutors.directExecutor());
    }

    /**
     * Loads commit timestamps for transactions that started before this one, skipping the delegate entirely when
     * there are none.
     *
     * <p>Blocking is considered acceptable for this side: if a cycle of transactions could deadlock, at least one
     * of them is expected to be handled by the non-blocking post-start path. NOTE(review): the original comment
     * explaining this was cut off mid-sentence; confirm the intended rationale.
     */
    private ListenableFuture<LongLongMap> loadCommitTimestampsForTransactionsStartedBeforeMe(
            @Nullable TableReference tableRef, LongIterable beforeStart, boolean shouldWaitForCommitterToComplete) {
        if (beforeStart.isEmpty()) {
            // TODO (jkong): If called frequently, can consider memoisation.
            return Futures.immediateFuture(LongLongMaps.immutable.empty());
        }
        return shouldWaitForCommitterToComplete
                ? delegate.getCommitTimestamps(tableRef, beforeStart)
                : delegate.getCommitTimestampsNonBlockingForValidation(tableRef, beforeStart);
    }

    /**
     * Loads commit timestamps for transactions that started after this one, without blocking. Blocking here could
     * deadlock when two transactions (or a longer cycle) have all written their data and are all running their
     * pre-commit checks.
     *
     * <p>If the delegate does not return an entry for every requested start timestamp, a read-write conflict is
     * marked on the metrics and a {@link TransactionSerializableConflictException} is thrown from the
     * transformation.
     */
    private ListenableFuture<LongLongMap> getCommitTimestampsForTransactionsStartedAfterMe(
            @Nullable TableReference tableRef, LongSet startTimestamps) {
        if (startTimestamps.isEmpty()) {
            return Futures.immediateFuture(LongLongMaps.immutable.empty());
        }

        return Futures.transform(
                delegate.getCommitTimestampsNonBlockingForValidation(tableRef, startTimestamps),
                loaded -> {
                    if (!loaded.keySet().containsAll(startTimestamps)) {
                        // Missing results may indicate the deadlock case, so fail out early. This may abort more
                        // transactions than strictly needed to break the cycle, but that should be pretty rare.
                        transactionOutcomeMetrics.markReadWriteConflict(tableRef);
                        throw new TransactionSerializableConflictException(
                                "An uncommitted conflicting read was written after our start timestamp for table "
                                        + tableRef
                                        + ". This case can cause deadlock and is very likely to be a read write conflict.",
                                tableRef);
                    }
                    return loaded;
                },
                MoreExecutors.directExecutor());
    }

    /**
     * Sorts {@code startTimestamps} into those strictly before and strictly after {@code myStart}; a timestamp
     * equal to {@code myStart} is recorded on the result as the splitting start timestamp instead.
     *
     * @param myStart start timestamp of this transaction
     * @param startTimestamps start timestamps of the transactions we are interested in
     * @return a {@link PartitionedTimestamps} holding both sets and this loader's commit timestamp
     */
    private PartitionedTimestamps splitTransactionBeforeAndAfter(long myStart, LongIterable startTimestamps) {
        MutableLongSet earlier = LongSets.mutable.empty();
        MutableLongSet later = LongSets.mutable.empty();
        ImmutablePartitionedTimestamps.Builder result =
                ImmutablePartitionedTimestamps.builder().myCommitTimestamp(commitTs);
        startTimestamps.forEach(candidate -> {
            if (candidate < myStart) {
                earlier.add(candidate);
            } else if (candidate > myStart) {
                later.add(candidate);
            } else {
                result.splittingStartTimestamp(myStart);
            }
        });
        return result.beforeStart(earlier.asUnmodifiable())
                .afterStart(later.asUnmodifiable())
                .build();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import com.palantir.atlasdb.cleaner.api.Cleaner;
import com.palantir.atlasdb.debug.ConflictTracer;
import com.palantir.atlasdb.encoding.PtBytes;
import com.palantir.atlasdb.futures.AtlasFutures;
import com.palantir.atlasdb.keyvalue.api.BatchColumnRangeSelection;
import com.palantir.atlasdb.keyvalue.api.Cell;
import com.palantir.atlasdb.keyvalue.api.ColumnRangeSelection;
Expand Down Expand Up @@ -113,13 +112,9 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.eclipse.collections.api.LongIterable;
import org.eclipse.collections.api.factory.primitive.LongLongMaps;
import org.eclipse.collections.api.factory.primitive.LongSets;
import org.eclipse.collections.api.map.primitive.LongLongMap;
import org.eclipse.collections.api.map.primitive.MutableLongLongMap;
import org.eclipse.collections.api.set.primitive.LongSet;
import org.eclipse.collections.api.set.primitive.MutableLongSet;
import org.immutables.value.Value;

/**
Expand Down Expand Up @@ -941,94 +936,12 @@ private Transaction getReadOnlyTransaction(final long commitTs) {
conflictTracer,
tableLevelMetricsController,
knowledge,
// TODO (jkong): Remove when extracting a custom read-only commit timestamp loader
commitTimestampLoader) {
new ReadValidationCommitTimestampLoader(
commitTimestampLoader, getTimestamp(), commitTs, transactionOutcomeMetrics)) {
/**
 * Returns the read-only transaction-scoped cache that the lock watch manager provides for the enclosing
 * serializable transaction's timestamp.
 */
@Override
protected TransactionScopedCache getCache() {
    long enclosingTimestamp = SerializableTransaction.this.getTimestamp();
    return lockWatchManager.getReadOnlyTransactionScopedCache(enclosingTimestamp);
}

/**
* Loads commit timestamps as seen by read validation: the requested start timestamps are split around the
* enclosing serializable transaction's start timestamp, each side is loaded separately, and the results are
* merged together with the entry from {@code myCommittedTransaction()}.
*
* NOTE(review): this override appears on the removed side of the diff; the same logic is present in
* {@code ReadValidationCommitTimestampLoader}.
*
* @param tableRef table the lookup relates to
* @param startTimestamps start timestamps of the transactions of interest
* @param shouldWaitForCommitterToComplete passed through to the superclass lookup for pre-start transactions
* @return a future of the merged start timestamp to commit timestamp map
*/
@Override
protected ListenableFuture<LongLongMap> getCommitTimestamps(
TableReference tableRef, LongIterable startTimestamps, boolean shouldWaitForCommitterToComplete) {
long myStart = SerializableTransaction.this.getTimestamp();
PartitionedTimestamps partitionedTimestamps = splitTransactionBeforeAndAfter(myStart, startTimestamps);

// Transactions that started after us are loaded through the non-blocking helper.
ListenableFuture<LongLongMap> postStartCommitTimestamps =
getCommitTimestampsForTransactionsStartedAfterMe(tableRef, partitionedTimestamps.afterStart());

// We are ok to block here because if there is a cycle of transactions that could result in a deadlock,
// then at least one of them is expected to be handled by the non-blocking post-start lookup above.
// NOTE(review): the original comment was cut off mid-sentence ("...will be in the ab"); confirm the rationale.
ListenableFuture<LongLongMap> preStartCommitTimestamps = super.getCommitTimestamps(
tableRef, partitionedTimestamps.beforeStart(), shouldWaitForCommitterToComplete);

// Merge once both lookups are done: pre-start results, then post-start results, then our own entry.
return Futures.whenAllComplete(postStartCommitTimestamps, preStartCommitTimestamps)
.call(
() -> {
MutableLongLongMap map = LongLongMaps.mutable.withAll(
AtlasFutures.getDone(preStartCommitTimestamps));
map.putAll(AtlasFutures.getDone(postStartCommitTimestamps));
map.putAll(partitionedTimestamps.myCommittedTransaction());
return map.toImmutable();
},
MoreExecutors.directExecutor());
}

/**
* Loads commit timestamps for transactions that started after the enclosing transaction, without waiting for
* their committers. If the lookup does not return an entry for every requested start timestamp, a read-write
* conflict is marked on the metrics and a {@link TransactionSerializableConflictException} is thrown from the
* transformation.
*
* @param tableRef table the lookup relates to
* @param startTimestamps start timestamps greater than the enclosing transaction's start timestamp
* @return a future of the start timestamp to commit timestamp map; an empty map if nothing was requested
*/
private ListenableFuture<LongLongMap> getCommitTimestampsForTransactionsStartedAfterMe(
TableReference tableRef, LongSet startTimestamps) {
// Nothing to look up, so avoid calling the superclass at all.
if (startTimestamps.isEmpty()) {
return Futures.immediateFuture(LongLongMaps.immutable.empty());
}

return Futures.transform(
// We do not block when waiting for results that were written after our start timestamp.
// If we block here it may lead to deadlock if two transactions (or a cycle of any length) have
// all written their data and are all doing checks before committing.
super.getCommitTimestamps(tableRef, startTimestamps, false),
startToCommitTimestamps -> {
if (startToCommitTimestamps.keySet().containsAll(startTimestamps)) {
return startToCommitTimestamps;
}
// If we do not get back all these results we may be in the deadlock case so we
// should just fail out early. It may be the case that we abort more transactions
// than needed to break the deadlock cycle, but this should be pretty rare.
transactionOutcomeMetrics.markReadWriteConflict(tableRef);
throw new TransactionSerializableConflictException(
"An uncommitted conflicting read was "
+ "written after our start timestamp for table "
+ tableRef + ". "
+ "This case can cause deadlock and is very likely to be a "
+ "read write conflict.",
tableRef);
},
MoreExecutors.directExecutor());
}

/**
* Partitions {@code startTimestamps} in two sets, based on their relation to the start timestamp provided.
* A timestamp equal to {@code myStart} goes into neither set; it is recorded on the result as the splitting
* start timestamp instead.
*
* @param myStart start timestamp of this transaction
* @param startTimestamps of transactions we are interested in
* @return a {@link PartitionedTimestamps} object containing split timestamps
*/
private PartitionedTimestamps splitTransactionBeforeAndAfter(long myStart, LongIterable startTimestamps) {
// The commit timestamp is always set on the result, whether or not our own start timestamp was requested.
ImmutablePartitionedTimestamps.Builder builder =
ImmutablePartitionedTimestamps.builder().myCommitTimestamp(commitTs);
MutableLongSet beforeStart = LongSets.mutable.empty();
MutableLongSet afterStart = LongSets.mutable.empty();
startTimestamps.forEach(startTimestamp -> {
if (startTimestamp == myStart) {
// Our own start timestamp was requested.
builder.splittingStartTimestamp(myStart);
} else if (startTimestamp < myStart) {
beforeStart.add(startTimestamp);
} else {
afterStart.add(startTimestamp);
}
});
// Hand the sets to the builder as unmodifiable views.
builder.beforeStart(beforeStart.asUnmodifiable());
builder.afterStart(afterStart.asUnmodifiable());
return builder.build();
}
};
}

Expand Down
Loading

0 comments on commit 5110f59

Please sign in to comment.