Revert revert 5131 (#5182)
* Revert "Revert "Clock went backwards corruption check (#5131)" (#5181)"

This reverts commit 7f6c373.

* Timestamps don't have to be strictly increasing

* Add test

* Add generated changelog entries
sudiksha27 authored Jan 8, 2021
1 parent 76e85f0 commit 00eb359
Showing 9 changed files with 320 additions and 11 deletions.
6 changes: 6 additions & 0 deletions changelog/@unreleased/pr-5182.v2.yml
@@ -0,0 +1,6 @@
type: improvement
improvement:
description: TimeLock now validates the expected invariant for its timestamp bounds,
  i.e. the timestamp bound must not decrease with increasing sequence numbers.
links:
- https://github.com/palantir/atlasdb/pull/5182
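
For intuition, here is a minimal standalone sketch (illustrative only, not part of this commit; the class and method names are made up) of the invariant the changelog entry describes: reading timestamp bounds in increasing sequence-number order, repeated bounds are fine, but a decrease signals corruption.

import java.util.NavigableMap;

final class TimestampBoundInvariant {
    private TimestampBoundInvariant() {}

    /** Returns true if bounds never decrease as sequence numbers increase (equal bounds are allowed). */
    static boolean holds(NavigableMap<Long, Long> seqToTimestampBound) {
        long previousBound = Long.MIN_VALUE;
        for (long bound : seqToTimestampBound.values()) { // values iterate in ascending sequence order
            if (bound < previousBound) {
                return false; // a lower bound at a higher sequence number: clock went backwards
            }
            previousBound = bound;
        }
        return true;
    }
}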
@@ -36,6 +36,7 @@
import com.palantir.timelock.config.TimeLockInstallConfiguration;
import com.palantir.timelock.corruption.detection.CorruptionHealthCheck;
import com.palantir.timelock.corruption.detection.LocalCorruptionDetector;
import com.palantir.timelock.corruption.detection.LocalTimestampInvariantsVerifier;
import com.palantir.timelock.corruption.detection.RemoteCorruptionDetector;
import com.palantir.timelock.history.LocalHistoryLoader;
import com.palantir.timelock.history.PaxosLogHistoryProvider;
@@ -253,8 +254,10 @@ private static TimeLockCorruptionComponents timeLockCorruptionComponents(
PaxosLogHistoryProvider historyProvider =
new PaxosLogHistoryProvider(dataSource, remoteClients.getRemoteHistoryProviders());

LocalCorruptionDetector localCorruptionDetector =
LocalCorruptionDetector.create(historyProvider, remoteClients.getRemoteCorruptionNotifiers());
LocalTimestampInvariantsVerifier timestampInvariantsVerifier = new LocalTimestampInvariantsVerifier(dataSource);

LocalCorruptionDetector localCorruptionDetector = LocalCorruptionDetector.create(
historyProvider, remoteClients.getRemoteCorruptionNotifiers(), timestampInvariantsVerifier);

CorruptionHealthCheck healthCheck =
new CorruptionHealthCheck(localCorruptionDetector, remoteCorruptionDetector);
1 change: 1 addition & 0 deletions timelock-corruption-detection/build.gradle
@@ -21,6 +21,7 @@ dependencies {
compile group: 'com.google.guava', name: 'guava'
compile group: 'com.github.rholder', name: 'guava-retrying'
compile group: 'com.palantir.conjure.java.api', name: 'service-config'
compile group: 'one.util', name: 'streamex'

annotationProcessor group: 'org.immutables', name: 'value'
compileOnly 'org.immutables:value::annotations'
@@ -34,30 +34,43 @@ public final class LocalCorruptionDetector implements CorruptionDetector {
new NamedThreadFactory(CORRUPTION_DETECTOR_THREAD_PREFIX, true));
private final LocalCorruptionHandler corruptionHandler;
private final PaxosLogHistoryProvider historyProvider;
private final LocalTimestampInvariantsVerifier timestampInvariantsVerifier;

private volatile CorruptionStatus localCorruptionState = CorruptionStatus.HEALTHY;
private volatile CorruptionHealthReport localCorruptionReport = CorruptionHealthReport.defaultHealthyReport();

public static LocalCorruptionDetector create(
PaxosLogHistoryProvider historyProvider, List<TimeLockCorruptionNotifier> corruptionNotifiers) {
PaxosLogHistoryProvider historyProvider,
List<TimeLockCorruptionNotifier> corruptionNotifiers,
LocalTimestampInvariantsVerifier timestampInvariants) {
LocalCorruptionDetector localCorruptionDetector =
new LocalCorruptionDetector(historyProvider, corruptionNotifiers);
new LocalCorruptionDetector(historyProvider, corruptionNotifiers, timestampInvariants);

localCorruptionDetector.scheduleWithFixedDelay();
return localCorruptionDetector;
}

private LocalCorruptionDetector(
PaxosLogHistoryProvider historyProvider, List<TimeLockCorruptionNotifier> corruptionNotifiers) {
PaxosLogHistoryProvider historyProvider,
List<TimeLockCorruptionNotifier> corruptionNotifiers,
LocalTimestampInvariantsVerifier timestampInvariantsVerifier) {

this.historyProvider = historyProvider;
this.timestampInvariantsVerifier = timestampInvariantsVerifier;
this.corruptionHandler = new LocalCorruptionHandler(corruptionNotifiers);
}

private void scheduleWithFixedDelay() {
executor.scheduleWithFixedDelay(
() -> {
localCorruptionReport = analyzeHistoryAndBuildCorruptionHealthReport();
CorruptionHealthReport paxosRoundCorruptionReport = analyzeHistoryAndBuildCorruptionHealthReport();
CorruptionHealthReport timestampInvariantsReport =
timestampInvariantsVerifier.timestampInvariantsHealthReport();
localCorruptionReport = ImmutableCorruptionHealthReport.builder()
.from(paxosRoundCorruptionReport)
.putAllViolatingStatusesToNamespaceAndUseCase(
timestampInvariantsReport.violatingStatusesToNamespaceAndUseCase())
.build();
processLocalHealthReport();
},
TIMELOCK_CORRUPTION_ANALYSIS_INTERVAL.getSeconds(),
@@ -0,0 +1,129 @@
/*
* (c) Copyright 2020 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.timelock.corruption.detection;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.common.collect.SetMultimap;
import com.palantir.atlasdb.encoding.PtBytes;
import com.palantir.common.streams.KeyedStream;
import com.palantir.paxos.ImmutableNamespaceAndUseCase;
import com.palantir.paxos.NamespaceAndUseCase;
import com.palantir.paxos.PaxosValue;
import com.palantir.timelock.history.models.LearnerUseCase;
import com.palantir.timelock.history.sqlite.SqlitePaxosStateLogHistory;
import com.palantir.timelock.history.util.UseCaseUtils;
import java.util.Collections;
import java.util.Comparator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.sql.DataSource;
import one.util.streamex.StreamEx;

/**
 * This class validates that timestamp bounds do not decrease with increasing sequence numbers.
 *
 * The validation is done batch-wise, e.g. [1, n], [n, 2 * n - 1] and so on. Two consecutive batches share a
 * boundary entry; this is expected and is done to catch inversions at batch ends.
 */
public class LocalTimestampInvariantsVerifier {
@VisibleForTesting
public static final int LEARNER_LOG_BATCH_SIZE_LIMIT = 250;

public static final long MIN_SEQUENCE_TO_BE_VERIFIED = Long.MIN_VALUE;

private final SqlitePaxosStateLogHistory sqlitePaxosStateLogHistory;
private Map<NamespaceAndUseCase, Long> minInclusiveSeqBoundsToBeVerified = new ConcurrentHashMap<>();

public LocalTimestampInvariantsVerifier(DataSource dataSource) {
this.sqlitePaxosStateLogHistory = SqlitePaxosStateLogHistory.create(dataSource);
}

public CorruptionHealthReport timestampInvariantsHealthReport() {
SetMultimap<CorruptionCheckViolation, NamespaceAndUseCase> namespacesExhibitingViolations = KeyedStream.of(
getNamespaceAndUseCaseTuples())
.map(this::timestampInvariantsViolationLevel)
.filter(CorruptionCheckViolation::raiseErrorAlert)
.mapEntries((k, v) -> Maps.immutableEntry(v, k))
.collectToSetMultimap();
return ImmutableCorruptionHealthReport.builder()
.violatingStatusesToNamespaceAndUseCase(namespacesExhibitingViolations)
.build();
}

private CorruptionCheckViolation timestampInvariantsViolationLevel(NamespaceAndUseCase namespaceAndUseCase) {
Stream<Long> expectedSortedTimestamps = KeyedStream.stream(getLearnerLogs(namespaceAndUseCase))
.map(PaxosValue::getData)
.filter(Objects::nonNull)
.mapEntries((sequence, timestamp) -> Maps.immutableEntry(sequence, PtBytes.toLong(timestamp)))
.entries()
.sorted(Comparator.comparingLong(Map.Entry::getKey))
.map(Map.Entry::getValue);
return StreamEx.of(expectedSortedTimestamps)
.pairMap((first, second) -> first > second)
.anyMatch(x -> x)
? CorruptionCheckViolation.CLOCK_WENT_BACKWARDS
: CorruptionCheckViolation.NONE;
}

private Map<Long, PaxosValue> getLearnerLogs(NamespaceAndUseCase namespaceAndUseCase) {
long minSeqToBeVerified = getMinSeqToBeVerified(namespaceAndUseCase);
LearnerUseCase useCase = LearnerUseCase.createLearnerUseCase(namespaceAndUseCase.useCase());

Map<Long, PaxosValue> learnerLogsSince = sqlitePaxosStateLogHistory.getLearnerLogsSince(
namespaceAndUseCase.namespace(), useCase, minSeqToBeVerified, LEARNER_LOG_BATCH_SIZE_LIMIT);

if (notEnoughLogsForVerification(learnerLogsSince)) {
resetMinSequenceToBeVerified(namespaceAndUseCase);
} else {
updateMinSeqToBeVerified(namespaceAndUseCase, learnerLogsSince);
}
return learnerLogsSince;
}

private boolean notEnoughLogsForVerification(Map<Long, PaxosValue> learnerLogsSince) {
return learnerLogsSince.size() <= 1;
}

private void updateMinSeqToBeVerified(NamespaceAndUseCase namespaceAndUseCase, Map<Long, PaxosValue> minSeq) {
minInclusiveSeqBoundsToBeVerified.put(namespaceAndUseCase, Collections.max(minSeq.keySet()));
}

private long getMinSeqToBeVerified(NamespaceAndUseCase namespaceAndUseCase) {
return minInclusiveSeqBoundsToBeVerified.computeIfAbsent(
namespaceAndUseCase, _u -> MIN_SEQUENCE_TO_BE_VERIFIED);
}

private void resetMinSequenceToBeVerified(NamespaceAndUseCase namespaceAndUseCase) {
minInclusiveSeqBoundsToBeVerified.put(namespaceAndUseCase, MIN_SEQUENCE_TO_BE_VERIFIED);
}

private ImmutableNamespaceAndUseCase getNamespaceAndUseCasePrefix(NamespaceAndUseCase namespaceAndUseCase) {
return ImmutableNamespaceAndUseCase.of(
namespaceAndUseCase.namespace(), UseCaseUtils.getPaxosUseCasePrefix(namespaceAndUseCase.useCase()));
}

private Set<NamespaceAndUseCase> getNamespaceAndUseCaseTuples() {
return sqlitePaxosStateLogHistory.getAllNamespaceAndUseCaseTuples().stream()
.map(this::getNamespaceAndUseCasePrefix)
.collect(Collectors.toSet());
}
}
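
As context for the batching described in the class Javadoc above, the following standalone sketch (illustrative only, not part of this commit; the class name, the batch size of 5 standing in for LEARNER_LOG_BATCH_SIZE_LIMIT, and the fabricated log contents are all assumptions) shows why consecutive windows share a boundary entry: the maximum verified sequence becomes the inclusive lower bound of the next query, so an inversion that falls exactly on a batch edge is still seen by some window.

import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public final class OverlappingBatchSketch {
    private static final int BATCH_LIMIT = 5; // stands in for LEARNER_LOG_BATCH_SIZE_LIMIT

    public static void main(String[] args) {
        // Fake learner log: sequence -> timestamp bound, with an inversion right at a batch edge (seq 5 -> 6).
        NavigableMap<Long, Long> log = new TreeMap<>();
        for (long seq = 1; seq <= 10; seq++) {
            log.put(seq, seq * 100);
        }
        log.put(6L, 350L); // timestamp bound goes backwards relative to seq 5 (500)

        long lowerBoundInclusive = Long.MIN_VALUE;
        while (true) {
            // Mirrors getLearnerLogsSince: at most BATCH_LIMIT entries with seq >= lowerBoundInclusive.
            NavigableMap<Long, Long> batch = new TreeMap<>();
            log.tailMap(lowerBoundInclusive, true).entrySet().stream()
                    .limit(BATCH_LIMIT)
                    .forEach(entry -> batch.put(entry.getKey(), entry.getValue()));
            if (batch.size() <= 1) {
                break; // not enough logs to verify; the real verifier resets its progress here
            }
            System.out.println("verifying " + batch.keySet() + " -> non-decreasing? " + nonDecreasing(batch));
            // Reuse the maximum verified sequence as the next inclusive lower bound,
            // so batch [1..5] and batch [5..9] overlap at sequence 5 and catch the 500 -> 350 drop.
            lowerBoundInclusive = batch.lastKey();
        }
    }

    private static boolean nonDecreasing(Map<Long, Long> batch) {
        long previous = Long.MIN_VALUE;
        for (long bound : batch.values()) {
            if (bound < previous) {
                return false;
            }
            previous = bound;
        }
        return true;
    }
}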
@@ -82,6 +82,12 @@ public LearnerAndAcceptorRecords getLearnerAndAcceptorLogsInRange(
querySequenceBounds.getUpperBoundInclusive())));
}

public Map<Long, PaxosValue> getLearnerLogsSince(
Client namespace, LearnerUseCase learnerUseCase, long lowerBoundInclusive, int learnerLogBatchSizeLimit) {
return execute(dao -> dao.getLearnerLogsSince(
namespace, learnerUseCase.value(), lowerBoundInclusive, learnerLogBatchSizeLimit));
}

public long getGreatestLogEntry(Client client, LearnerUseCase useCase) {
return executeSqlitePaxosStateLogQuery(dao -> dao.getGreatestLogEntry(client, useCase.value()))
.orElse(PaxosAcceptor.NO_LOG_ENTRY);
@@ -99,11 +105,6 @@ public interface Queries {
@SqlQuery("SELECT DISTINCT namespace, useCase FROM paxosLog")
Set<NamespaceAndUseCase> getAllNamespaceAndUseCaseTuples();

// TODO(snanda): For now, limit is based on approximation and has not been tested with remotes. We need to
// revisit this once we have the remote history providers set up. Also, we may have to make it configurable to
// accommodate the rate at which logs are being published.
@SqlQuery("SELECT seq, val FROM paxosLog WHERE namespace = :namespace.value AND useCase = :useCase AND seq >="
+ " :lowerBoundInclusive AND seq <= :upperBoundInclusive")
Map<Long, PaxosValue> getLearnerLogsInRange(
@@ -119,5 +120,13 @@ Map<Long, PaxosAcceptorData> getAcceptorLogsInRange(
@Bind("useCase") String useCase,
@Bind("lowerBoundInclusive") long lowerBoundInclusive,
@Bind("upperBoundInclusive") long upperBoundInclusive);

@SqlQuery("SELECT seq, val FROM paxosLog WHERE namespace = :namespace.value AND useCase = :useCase AND seq >="
+ " :lowerBoundInclusive ORDER BY seq ASC LIMIT :limit")
Map<Long, PaxosValue> getLearnerLogsSince(
@BindPojo("namespace") Client namespace,
@Bind("useCase") String useCase,
@Bind("lowerBoundInclusive") long lowerBoundInclusive,
@Bind("limit") long learnerLogBatchSizeLimit);
}
}
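
A hedged sketch of how the new batched accessor might be driven, using only methods that appear in this diff (SqlitePaxosStateLogHistory.create, createLearnerUseCase, getLearnerLogsSince). The class name, the "timestamp" use-case string, the com.palantir.paxos.Client import path, and the idea that callers supply the DataSource and Client are all assumptions for illustration, not part of this commit.

import com.palantir.paxos.Client;
import com.palantir.paxos.PaxosValue;
import com.palantir.timelock.history.models.LearnerUseCase;
import com.palantir.timelock.history.sqlite.SqlitePaxosStateLogHistory;
import java.util.Map;
import javax.sql.DataSource;

final class LearnerLogPagingSketch {
    private LearnerLogPagingSketch() {}

    /**
     * Fetches one verification batch of learner logs. A caller would pass the maximum sequence number
     * of the previous batch as the next lowerBoundInclusive, so consecutive batches share a boundary entry.
     */
    static Map<Long, PaxosValue> fetchBatch(DataSource dataSource, Client namespace, long lowerBoundInclusive) {
        SqlitePaxosStateLogHistory history = SqlitePaxosStateLogHistory.create(dataSource);
        // "timestamp" is a made-up use-case name, chosen purely for illustration.
        LearnerUseCase useCase = LearnerUseCase.createLearnerUseCase("timestamp");
        // Returns at most 250 entries with seq >= lowerBoundInclusive, keyed by sequence number.
        return history.getLearnerLogsSince(namespace, useCase, lowerBoundInclusive, 250);
    }
}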
@@ -0,0 +1,117 @@
/*
* (c) Copyright 2020 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.timelock.corruption.detection;

import static com.palantir.timelock.corruption.detection.LocalTimestampInvariantsVerifier.LEARNER_LOG_BATCH_SIZE_LIMIT;

import com.palantir.timelock.corruption.detection.TimeLockCorruptionTestSetup.StateLogComponents;
import com.palantir.timelock.history.utils.PaxosSerializationTestUtils;
import java.util.stream.IntStream;
import org.junit.Rule;
import org.junit.Test;

public class LocalTimestampInvariantsVerifierTest {
@Rule
public TimeLockCorruptionDetectionHelper helper = new TimeLockCorruptionDetectionHelper();

@Test
public void isHealthyIfTimestampsAreIncreasing() {
// write increasing timestamp values in range [1, LEARNER_LOG_BATCH_SIZE_LIMIT - 1]
helper.writeLogsOnDefaultLocalServer(1, LEARNER_LOG_BATCH_SIZE_LIMIT - 1);
helper.assertLocalTimestampInvariantsStandInNextBatch();
}

@Test
public void isHealthyIfTimestampsAreNonDecreasing() {
StateLogComponents localServer = helper.getDefaultLocalServer();
// write a constant timestamp value in range [1, LEARNER_LOG_BATCH_SIZE_LIMIT - 1]
IntStream.rangeClosed(1, LEARNER_LOG_BATCH_SIZE_LIMIT - 1)
.boxed()
.forEach(round -> PaxosSerializationTestUtils.writePaxosValue(
localServer.learnerLog(),
round,
PaxosSerializationTestUtils.createPaxosValueForRoundAndData(round, 5)));
helper.assertLocalTimestampInvariantsStandInNextBatch();
}

@Test
public void detectsClockWentBackwardsOnNode() {
helper.writeLogsOnDefaultLocalServer(1, LEARNER_LOG_BATCH_SIZE_LIMIT - 1);
helper.createTimestampInversion(5);
helper.assertClockGoesBackwardsInNextBatch();
}

@Test
public void detectsClockWentBackwardsForDiscontinuousLogs() {
helper.writeLogsOnDefaultLocalServer(1, 27);
helper.writeLogsOnDefaultLocalServer(LEARNER_LOG_BATCH_SIZE_LIMIT, LEARNER_LOG_BATCH_SIZE_LIMIT + 97);

helper.createTimestampInversion(72);
helper.assertClockGoesBackwardsInNextBatch();
}

@Test
public void detectsClockWentBackwardsAtBatchStart() {
helper.writeLogsOnDefaultLocalServer(1, LEARNER_LOG_BATCH_SIZE_LIMIT);
helper.createTimestampInversion(1);
helper.assertClockGoesBackwardsInNextBatch();
}

@Test
public void detectsClockWentBackwardsAtBatchEnd() {
helper.writeLogsOnDefaultLocalServer(1, LEARNER_LOG_BATCH_SIZE_LIMIT);
helper.createTimestampInversion(LEARNER_LOG_BATCH_SIZE_LIMIT - 1);
helper.assertClockGoesBackwardsInNextBatch();
}

@Test
public void detectsClockWentBackwardsStartOfNextBatch() {
helper.writeLogsOnDefaultLocalServer(1, 2 * LEARNER_LOG_BATCH_SIZE_LIMIT);
helper.createTimestampInversion(LEARNER_LOG_BATCH_SIZE_LIMIT);

// No signs of corruption in the first batch
helper.assertLocalTimestampInvariantsStandInNextBatch();

// Detects signs of corruption in the second batch
helper.assertClockGoesBackwardsInNextBatch();
}

@Test
public void detectsClockWentBackwardsInLaterBatch() {
helper.writeLogsOnDefaultLocalAndRemote(1, 2 * LEARNER_LOG_BATCH_SIZE_LIMIT);
helper.createTimestampInversion(3 * LEARNER_LOG_BATCH_SIZE_LIMIT / 2);

// No signs of corruption in the first batch
helper.assertLocalTimestampInvariantsStandInNextBatch();

// Detects signs of corruption in the second batch
helper.assertClockGoesBackwardsInNextBatch();
}

@Test
public void resetsProgressIfNotEnoughLogsForVerification() {
helper.createTimestampInversion(1);

// No signs of corruption since there aren't enough logs
helper.assertLocalTimestampInvariantsStandInNextBatch();

helper.writeLogsOnDefaultLocalServer(2, LEARNER_LOG_BATCH_SIZE_LIMIT);

// Detects signs of corruption in the now corrupt first batch of logs
helper.assertClockGoesBackwardsInNextBatch();
}
}