From f533e9f87910a1200727e758b497ded321c4e76e Mon Sep 17 00:00:00 2001 From: Glenn Sheasby Date: Mon, 7 Mar 2022 16:49:26 +0000 Subject: [PATCH 1/5] Return values committed by other threads Fixes NPEs caused by race condition between multiple attempts to commit or abort a transaction. --- .../pue/ResilientCommitTimestampPutUnlessExistsTable.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/pue/ResilientCommitTimestampPutUnlessExistsTable.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/pue/ResilientCommitTimestampPutUnlessExistsTable.java index 19fa327aba1..e26caa56397 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/pue/ResilientCommitTimestampPutUnlessExistsTable.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/pue/ResilientCommitTimestampPutUnlessExistsTable.java @@ -91,6 +91,7 @@ private Map processReads(Map reads, Map st // if we reach here, actual is guaranteed to be a staging value try { store.checkAndTouch(cell, actual); + checkAndTouch.put(cell, actual); } catch (CheckAndSetException e) { PutUnlessExistsValue kvsValue = encodingStrategy.decodeValueAsCommitTimestamp( startTs, Iterables.getOnlyElement(e.getActualValues())); @@ -100,10 +101,9 @@ private Map processReads(Map reads, Map st + "was found in the KVS", SafeArg.of("kvsValue", kvsValue), SafeArg.of("stagingValue", currentValue)); - continue; + } finally { + resultBuilder.put(startTs, commitTs); } - checkAndTouch.put(cell, actual); - resultBuilder.put(startTs, commitTs); } store.put(KeyedStream.stream(checkAndTouch) .map(encodingStrategy::transformStagingToCommitted) From a535327fda97be28afb80a31cd9937eae2d8f6d3 Mon Sep 17 00:00:00 2001 From: Glenn Sheasby Date: Mon, 7 Mar 2022 17:44:23 +0000 Subject: [PATCH 2/5] test that failed without the change --- ...mmitTimestampPutUnlessExistsTableTest.java | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/pue/ResilientCommitTimestampPutUnlessExistsTableTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/pue/ResilientCommitTimestampPutUnlessExistsTableTest.java index 4add06e79f5..6ef61137ec7 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/pue/ResilientCommitTimestampPutUnlessExistsTableTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/pue/ResilientCommitTimestampPutUnlessExistsTableTest.java @@ -21,6 +21,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -28,6 +29,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.palantir.atlasdb.keyvalue.api.Cell; +import com.palantir.atlasdb.keyvalue.api.CheckAndSetException; import com.palantir.atlasdb.keyvalue.api.KeyAlreadyExistsException; import com.palantir.atlasdb.keyvalue.api.TableReference; import com.palantir.atlasdb.keyvalue.impl.InMemoryKeyValueService; @@ -86,6 +88,29 @@ public void pueThatThrowsIsCorrectedOnGet() throws ExecutionException, Interrupt verify(spiedStore).put(anyMap()); } + @Test + public void getReturnsStagingValuesThatWereCommittedBySomeoneElse() + throws ExecutionException, InterruptedException { + TwoPhaseEncodingStrategy strategy = TwoPhaseEncodingStrategy.INSTANCE; + + long startTimestamp = 1L; + long commitTimestamp = 2L; + Cell timestampAsCell = strategy.encodeStartTimestampAsCell(startTimestamp); + byte[] stagingValue = + strategy.encodeCommitTimestampAsValue(startTimestamp, PutUnlessExistsValue.staging(commitTimestamp)); + byte[] committedValue = + strategy.encodeCommitTimestampAsValue(startTimestamp, PutUnlessExistsValue.committed(commitTimestamp)); + spiedStore.putUnlessExists(timestampAsCell, stagingValue); + + List actualValues = ImmutableList.of(committedValue); + + doThrow(new CheckAndSetException("done elsewhere", timestampAsCell, stagingValue, actualValues)) + .when(spiedStore) + .checkAndTouch(timestampAsCell, stagingValue); + + assertThat(pueTable.get(startTimestamp).get()).isEqualTo(commitTimestamp); + } + @Test public void onceNonNullValueIsReturnedItIsAlwaysReturned() { PutUnlessExistsTable putUnlessExistsTable = new ResilientCommitTimestampPutUnlessExistsTable( From 2ee4ecec667b067373934bc21c9b519321a37820 Mon Sep 17 00:00:00 2001 From: svc-changelog Date: Mon, 7 Mar 2022 17:46:16 +0000 Subject: [PATCH 3/5] Add generated changelog entries --- changelog/@unreleased/pr-5940.v2.yml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 changelog/@unreleased/pr-5940.v2.yml diff --git a/changelog/@unreleased/pr-5940.v2.yml b/changelog/@unreleased/pr-5940.v2.yml new file mode 100644 index 00000000000..2b01f1dc052 --- /dev/null +++ b/changelog/@unreleased/pr-5940.v2.yml @@ -0,0 +1,6 @@ +type: fix +fix: + description: Fixed a race condition where if a transaction was just committed by + another thread, we may return null values from TransactionService.get. + links: + - https://github.com/palantir/atlasdb/pull/5940 From 16523abf6a2e4daacb140df04c803f194b258d58 Mon Sep 17 00:00:00 2001 From: svc-changelog Date: Tue, 8 Mar 2022 11:30:12 +0000 Subject: [PATCH 4/5] Add generated changelog entries --- changelog/@unreleased/pr-5940.v2.yml | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/changelog/@unreleased/pr-5940.v2.yml b/changelog/@unreleased/pr-5940.v2.yml index 2b01f1dc052..18acfc22c08 100644 --- a/changelog/@unreleased/pr-5940.v2.yml +++ b/changelog/@unreleased/pr-5940.v2.yml @@ -1,6 +1,8 @@ type: fix fix: - description: Fixed a race condition where if a transaction was just committed by - another thread, we may return null values from TransactionService.get. + description: 'Fixed a race condition where if a transaction was just committed by + another thread, users would receive null values from TransactionService.get. This + caused users to observe that a completed transaction was still running, but did + not lead to any correctness issue. ' links: - https://github.com/palantir/atlasdb/pull/5940 From b96a5792a46ca84f6232bf5ae130d21452ccd083 Mon Sep 17 00:00:00 2001 From: Glenn Sheasby Date: Tue, 8 Mar 2022 11:30:37 +0000 Subject: [PATCH 5/5] clarify --- .../pue/ResilientCommitTimestampPutUnlessExistsTable.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/pue/ResilientCommitTimestampPutUnlessExistsTable.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/pue/ResilientCommitTimestampPutUnlessExistsTable.java index e26caa56397..2af6c67c3e9 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/pue/ResilientCommitTimestampPutUnlessExistsTable.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/pue/ResilientCommitTimestampPutUnlessExistsTable.java @@ -102,6 +102,8 @@ private Map processReads(Map reads, Map st SafeArg.of("kvsValue", kvsValue), SafeArg.of("stagingValue", currentValue)); } finally { + // If we got here after catching CheckAndSetException, then some other thread committed this + // transaction, and we must therefore return the commit timestamp. resultBuilder.put(startTs, commitTs); } }