Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Fix NPE caused by txn commit race condition #5940

Merged
merged 5 commits into from
Mar 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ private Map<Long, Long> processReads(Map<Cell, byte[]> reads, Map<Long, Cell> st
// if we reach here, actual is guaranteed to be a staging value
try {
store.checkAndTouch(cell, actual);
checkAndTouch.put(cell, actual);
} catch (CheckAndSetException e) {
PutUnlessExistsValue<Long> kvsValue = encodingStrategy.decodeValueAsCommitTimestamp(
startTs, Iterables.getOnlyElement(e.getActualValues()));
Expand All @@ -100,10 +101,11 @@ private Map<Long, Long> processReads(Map<Cell, byte[]> reads, Map<Long, Cell> st
+ "was found in the KVS",
SafeArg.of("kvsValue", kvsValue),
SafeArg.of("stagingValue", currentValue));
continue;
} finally {
// If we got here after catching CheckAndSetException, then some other thread committed this
// transaction, and we must therefore return the commit timestamp.
resultBuilder.put(startTs, commitTs);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems a bit odd. If an exception is thrown, won't this appear to a caller that a value was committed, but in reality it has not?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a CheckAndSetException is thrown here, it means the value was committed by another thread, and hence we must return it.

If other exceptions are thrown, then we will propagate the exception out of this method, and thus would not return the result anyway.

This is quite subtle, though, so I'll add a clarifying comment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep! because of the way the protocol works (ADR if you're interested), once you're able to perform a quorum read of some value V, the only permitted write operations are CAS((V, Staging), (V, Staging)) and PUT((V, Committed)).

}
checkAndTouch.put(cell, actual);
resultBuilder.put(startTs, commitTs);
}
store.put(KeyedStream.stream(checkAndTouch)
.map(encodingStrategy::transformStagingToCommitted)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.palantir.atlasdb.keyvalue.api.Cell;
import com.palantir.atlasdb.keyvalue.api.CheckAndSetException;
import com.palantir.atlasdb.keyvalue.api.KeyAlreadyExistsException;
import com.palantir.atlasdb.keyvalue.api.TableReference;
import com.palantir.atlasdb.keyvalue.impl.InMemoryKeyValueService;
Expand Down Expand Up @@ -86,6 +88,29 @@ public void pueThatThrowsIsCorrectedOnGet() throws ExecutionException, Interrupt
verify(spiedStore).put(anyMap());
}

@Test
public void getReturnsStagingValuesThatWereCommittedBySomeoneElse()
throws ExecutionException, InterruptedException {
TwoPhaseEncodingStrategy strategy = TwoPhaseEncodingStrategy.INSTANCE;

long startTimestamp = 1L;
long commitTimestamp = 2L;
Cell timestampAsCell = strategy.encodeStartTimestampAsCell(startTimestamp);
byte[] stagingValue =
strategy.encodeCommitTimestampAsValue(startTimestamp, PutUnlessExistsValue.staging(commitTimestamp));
byte[] committedValue =
strategy.encodeCommitTimestampAsValue(startTimestamp, PutUnlessExistsValue.committed(commitTimestamp));
spiedStore.putUnlessExists(timestampAsCell, stagingValue);

List<byte[]> actualValues = ImmutableList.of(committedValue);

doThrow(new CheckAndSetException("done elsewhere", timestampAsCell, stagingValue, actualValues))
.when(spiedStore)
.checkAndTouch(timestampAsCell, stagingValue);

assertThat(pueTable.get(startTimestamp).get()).isEqualTo(commitTimestamp);
}

@Test
public void onceNonNullValueIsReturnedItIsAlwaysReturned() {
PutUnlessExistsTable<Long, Long> putUnlessExistsTable = new ResilientCommitTimestampPutUnlessExistsTable(
Expand Down
8 changes: 8 additions & 0 deletions changelog/@unreleased/pr-5940.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
type: fix
fix:
description: 'Fixed a race condition where if a transaction was just committed by
another thread, users would receive null values from TransactionService.get. This
caused users to observe that a completed transaction was still running, but did
not lead to any correctness issue. '
links:
- https://github.com/palantir/atlasdb/pull/5940