Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

[PDS-158510] Improved Detection of Orphaned Sentinels #5231

Merged
merged 8 commits into from
Feb 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.MoreObjects;
Expand Down Expand Up @@ -140,6 +141,7 @@
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
Expand All @@ -155,6 +157,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -1255,16 +1258,51 @@ private <T> ListenableFuture<Collection<Map.Entry<Cell, T>>> getWithPostFilterin
* and where it was truncated. In this case, there is a chance that we end up with a sentinel with no
* valid AtlasDB cell covering it. In this case, we ignore it.
*/
private Set<Cell> findOrphanedSweepSentinels(TableReference table, Map<Cell, Value> rawResults) {
@VisibleForTesting
Set<Cell> findOrphanedSweepSentinels(TableReference table, Map<Cell, Value> rawResults) {
Set<Cell> sweepSentinels = Maps.filterValues(rawResults, SnapshotTransaction::isSweepSentinel)
.keySet();
if (sweepSentinels.isEmpty()) {
return Collections.emptySet();
}
Map<Cell, Long> atMaxTimestamp =
keyValueService.getLatestTimestamps(table, Maps.asMap(sweepSentinels, x -> Long.MAX_VALUE));
return Maps.filterValues(atMaxTimestamp, ts -> Value.INVALID_VALUE_TIMESTAMP == ts)
.keySet();

// for each sentinel, start at long max. Then iterate down with each found uncommitted value.
// if committed value seen, stop: the sentinel is not orphaned
// if we get back -1, the sentinel is orphaned
Map<Cell, Long> timestampCandidates = new HashMap<>(
keyValueService.getLatestTimestamps(table, Maps.asMap(sweepSentinels, x -> Long.MAX_VALUE)));
Set<Cell> actualOrphanedSentinels = new HashSet<>();

while (!timestampCandidates.isEmpty()) {
Map<SentinelType, Map<Cell, Long>> sentinelTypeToTimestamps = timestampCandidates.entrySet().stream()
.collect(Collectors.groupingBy(
entry -> entry.getValue() == Value.INVALID_VALUE_TIMESTAMP
? SentinelType.DEFINITE_ORPHANED
: SentinelType.INDETERMINATE,
Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));

Map<Cell, Long> definiteOrphans = sentinelTypeToTimestamps.get(SentinelType.DEFINITE_ORPHANED);
if (definiteOrphans != null) {
actualOrphanedSentinels.addAll(definiteOrphans.keySet());
}

Map<Cell, Long> cellsToQuery = sentinelTypeToTimestamps.get(SentinelType.INDETERMINATE);
if (cellsToQuery == null) {
break;
}
Set<Long> committedStartTimestamps = KeyedStream.stream(
defaultTransactionService.get(cellsToQuery.values()))
.filter(Objects::nonNull)
.keys()
.collect(Collectors.toSet());

Map<Cell, Long> nextTimestampCandidates = KeyedStream.stream(cellsToQuery)
.filter(cellStartTimestamp -> !committedStartTimestamps.contains(cellStartTimestamp))
.collectToMap();
timestampCandidates = keyValueService.getLatestTimestamps(table, nextTimestampCandidates);
}

return actualOrphanedSentinels;
}

private static boolean isSweepSentinel(Value value) {
Expand Down Expand Up @@ -2395,4 +2433,9 @@ private Histogram getHistogram(String name, TableReference tableRef) {
private Counter getCounter(String name, TableReference tableRef) {
return tableLevelMetricsController.createAndRegisterCounter(SnapshotTransaction.class, name, tableRef);
}

private enum SentinelType {
DEFINITE_ORPHANED,
INDETERMINATE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void setUp() throws Exception {
timestampService = new InMemoryTimestampService();
keyValueService = trackingKeyValueService(getBaseKeyValueService());
TransactionTables.createTables(keyValueService);
transactionService = TransactionServices.createRaw(keyValueService, timestampService, false);
transactionService = spy(TransactionServices.createRaw(keyValueService, timestampService, false));
conflictDetectionManager = ConflictDetectionManagers.createWithoutWarmingCache(keyValueService);
sweepStrategyManager = SweepStrategyManagers.createDefault(keyValueService);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.palantir.atlasdb.transaction.impl;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.fail;
Expand Down Expand Up @@ -134,6 +135,8 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.commons.lang3.tuple.Pair;
Expand All @@ -150,6 +153,7 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.ArgumentCaptor;

@SuppressWarnings("checkstyle:all")
@RunWith(Parameterized.class)
Expand Down Expand Up @@ -290,6 +294,7 @@ public void cleanup() {}
TableReference.createFromFullyQualifiedName("default.table5");

private static final Cell TEST_CELL = Cell.create(PtBytes.toBytes("row1"), PtBytes.toBytes("column1"));
private static final Cell TEST_CELL_2 = Cell.create(PtBytes.toBytes("row2"), PtBytes.toBytes("column2"));

@Override
@Before
Expand Down Expand Up @@ -1438,6 +1443,192 @@ public void checkImmutableTsLockAfterReadsForConservativeIfFlagIsSet() {
timelockService.unlock(ImmutableSet.of(res.getLock()));
}

@Test
public void getOrphanedSweepSentinelDoesNotThrow() {
Transaction t1 = txManager.createNewTransaction();
writeSentinelToTestTable(TEST_CELL);
assertThatCode(() -> t1.get(TABLE_SWEPT_CONSERVATIVE, ImmutableSet.of(TEST_CELL)))
.doesNotThrowAnyException();
}

@Test
public void getSweepSentinelUnderCommittedWriteThrowsAndCanBeRetried() {
Transaction t1 = txManager.createNewTransaction();
writeSentinelToTestTable(TEST_CELL);

Transaction t2 = txManager.createNewTransaction();
t2.put(TABLE_SWEPT_CONSERVATIVE, ImmutableMap.of(TEST_CELL, PtBytes.toBytes("blablabla")));
t2.commit();

assertThatThrownBy(() -> t1.get(TABLE_SWEPT_CONSERVATIVE, ImmutableSet.of(TEST_CELL)))
.isInstanceOf(TransactionFailedRetriableException.class)
.hasMessageContaining("Tried to read a value that has been deleted.");

Transaction retryTxn = txManager.createNewTransaction();
assertThatCode(() -> retryTxn.get(TABLE_SWEPT_CONSERVATIVE, ImmutableSet.of(TEST_CELL)))
.doesNotThrowAnyException();
}

@Test
public void getSweepSentinelUnderLateUncommittedWriteDoesNotThrow() {
Transaction t1 = txManager.createNewTransaction();
writeSentinelToTestTable(TEST_CELL);
putUncommittedAtFreshTimestamp(TEST_CELL);

assertThatCode(() -> t1.get(TABLE_SWEPT_CONSERVATIVE, ImmutableSet.of(TEST_CELL)))
.doesNotThrowAnyException();
}

@Test
public void getSweepSentinelUnderEarlyUncommittedWriteDoesNotThrow() {
writeSentinelToTestTable(TEST_CELL);
putUncommittedAtFreshTimestamp(TEST_CELL);
Transaction t1 = txManager.createNewTransaction();

assertThatCode(() -> t1.get(TABLE_SWEPT_CONSERVATIVE, ImmutableSet.of(TEST_CELL)))
.doesNotThrowAnyException();
}

@Test
public void getSweepSentinelUnderMultipleUncommittedWritesDoesNotThrow() {
Transaction t1 = txManager.createNewTransaction();
writeSentinelToTestTable(TEST_CELL);

for (int transaction = 1; transaction <= 10; transaction++) {
putUncommittedAtFreshTimestamp(TEST_CELL);
}

assertThatCode(() -> t1.get(TABLE_SWEPT_CONSERVATIVE, ImmutableSet.of(TEST_CELL)))
.doesNotThrowAnyException();
}

@Test
public void getSweepSentinelUnderHiddenCommittedWriteThrows() {
Transaction t1 = txManager.createNewTransaction();
writeSentinelToTestTable(TEST_CELL);

Transaction t2 = txManager.createNewTransaction();
t2.put(TABLE_SWEPT_CONSERVATIVE, ImmutableMap.of(TEST_CELL, PtBytes.toBytes("blablabla")));
t2.commit();

for (int transaction = 1; transaction <= 10; transaction++) {
putUncommittedAtFreshTimestamp(TEST_CELL);
}

assertThatThrownBy(() -> t1.get(TABLE_SWEPT_CONSERVATIVE, ImmutableSet.of(TEST_CELL)))
.isInstanceOf(TransactionFailedRetriableException.class)
.hasMessageContaining("Tried to read a value that has been deleted.");
}

@Test
public void getOrphanedSentinelAndSentinelUnderUncommittedWriteDoesNotThrow() {
Transaction t1 = txManager.createNewTransaction();
writeSentinelToTestTable(TEST_CELL);
writeSentinelToTestTable(TEST_CELL_2);
putUncommittedAtFreshTimestamp(TEST_CELL_2);

assertThatCode(() -> t1.get(TABLE_SWEPT_CONSERVATIVE, ImmutableSet.of(TEST_CELL, TEST_CELL_2)))
.doesNotThrowAnyException();
}

@Test
public void getOrphanedSentinelAndSentinelUnderCommittedWriteThrows() {
Transaction t1 = txManager.createNewTransaction();
writeSentinelToTestTable(TEST_CELL);
writeSentinelToTestTable(TEST_CELL_2);
Transaction t2 = txManager.createNewTransaction();
t2.put(TABLE_SWEPT_CONSERVATIVE, ImmutableMap.of(TEST_CELL_2, PtBytes.toBytes("covering")));
t2.commit();

assertThatThrownBy(() -> t1.get(TABLE_SWEPT_CONSERVATIVE, ImmutableSet.of(TEST_CELL, TEST_CELL_2)))
.isInstanceOf(TransactionFailedRetriableException.class)
.hasMessageContaining("Tried to read a value that has been deleted.");
}

@Test
public void getSentinelUnderCommittedAndSentinelUnderUncommittedWriteThrows() {
Transaction t1 = txManager.createNewTransaction();
writeSentinelToTestTable(TEST_CELL);
writeSentinelToTestTable(TEST_CELL_2);
putUncommittedAtFreshTimestamp(TEST_CELL);
Transaction t2 = txManager.createNewTransaction();
t2.put(TABLE_SWEPT_CONSERVATIVE, ImmutableMap.of(TEST_CELL_2, PtBytes.toBytes("covering")));
t2.commit();

assertThatThrownBy(() -> t1.get(TABLE_SWEPT_CONSERVATIVE, ImmutableSet.of(TEST_CELL, TEST_CELL_2)))
.isInstanceOf(TransactionFailedRetriableException.class)
.hasMessageContaining("Tried to read a value that has been deleted.");
}

@SuppressWarnings("unchecked") // ArgumentCaptor
@Test
public void getSentinelValuesStressTest() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice 👍

Transaction t1 = txManager.createNewTransaction();

List<Cell> cellsUnderUncommittedWrites = IntStream.rangeClosed(1, 100)
.boxed()
.map(num -> Cell.create(PtBytes.toBytes("row1"), PtBytes.toBytes(num)))
.collect(Collectors.toList());
List<Cell> cellsUnderHiddenCommittedWrites = IntStream.rangeClosed(1, 100)
.boxed()
.map(num -> Cell.create(PtBytes.toBytes("row2"), PtBytes.toBytes(num)))
.collect(Collectors.toList());

for (int index = 0; index < cellsUnderUncommittedWrites.size(); index++) {
Cell cell = cellsUnderUncommittedWrites.get(index);
writeSentinelToTestTable(cell);
for (int uncommittedValue = 0; uncommittedValue <= index; uncommittedValue++) {
putUncommittedAtFreshTimestamp(cell);
}
}

for (int index = 0; index < cellsUnderHiddenCommittedWrites.size(); index++) {
Cell cell = cellsUnderHiddenCommittedWrites.get(index);
writeSentinelToTestTable(cell);
Transaction txn = txManager.createNewTransaction();
txn.put(TABLE_SWEPT_CONSERVATIVE, ImmutableMap.of(cell, PtBytes.toBytes("i'm in a transaction")));
txn.commit();
for (int uncommittedValue = 0; uncommittedValue < index; uncommittedValue++) {
putUncommittedAtFreshTimestamp(cell);
}
}

ImmutableSet<Cell> cells = ImmutableSet.<Cell>builder()
.addAll(cellsUnderUncommittedWrites)
.addAll(cellsUnderHiddenCommittedWrites)
.build();
assertThatThrownBy(() -> t1.get(TABLE_SWEPT_CONSERVATIVE, cells))
.isInstanceOf(TransactionFailedRetriableException.class)
.hasMessageContaining("Tried to read a value that has been deleted.");

ArgumentCaptor<Iterable<Long>> captor = ArgumentCaptor.forClass(Iterable.class);
verify(transactionService, times(100)).get(captor.capture());

List<Iterable<Long>> stressTestRequests = captor.getAllValues();
assertThat(stressTestRequests).hasSize(100);
for (int index = 1; index < stressTestRequests.size(); index++) {
List<Long> previousTimestamps = StreamSupport.stream(
stressTestRequests.get(index - 1).spliterator(), false)
.collect(Collectors.toList());
assertThat(stressTestRequests.get(index)).hasSizeLessThan(previousTimestamps.size());
}

Transaction retryTxn = txManager.createNewTransaction();
assertThatCode(() -> retryTxn.get(TABLE_SWEPT_CONSERVATIVE, cells)).doesNotThrowAnyException();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert that retry succeeds?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, makes sense - added.


private void putUncommittedAtFreshTimestamp(Cell cell) {
keyValueService.put(
TABLE_SWEPT_CONSERVATIVE,
ImmutableMap.of(cell, PtBytes.toBytes("i am uncommitted")),
txManager.createNewTransaction().getTimestamp());
}

private void writeSentinelToTestTable(Cell cell) {
keyValueService.put(
TABLE_SWEPT_CONSERVATIVE, ImmutableMap.of(cell, new byte[0]), Value.INVALID_VALUE_TIMESTAMP);
}

private void setTransactionConfig(TransactionConfig config) {
transactionConfig = config;
}
Expand Down
5 changes: 5 additions & 0 deletions changelog/@unreleased/pr-5231.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type: fix
fix:
description: Orphaned sentinels are now correctly skipped over. Previously, an orphaned sentinel covered by a value written by a (possibly uncommitted) transaction would not be considered orphaned when read until Sweep or another read transaction cleaned up the data written by said uncommitted transaction.
links:
- https://github.com/palantir/atlasdb/pull/5231