Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Clock went backwards corruption check #5131

Merged
merged 12 commits into from
Jan 6, 2021
6 changes: 6 additions & 0 deletions changelog/@unreleased/pr-5131.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
type: improvement
improvement:
description: TimeLock now validates the expected invariant for its timestamp bounds
i.e. the timestamp bound must increase with increasing sequence numbers.
links:
- https://github.com/palantir/atlasdb/pull/5131
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.palantir.timelock.config.TimeLockInstallConfiguration;
import com.palantir.timelock.corruption.detection.CorruptionHealthCheck;
import com.palantir.timelock.corruption.detection.LocalCorruptionDetector;
import com.palantir.timelock.corruption.detection.LocalTimestampInvariantsVerifier;
import com.palantir.timelock.corruption.detection.RemoteCorruptionDetector;
import com.palantir.timelock.history.LocalHistoryLoader;
import com.palantir.timelock.history.PaxosLogHistoryProvider;
Expand Down Expand Up @@ -253,8 +254,10 @@ private static TimeLockCorruptionComponents timeLockCorruptionComponents(
PaxosLogHistoryProvider historyProvider =
new PaxosLogHistoryProvider(dataSource, remoteClients.getRemoteHistoryProviders());

LocalCorruptionDetector localCorruptionDetector =
LocalCorruptionDetector.create(historyProvider, remoteClients.getRemoteCorruptionNotifiers());
LocalTimestampInvariantsVerifier timestampInvariantsVerifier = new LocalTimestampInvariantsVerifier(dataSource);

LocalCorruptionDetector localCorruptionDetector = LocalCorruptionDetector.create(
historyProvider, remoteClients.getRemoteCorruptionNotifiers(), timestampInvariantsVerifier);

CorruptionHealthCheck healthCheck =
new CorruptionHealthCheck(localCorruptionDetector, remoteCorruptionDetector);
Expand Down
1 change: 1 addition & 0 deletions timelock-corruption-detection/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ dependencies {
compile group: 'com.google.guava', name: 'guava'
compile group: 'com.github.rholder', name: 'guava-retrying'
compile group: 'com.palantir.conjure.java.api', name: 'service-config'
compile group: 'one.util', name: 'streamex'

annotationProcessor group: 'org.immutables', name: 'value'
compileOnly 'org.immutables:value::annotations'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
public enum CorruptionCheckViolation {
NONE(false, false),
DIVERGED_LEARNERS(true, false), // this is false for now
CLOCK_WENT_BACKWARDS(true, false), // this is false for now
VALUE_LEARNED_WITHOUT_QUORUM(true, false),
ACCEPTED_VALUE_GREATER_THAN_LEARNED(true, false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,30 +34,43 @@ public final class LocalCorruptionDetector implements CorruptionDetector {
new NamedThreadFactory(CORRUPTION_DETECTOR_THREAD_PREFIX, true));
private final LocalCorruptionHandler corruptionHandler;
private final PaxosLogHistoryProvider historyProvider;
private final LocalTimestampInvariantsVerifier timestampInvariantsVerifier;

private volatile CorruptionStatus localCorruptionState = CorruptionStatus.HEALTHY;
private volatile CorruptionHealthReport localCorruptionReport = CorruptionHealthReport.defaultHealthyReport();

public static LocalCorruptionDetector create(
PaxosLogHistoryProvider historyProvider, List<TimeLockCorruptionNotifier> corruptionNotifiers) {
PaxosLogHistoryProvider historyProvider,
List<TimeLockCorruptionNotifier> corruptionNotifiers,
LocalTimestampInvariantsVerifier timestampInvariants) {
LocalCorruptionDetector localCorruptionDetector =
new LocalCorruptionDetector(historyProvider, corruptionNotifiers);
new LocalCorruptionDetector(historyProvider, corruptionNotifiers, timestampInvariants);

localCorruptionDetector.scheduleWithFixedDelay();
return localCorruptionDetector;
}

private LocalCorruptionDetector(
PaxosLogHistoryProvider historyProvider, List<TimeLockCorruptionNotifier> corruptionNotifiers) {
PaxosLogHistoryProvider historyProvider,
List<TimeLockCorruptionNotifier> corruptionNotifiers,
LocalTimestampInvariantsVerifier timestampInvariantsVerifier) {

this.historyProvider = historyProvider;
this.timestampInvariantsVerifier = timestampInvariantsVerifier;
this.corruptionHandler = new LocalCorruptionHandler(corruptionNotifiers);
}

private void scheduleWithFixedDelay() {
executor.scheduleWithFixedDelay(
() -> {
localCorruptionReport = analyzeHistoryAndBuildCorruptionHealthReport();
CorruptionHealthReport paxosRoundCorruptionReport = analyzeHistoryAndBuildCorruptionHealthReport();
CorruptionHealthReport timestampInvariantsReport =
timestampInvariantsVerifier.timestampInvariantsHealthReport();
localCorruptionReport = ImmutableCorruptionHealthReport.builder()
.from(paxosRoundCorruptionReport)
.putAllViolatingStatusesToNamespaceAndUseCase(
timestampInvariantsReport.violatingStatusesToNamespaceAndUseCase())
.build();
processLocalHealthReport();
},
TIMELOCK_CORRUPTION_ANALYSIS_INTERVAL.getSeconds(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* (c) Copyright 2020 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.timelock.corruption.detection;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.common.collect.SetMultimap;
import com.palantir.atlasdb.encoding.PtBytes;
import com.palantir.common.streams.KeyedStream;
import com.palantir.paxos.ImmutableNamespaceAndUseCase;
import com.palantir.paxos.NamespaceAndUseCase;
import com.palantir.paxos.PaxosValue;
import com.palantir.timelock.history.models.LearnerUseCase;
import com.palantir.timelock.history.sqlite.SqlitePaxosStateLogHistory;
import com.palantir.timelock.history.util.UseCaseUtils;
import java.util.Collections;
import java.util.Comparator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.sql.DataSource;
import one.util.streamex.StreamEx;

/**
* This class validates that timestamp bounds increase with increasing sequence numbers.
*
* The validation is done batch wise, e.g. [1, n], [n, 2 * n - 1] and so on. Two consecutive batches share
* boundaries, this is expected and is done to catch inversion at batch end.
* */
public class LocalTimestampInvariantsVerifier {
@VisibleForTesting
public static final int LEARNER_LOG_BATCH_SIZE_LIMIT = 250;

public static final long MIN_SEQUENCE_TO_BE_VERIFIED = Long.MIN_VALUE;

private final SqlitePaxosStateLogHistory sqlitePaxosStateLogHistory;
sudiksha27 marked this conversation as resolved.
Show resolved Hide resolved
private Map<NamespaceAndUseCase, Long> minInclusiveSeqBoundsToBeVerified = new ConcurrentHashMap<>();

public LocalTimestampInvariantsVerifier(DataSource dataSource) {
this.sqlitePaxosStateLogHistory = SqlitePaxosStateLogHistory.create(dataSource);
}

public CorruptionHealthReport timestampInvariantsHealthReport() {
SetMultimap<CorruptionCheckViolation, NamespaceAndUseCase> namespacesExhibitingViolations = KeyedStream.of(
getNamespaceAndUseCaseTuples())
.map(this::timestampInvariantsViolationLevel)
.filter(CorruptionCheckViolation::raiseErrorAlert)
.mapEntries((k, v) -> Maps.immutableEntry(v, k))
.collectToSetMultimap();
return ImmutableCorruptionHealthReport.builder()
.violatingStatusesToNamespaceAndUseCase(namespacesExhibitingViolations)
.build();
}

private CorruptionCheckViolation timestampInvariantsViolationLevel(NamespaceAndUseCase namespaceAndUseCase) {
Stream<Long> expectedSortedTimestamps = KeyedStream.stream(getLearnerLogs(namespaceAndUseCase))
.map(PaxosValue::getData)
.filter(Objects::nonNull)
.mapEntries((sequence, timestamp) -> Maps.immutableEntry(sequence, PtBytes.toLong(timestamp)))
.entries()
.sorted(Comparator.comparingLong(Map.Entry::getKey))
.map(Map.Entry::getValue);
return StreamEx.of(expectedSortedTimestamps)
.pairMap((first, second) -> first >= second)
.anyMatch(x -> x)
? CorruptionCheckViolation.CLOCK_WENT_BACKWARDS
: CorruptionCheckViolation.NONE;
}

private Map<Long, PaxosValue> getLearnerLogs(NamespaceAndUseCase namespaceAndUseCase) {
long minSeqToBeVerified = getMinSeqToBeVerified(namespaceAndUseCase);
LearnerUseCase useCase = LearnerUseCase.createLearnerUseCase(namespaceAndUseCase.useCase());

Map<Long, PaxosValue> learnerLogsSince = sqlitePaxosStateLogHistory.getLearnerLogsSince(
namespaceAndUseCase.namespace(), useCase, minSeqToBeVerified, LEARNER_LOG_BATCH_SIZE_LIMIT);

if (notEnoughLogsForVerification(learnerLogsSince)) {
resetMinSequenceToBeVerified(namespaceAndUseCase);
} else {
updateMinSeqToBeVerified(namespaceAndUseCase, learnerLogsSince);
}
return learnerLogsSince;
}

private boolean notEnoughLogsForVerification(Map<Long, PaxosValue> learnerLogsSince) {
return learnerLogsSince.size() <= 1;
}

private void updateMinSeqToBeVerified(NamespaceAndUseCase namespaceAndUseCase, Map<Long, PaxosValue> minSeq) {
minInclusiveSeqBoundsToBeVerified.put(namespaceAndUseCase, Collections.max(minSeq.keySet()));
}

private long getMinSeqToBeVerified(NamespaceAndUseCase namespaceAndUseCase) {
return minInclusiveSeqBoundsToBeVerified.computeIfAbsent(
namespaceAndUseCase, _u -> MIN_SEQUENCE_TO_BE_VERIFIED);
}

private void resetMinSequenceToBeVerified(NamespaceAndUseCase namespaceAndUseCase) {
minInclusiveSeqBoundsToBeVerified.put(namespaceAndUseCase, MIN_SEQUENCE_TO_BE_VERIFIED);
}

private ImmutableNamespaceAndUseCase getNamespaceAndUseCasePrefix(NamespaceAndUseCase namespaceAndUseCase) {
return ImmutableNamespaceAndUseCase.of(
namespaceAndUseCase.namespace(), UseCaseUtils.getPaxosUseCasePrefix(namespaceAndUseCase.useCase()));
}

private Set<NamespaceAndUseCase> getNamespaceAndUseCaseTuples() {
return sqlitePaxosStateLogHistory.getAllNamespaceAndUseCaseTuples().stream()
.map(this::getNamespaceAndUseCasePrefix)
.collect(Collectors.toSet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ public LearnerAndAcceptorRecords getLearnerAndAcceptorLogsInRange(
querySequenceBounds.getUpperBoundInclusive())));
}

public Map<Long, PaxosValue> getLearnerLogsSince(
Client namespace, LearnerUseCase learnerUseCase, long lowerBoundInclusive, int learnerLogBatchSizeLimit) {
return execute(dao -> dao.getLearnerLogsSince(
namespace, learnerUseCase.value(), lowerBoundInclusive, learnerLogBatchSizeLimit));
}

public long getGreatestLogEntry(Client client, LearnerUseCase useCase) {
return executeSqlitePaxosStateLogQuery(dao -> dao.getGreatestLogEntry(client, useCase.value()))
.orElse(PaxosAcceptor.NO_LOG_ENTRY);
Expand All @@ -99,11 +105,6 @@ public interface Queries {
@SqlQuery("SELECT DISTINCT namespace, useCase FROM paxosLog")
Set<NamespaceAndUseCase> getAllNamespaceAndUseCaseTuples();

// TODO(snanda): For now, limit is based on approximation and has not been tested with remotes. We need
// to
// revisit this once we have the remote history providers set up. Also, we may have to make it
// configurable to
// accommodate the rate at which logs are being published.
@SqlQuery("SELECT seq, val FROM paxosLog WHERE namespace = :namespace.value AND useCase = :useCase AND seq >="
+ " :lowerBoundInclusive AND seq <= :upperBoundInclusive")
Map<Long, PaxosValue> getLearnerLogsInRange(
Expand All @@ -119,5 +120,13 @@ Map<Long, PaxosAcceptorData> getAcceptorLogsInRange(
@Bind("useCase") String useCase,
@Bind("lowerBoundInclusive") long lowerBoundInclusive,
@Bind("upperBoundInclusive") long upperBoundInclusive);

@SqlQuery("SELECT seq, val FROM paxosLog WHERE namespace = :namespace.value AND useCase = :useCase AND seq >="
+ " :lowerBoundInclusive ORDER BY seq ASC LIMIT :limit")
Map<Long, PaxosValue> getLearnerLogsSince(
@BindPojo("namespace") Client namespace,
@Bind("useCase") String useCase,
@Bind("lowerBoundInclusive") long lowerBoundInclusive,
@Bind("limit") long learnerLogBatchSizeLimit);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* (c) Copyright 2020 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.timelock.corruption.detection;

import static com.palantir.timelock.corruption.detection.LocalTimestampInvariantsVerifier.LEARNER_LOG_BATCH_SIZE_LIMIT;

import org.junit.Rule;
import org.junit.Test;

public class LocalTimestampInvariantsVerifierTest {
@Rule
public TimeLockCorruptionDetectionHelper helper = new TimeLockCorruptionDetectionHelper();

@Test
public void detectsClockWentBackwardsOnNode() {
helper.writeLogsOnDefaultLocalServer(1, LEARNER_LOG_BATCH_SIZE_LIMIT - 1);
helper.createTimestampInversion(5);
helper.assertClockGoesBackwardsInNextBatch();
}

@Test
public void detectsClockWentBackwardsForDiscontinuousLogs() {
helper.writeLogsOnDefaultLocalServer(1, 27);
helper.writeLogsOnDefaultLocalServer(LEARNER_LOG_BATCH_SIZE_LIMIT, LEARNER_LOG_BATCH_SIZE_LIMIT + 97);

helper.createTimestampInversion(72);
helper.assertClockGoesBackwardsInNextBatch();
}

@Test
public void detectsClockWentBackwardsAtBatchStart() {
helper.writeLogsOnDefaultLocalServer(1, LEARNER_LOG_BATCH_SIZE_LIMIT);
helper.createTimestampInversion(1);
helper.assertClockGoesBackwardsInNextBatch();
}

@Test
public void detectsClockWentBackwardsAtBatchEnd() {
helper.writeLogsOnDefaultLocalServer(1, LEARNER_LOG_BATCH_SIZE_LIMIT);
helper.createTimestampInversion(LEARNER_LOG_BATCH_SIZE_LIMIT - 1);
helper.assertClockGoesBackwardsInNextBatch();
}

@Test
public void detectsClockWentBackwardsStartOfNextBatch() {
helper.writeLogsOnDefaultLocalServer(1, 2 * LEARNER_LOG_BATCH_SIZE_LIMIT);
helper.createTimestampInversion(LEARNER_LOG_BATCH_SIZE_LIMIT);

// No signs of corruption in the first batch
helper.assertLocalTimestampInvariantsStandInNextBatch();

// Detects signs of corruption in the second batch
helper.assertClockGoesBackwardsInNextBatch();
}

@Test
public void detectsClockWentBackwardsInLaterBatch() {
helper.writeLogsOnDefaultLocalAndRemote(1, 2 * LEARNER_LOG_BATCH_SIZE_LIMIT);
helper.createTimestampInversion(3 * LEARNER_LOG_BATCH_SIZE_LIMIT / 2);

// No signs of corruption in the first batch
helper.assertLocalTimestampInvariantsStandInNextBatch();

// Detects signs of corruption in the second batch
helper.assertClockGoesBackwardsInNextBatch();
}

@Test
public void resetsProgressIfNotEnoughLogsForVerification() {
helper.createTimestampInversion(1);

// No signs of corruption since there aren't enough logs
helper.assertLocalTimestampInvariantsStandInNextBatch();

helper.writeLogsOnDefaultLocalServer(2, LEARNER_LOG_BATCH_SIZE_LIMIT);

// Detects signs of corruption in the now corrupt first batch of logs
helper.assertClockGoesBackwardsInNextBatch();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,31 @@ List<StateLogComponents> createStatLogComponentsForNamespaceAndUseCase(Namespace
return timeLockCorruptionTestSetup.createStatLogForNamespaceAndUseCase(namespaceAndUseCase);
}

void assertClockGoesBackwardsInNextBatch() {
assertLocalTimestampInvariants(ImmutableSet.of(CorruptionCheckViolation.CLOCK_WENT_BACKWARDS));
}

void assertLocalTimestampInvariantsStandInNextBatch() {
assertLocalTimestampInvariants(ImmutableSet.of());
}

void createTimestampInversion(int round) {
PaxosSerializationTestUtils.writePaxosValue(
getDefaultLocalServer().learnerLog(),
round,
PaxosSerializationTestUtils.createPaxosValueForRoundAndData(round, round * 100));
}

private void assertLocalTimestampInvariants(Set<CorruptionCheckViolation> violations) {
CorruptionHealthReport corruptionHealthReport = timeLockCorruptionTestSetup
.getLocalTimestampInvariantsVerifier()
.timestampInvariantsHealthReport();
assertThat(corruptionHealthReport
.violatingStatusesToNamespaceAndUseCase()
.keySet())
.hasSameElementsAs(violations);
}

private static void writeLogsOnServer(StateLogComponents server, int startInclusive, int endInclusive) {
PaxosSerializationTestUtils.writeToLogs(
server.acceptorLog(), server.learnerLog(), startInclusive, endInclusive);
Expand Down
Loading