-
Notifications
You must be signed in to change notification settings - Fork 15
Clock went backwards corruption check #5131
Conversation
Generate changelog in
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense, though the analyzer itself need some polish
.sorted(Comparator.comparingLong(SequenceAndTimestampPair::sequence)) | ||
.collect(Collectors.toList()); | ||
|
||
long greatestTimestampSoFar = -1L; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Technically, I don't think we guarantee non-negative timestamps, so this should be Long.MIN_VALUE
List<SequenceAndTimestampPair> seqAndTimestampPairs = KeyedStream.stream(record.record()) | ||
.map(val -> getPaxosValueData(val.learnedValue())) | ||
.filter(Predicates.notNull()) | ||
.mapEntries((sequence, timestamp) -> Maps.immutableEntry( | ||
sequence, ImmutableSequenceAndTimestampPair.of(sequence, PtBytes.toLong(timestamp)))) | ||
.values() | ||
.sorted(Comparator.comparingLong(SequenceAndTimestampPair::sequence)) | ||
.collect(Collectors.toList()); | ||
|
||
long greatestTimestampSoFar = -1L; | ||
long currentTimestamp; | ||
for (SequenceAndTimestampPair sequenceAndTimestampPair : seqAndTimestampPairs) { | ||
currentTimestamp = sequenceAndTimestampPair.timestamp(); | ||
if (currentTimestamp <= greatestTimestampSoFar) { | ||
return true; | ||
} | ||
greatestTimestampSoFar = currentTimestamp; | ||
} | ||
return false; | ||
} | ||
|
||
@Value.Immutable | ||
interface SequenceAndTimestampPair { | ||
@Value.Parameter | ||
long sequence(); | ||
|
||
@Value.Parameter | ||
long timestamp(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you want to be a bit fancy:
List<SequenceAndTimestampPair> seqAndTimestampPairs = KeyedStream.stream(record.record()) | |
.map(val -> getPaxosValueData(val.learnedValue())) | |
.filter(Predicates.notNull()) | |
.mapEntries((sequence, timestamp) -> Maps.immutableEntry( | |
sequence, ImmutableSequenceAndTimestampPair.of(sequence, PtBytes.toLong(timestamp)))) | |
.values() | |
.sorted(Comparator.comparingLong(SequenceAndTimestampPair::sequence)) | |
.collect(Collectors.toList()); | |
long greatestTimestampSoFar = -1L; | |
long currentTimestamp; | |
for (SequenceAndTimestampPair sequenceAndTimestampPair : seqAndTimestampPairs) { | |
currentTimestamp = sequenceAndTimestampPair.timestamp(); | |
if (currentTimestamp <= greatestTimestampSoFar) { | |
return true; | |
} | |
greatestTimestampSoFar = currentTimestamp; | |
} | |
return false; | |
} | |
@Value.Immutable | |
interface SequenceAndTimestampPair { | |
@Value.Parameter | |
long sequence(); | |
@Value.Parameter | |
long timestamp(); | |
} | |
Stream<Long> expectedSortedTimestamps = KeyedStream.stream(record.record()) | |
.map(val -> getPaxosValueData(val.learnedValue())) | |
.filter(Predicates.notNull()) | |
.mapEntries((sequence, timestamp) -> Maps.immutableEntry(sequence, PtBytes.toLong(timestamp))) | |
.entries() | |
.sorted(Comparator.comparingLong(Entry::getKey)) | |
.map(Entry::getValue); | |
return StreamEx.of(expectedSortedTimestamps) | |
.pairMap((first, second) -> first >= second) | |
.anyMatch(x -> x); |
If you don't want to use StreamEx
, you can still avoid the immutable by checking expectedSortedTimestamps
in a for loop, but just compare neighbours instead of maintaining a global maximum
This will need redesigning. Will be moving the check out of History analyzer and adding a separate thread to verify timestamp invariants. |
...c/main/java/com/palantir/timelock/corruption/detection/LocalTimestampInvariantsVerifier.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly looks good, have some nits
.putAllViolatingStatusesToNamespaceAndUseCase( | ||
paxosRoundCorruptionReport.violatingStatusesToNamespaceAndUseCase()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.putAllViolatingStatusesToNamespaceAndUseCase( | |
paxosRoundCorruptionReport.violatingStatusesToNamespaceAndUseCase()) | |
.from(paxosRoundCorruptionReport) |
private CorruptionCheckViolation timestampInvariantsViolationLevel(NamespaceAndUseCase namespaceAndUseCase) { | ||
Stream<Long> expectedSortedTimestamps = KeyedStream.stream(getLearnerLogs(namespaceAndUseCase)) | ||
.map(PaxosValue::getData) | ||
.filter(Predicates.notNull()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.filter(Predicates.notNull()) | |
.filter(Objects::nonNull) |
@VisibleForTesting | ||
public static final int LEARNER_LOG_BATCH_SIZE_LIMIT = 250; | ||
|
||
public static final long MIN_SEQUENCE_TO_BE_VERIFIED = -1L; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public static final long MIN_SEQUENCE_TO_BE_VERIFIED = -1L; | |
public static final long MIN_SEQUENCE_TO_BE_VERIFIED = Long.MIN_VALUE; |
@Test | ||
public void detectCorruptionIfClockWentBackwardsOnNode() { | ||
helper.writeLogsOnDefaultLocalServer(1, LEARNER_LOG_BATCH_SIZE_LIMIT - 1); | ||
helper.forceTimestampToGoBackwards(5); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method name is kinda misleading, it's actually forcing the timestamp to go forward / sequence to go backwards. It's only making the timestamp go backwards for the sequence after, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yes, that makes sense. Basically what I mean is - create inversion b/w this sequence and the next. I have changed the name to createTimestampInversion now.
@Test | ||
public void detectIfClockWentBackwardsAtBatchEnd() { | ||
helper.writeLogsOnDefaultLocalServer(1, LEARNER_LOG_BATCH_SIZE_LIMIT); | ||
helper.forceTimestampToGoBackwards(LEARNER_LOG_BATCH_SIZE_LIMIT / 2); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this then be helper.forceTimestampToGoBackwards(LEARNER_LOG_BATCH_SIZE_LIMIT - 1);
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch! yes, this is a copy pasting mistake.
helper.assertLocalTimestampInvariantsStand(); | ||
|
||
helper.writeLogsOnDefaultLocalServer(2, LEARNER_LOG_BATCH_SIZE_LIMIT); | ||
helper.forceTimestampToGoBackwards(LEARNER_LOG_BATCH_SIZE_LIMIT / 2); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
helper.forceTimestampToGoBackwards(LEARNER_LOG_BATCH_SIZE_LIMIT / 2); | |
helper.forceTimestampToGoBackwards(1); |
This way we test that we don't skip the one entry from first check. Also I guess do this before the first check to be consistent
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
@@ -105,6 +105,31 @@ void assertViolationDetectedForNamespaceAndUseCases( | |||
return timeLockCorruptionTestSetup.createStatLogForNamespaceAndUseCase(namespaceAndUseCase); | |||
} | |||
|
|||
void assertClockWentBackwards() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe rename to assertClockWentBackwardsInNextBatch
to be clearer? Same for the method below
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Released 0.280.1 |
This reverts commit 259908a.
Goals (and why):
TimeLock should validates the expected invariant for its timestamp bounds: they must increase with increasing sequence numbers.
Implementation Description (bullets):
LocalTimestampInvariantsVerifier - verifies timestamp bounds increase with increasing sequence numbers
The validation is done batch wise, e.g. [1, n], [n, 2 * n - 1] and so on. Two consecutive batches share
boundaries, this is done to catch inversion at batch end.
The verifier uses the existing corruption handling infra i.e. invariant violation will show up in the health check and (when enabled) is capable of killing TimeLock in the case of violation.
Testing (What was existing testing like? What have you done to improve it?):
Added unit tests
Concerns (what feedback would you like?):
Does the design seem reasonable?
Can we optimize the implementation?
Did I miss any edge cases?
Where should we start reviewing?:
HistoryAnalyzer.java
Priority (whenever / two weeks / yesterday):
End of week.