[TimeLock Corruption] | Implement progress tracker #5078
Drive-by of a few things; didn't review in-depth semantics as I don't have full context on what's going on here.
import java.util.concurrent.ConcurrentHashMap;
import javax.sql.DataSource;

public class PaxosLogHistoryProgressTracker {

IntelliJ is telling me many things in this class don't have to be public - can you make those package-private or private?

public void updateProgressState(Map<NamespaceAndUseCase, SequenceBounds> namespaceAndUseCaseSequenceBoundsMap) {
    namespaceAndUseCaseSequenceBoundsMap.forEach((namespaceAndUseCase, bounds) ->
            updateProgressStateForNamespaceAndUseCase(namespaceAndUseCase, bounds));

replace with method reference
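
To illustrate the suggestion, here is a minimal sketch of the same shape of call written both ways. The class, the `String`/`Long` key and value types, and the method names are hypothetical stand-ins, not the PR's code; the only point is that `Map.forEach` accepts a two-argument method reference in place of the lambda.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the tracker; names and types are assumptions.
final class MethodReferenceSketch {
    // Records what the per-key update method was called with.
    static final Map<String, Long> recorded = new LinkedHashMap<>();

    // Plays the role of updateProgressStateForNamespaceAndUseCase(key, bounds).
    static void updateForKey(String key, Long bound) {
        recorded.put(key, bound);
    }

    // Lambda form, as in the snippet under review.
    static void updateAllWithLambda(Map<String, Long> bounds) {
        bounds.forEach((key, bound) -> updateForKey(key, bound));
    }

    // Equivalent method-reference form: forEach passes (key, value) straight through.
    static void updateAllWithMethodReference(Map<String, Long> bounds) {
        bounds.forEach(MethodReferenceSketch::updateForKey);
    }
}
```

For an instance method on the tracker itself, the equivalent would be `this::updateProgressStateForNamespaceAndUseCase`.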
private final LogVerificationProgressState logVerificationProgressState;
private final SqlitePaxosStateLogHistory sqlitePaxosStateLogHistory;

private Map<NamespaceAndUseCase, ProgressComponents> verificationProgressStateCache = new ConcurrentHashMap<>();

As far as I can tell, you only ever put or get from this cache (i.e. never delete). I don't have full context on this piece of work, but I wonder if it is possible to OOM yourself? How many entries do you expect to be present in this cache?

The size would be bounded by |NamespaceAndUseCase|, which would generally be around 1000, so I think this is fine. (Good flag though!)
import org.immutables.value.Value;

@Value.Immutable
public interface ProgressComponents {

I think these immutables are fine, but if you really want to avoid crossed wires, you can try a staged builder: https://immutables.github.io/immutable.html#staged-builder (I appreciate not everyone likes them, but I do!)
Map<NamespaceAndUseCase, Long> lastVerifiedSequences = historyQueries.stream()
        .collect(Collectors.toMap(HistoryQuery::getNamespaceAndUseCase, HistoryQuery::getSeq, Math::min));
Map<NamespaceAndUseCase, SequenceBounds> lastVerifiedSequences = historyQueries.stream()
        .collect(Collectors.toMap(

I'm not a huge fan - KeyedStream.of(historyQueries).mapKeys(...).map(...) gets you most of the way, although I'll confess that I'm not sure how to do the merging resolution in KeyedStream (although you can always collect to multimap).

I think what we have now is more readable
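
For readers unfamiliar with the three-argument `Collectors.toMap`, a minimal sketch of the merge-on-duplicate-key behaviour being discussed follows. The `Query` class and its `String` key are simplified, hypothetical stand-ins for `HistoryQuery` and `NamespaceAndUseCase`; only the `toMap(keyMapper, valueMapper, mergeFunction)` shape is taken from the snippet above.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical, simplified model of the pre-change code path.
final class ToMapMergeSketch {
    // Stand-in for HistoryQuery: a key plus the sequence to start from.
    static final class Query {
        private final String key;
        private final long seq;

        Query(String key, long seq) {
            this.key = key;
            this.seq = seq;
        }

        String key() {
            return key;
        }

        long seq() {
            return seq;
        }
    }

    // Duplicate keys are resolved by keeping the smallest sequence,
    // so the eventual load starts early enough for every query.
    static Map<String, Long> minSeqPerKey(List<Query> queries) {
        return queries.stream().collect(Collectors.toMap(Query::key, Query::seq, Math::min));
    }
}
```

Without the third argument, `toMap` throws `IllegalStateException` on the first duplicate key, which is why a resolver is needed when several queries share a namespace and use case.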
List<LogsForNamespaceAndUseCase> paxosHistory =
        HistoryLoaderAndTransformer.getLogsForHistoryQueries(history, historyQueries);

sanityCheckLoadedHistory(paxosHistory, 100 - lastVerified);
sanityCheckLoadedHistory(paxosHistory, 100 - lastVerified + 1);

Speaking of sanity checks, is this: (100 - lastVerified) + 1 OR 100 - (lastVerified + 1)? I assume the latter, although I'm never certain on how it determines this - so maybe bracket it?
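
For reference, Java's binary + and - share one precedence level and group left to right, so the unbracketed expression is evaluated as (100 - lastVerified) + 1, i.e. the former reading. A tiny check (class and method names are hypothetical) makes the difference visible and supports the suggestion to bracket it:

```java
// Hypothetical helper class; only the arithmetic mirrors the line under review.
final class PrecedenceSketch {
    // The expression exactly as written in the test.
    static long asWritten(long lastVerified) {
        return 100 - lastVerified + 1;
    }

    // Explicit left grouping: what Java actually evaluates.
    static long leftGrouped(long lastVerified) {
        return (100 - lastVerified) + 1;
    }

    // Explicit right grouping: differs from the above by 2.
    static long rightGrouped(long lastVerified) {
        return 100 - (lastVerified + 1);
    }
}
```

With lastVerified = 10, asWritten and leftGrouped both give 91, while rightGrouped gives 89.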

The structure makes sense, but I have some concerns around readability. Specifically, you use lastVerifiedSequence throughout most of the code, but associate it with an exclusive upper bound in SequenceBounds, which means that the "last verified sequence" isn't actually verified. I'd suggest using inclusive bounds throughout.
I think this also leads to a strange edge case in PaxosLogHistoryProgressTracker: as written I think the tracker spins infinitely on an inactive series, because you're comparing an inclusive bound with an exclusive bound.

long seq();

@Value.Parameter
long progressLimit();

nit: could we have docs on what this means?

@@ -47,6 +52,17 @@ private HistoryLoaderAndTransformer() {
                .collect(Collectors.toList());
    }

    private static SequenceBounds seqBoundsCollisionResolver(SequenceBounds bound1, SequenceBounds bound2) {

nit: let's give this a more descriptive name? minimalLowerBoundResolver or something like that?

@@ -87,13 +88,13 @@ public void canHandleDuplicateQueries() {

        List<HistoryQuery> queries = IntStream.range(0, 10)
                .boxed()
                .map(idx -> HistoryQuery.of(NAMESPACE_AND_USE_CASE, minLastVerified + idx))
                .map(idx -> HistoryQuery.of(NAMESPACE_AND_USE_CASE, minLastVerified + idx, minLastVerified + 500))

can we reference this 500 constant somewhere?

actually, see later comment: would prefer this to have a factory for generating unbounded ends

List<HistoryQuery> historyQueries =
        ImmutableList.of(HistoryQuery.of(ImmutableNamespaceAndUseCase.of(CLIENT, USE_CASE), lastVerified));
List<HistoryQuery> historyQueries = ImmutableList.of(
        HistoryQuery.of(ImmutableNamespaceAndUseCase.of(CLIENT, USE_CASE), lastVerified, lastVerified + 500));

Maybe just have an unbounded end factory in HistoryQuery, it looks like this is causing a lot of friction
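
One possible shape for such a factory is sketched below. Everything here is an assumption for illustration: the class is a hand-written stand-in rather than the real Immutables-generated HistoryQuery, the `unbounded` name is hypothetical, and using Long.MAX_VALUE as the "no upper limit" sentinel is just one option.

```java
// Hypothetical, hand-rolled stand-in for HistoryQuery; not the PR's class.
final class HistoryQuerySketch {
    private final String namespaceAndUseCase;
    private final long lowerBound;
    private final long upperBound;

    private HistoryQuerySketch(String namespaceAndUseCase, long lowerBound, long upperBound) {
        this.namespaceAndUseCase = namespaceAndUseCase;
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    // Fully specified query, mirroring the three-argument HistoryQuery.of(...) in the diff.
    static HistoryQuerySketch of(String namespaceAndUseCase, long lowerBound, long upperBound) {
        return new HistoryQuerySketch(namespaceAndUseCase, lowerBound, upperBound);
    }

    // Assumed convenience factory: callers that only care about the start
    // of the range no longer have to invent an upper bound such as + 500.
    static HistoryQuerySketch unbounded(String namespaceAndUseCase, long lowerBound) {
        return of(namespaceAndUseCase, lowerBound, Long.MAX_VALUE);
    }

    String namespaceAndUseCase() {
        return namespaceAndUseCase;
    }

    long lowerBound() {
        return lowerBound;
    }

    long upperBound() {
        return upperBound;
    }
}
```

Tests could then write `HistoryQuerySketch.unbounded(key, lastVerified)` instead of repeating the literal 500.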

long lower();

@Value.Parameter
long upper();

I'd recommend being explicit about the edges of the range here: lowerInclusive but upperExclusive?

Actually, I would strongly prefer for us to use inclusive ranges. A lot of the lastVerified stuff is misleading because upper() is not actually verified.
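
To make the naming concern concrete, here is a small sketch (hypothetical names, plain longs rather than SequenceBounds) of how the two conventions differ in which sequence number was last covered by a query:

```java
// Hypothetical helpers contrasting half-open and closed sequence ranges.
final class RangeConventionSketch {
    // For [lowerInclusive, upperExclusive), the last sequence actually covered is upper - 1.
    static long lastCoveredHalfOpen(long lowerInclusive, long upperExclusive) {
        return upperExclusive - 1;
    }

    // For [lowerInclusive, upperInclusive], the upper bound itself is covered.
    static long lastCoveredClosed(long lowerInclusive, long upperInclusive) {
        return upperInclusive;
    }

    // Number of sequence numbers covered under each convention.
    static long sizeHalfOpen(long lowerInclusive, long upperExclusive) {
        return upperExclusive - lowerInclusive;
    }

    static long sizeClosed(long lowerInclusive, long upperInclusive) {
        return upperInclusive - lowerInclusive + 1;
    }
}
```

Under the half-open convention, storing upper() as "last verified" is therefore off by one, which is the mismatch the comment points at.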

import org.immutables.value.Value;

@Value.Immutable
public interface ProgressComponents {

I'd name this differently, maybe ProgressState or something like that?

@Value.Immutable
public interface ProgressComponents {
    @Value.Parameter
    long seq();

lastVerifiedSeq? This makes it sound similar to the Paxos constructs when it's really not

Map<Long, PaxosAcceptorData> getAcceptorLogsSince(
        @BindPojo("namespace") Client namespace, @Bind("useCase") String useCase, @Bind("seq") long seq);
@SqlQuery("SELECT seq, val FROM paxosLog WHERE namespace = :namespace.value AND useCase = :useCase AND seq >="
        + " :lowerBound AND seq < :upperBound")

See discussion elsewhere: I think I'd have a strong preference for having inclusive ranges here, because we've used terminology like latestVerified through many other parts of the code, which are not strictly correct when the upper bound has actually not been verified.

@VisibleForTesting
void updateProgressStateForNamespaceAndUseCase(NamespaceAndUseCase key, SequenceBounds value) {
    long lastVerifiedSequence = value.upper();

This is an exclusive bound, so it is not the last verified sequence (see comment on my thoughts on SequenceBounds).

        NamespaceAndUseCase key, SequenceBounds value, ProgressComponents currentProgressState) {
    if (value.upper() <= currentProgressState.progressLimit()) {
        return Optional.empty();
    }

This doesn't behave the way I think is intended: your seeding of the bound in getLatestSequence is inclusive, however your use of value here is exclusive. This means that even if there are no new entries, the validation task will repeatedly cycle over the entries (e.g. of an inactive client), which I doubt is intended.
I'd suggest having as integration tests (maybe not in this PR, but before the workstream is complete):

I agree, I'll add some ETE tests in a separate PR.
🕐 1:00
Thanks for adding more comprehensive tests and changing the interval queries to operate on closed intervals; I think the PR looks correct now.
I'm afraid I think there are still some issues, largely around readability:
- I'm uncertain of the decomp: I don't understand the point of SequenceBounds when HistoryQuery already exists. I'd kill it if possible, and if not, given that you have a constant in the class and a factory that strongly suggest it is intended only for history queries, I would suggest naming it accordingly.
- Some variable names, in particular around the form of Map<T, SequenceBounds> lastVerifiedSequences, need to be changed - in particular, they often refer to the bounds on queries that need to be made (while previously there was only a lower bound, so lastVerifiedSequences was correct).
- I would urge you to use method names indicating the intention behind your methods as opposed to just what the methods do. For example, getBoundsSinceLastVerified(long lastVerified) doesn't say much, and I'd argue the most natural interpretation of that is [lastVerified + 1, inf) - while if you describe it as getBoundsForNextHistoryQuery(long lastVerified) a reader will understand what it's for, and is more likely to have assumptions in line with what you're intending.

@SqlUpdate("CREATE TABLE IF NOT EXISTS logVerificationProgress (namespace TEXT, useCase TEXT, seq BIGINT,"
        + "PRIMARY KEY(namespace, useCase))")
@SqlUpdate("CREATE TABLE IF NOT EXISTS logVerificationProgress (namespace TEXT, useCase TEXT, lastVerifiedSeq"
        + " BIGINT, greatestSeqNumberToBeVerified BIGINT, PRIMARY KEY(namespace, useCase))")

sanity check: this has never been wired up right?

This will break since #5071 merged, my bad, I will fix this.

        LearnerUseCase learnerUseCase,
        AcceptorUseCase acceptorUseCase,
        long lowerBound,
        long upperBound) {

I'd suggest maintaining lowerBoundInclusive and upperBoundInclusive through the parameters in the call stack when making these queries

// TODO(snanda): Refactor the two parts on translating PaxosHistoryOnRemote to
// CompletePaxosHistoryForNamespaceAndUseCase to a separate component
public List<CompletePaxosHistoryForNamespaceAndUseCase> getHistory() {
    Map<NamespaceAndUseCase, Long> lastVerifiedSequences = getNamespaceAndUseCaseToLastVerifiedSeqMap();
    Map<NamespaceAndUseCase, SequenceBounds> lastVerifiedSequences = getNamespaceAndUseCaseToLastVerifiedSeqMap();

This variable name is misleading. These bounds are not verified at the time this call is made - they are the candidates you want to verify. In the previous model it was correct (because that's where you start the query from).

@@ -39,19 +40,21 @@ public static LocalHistoryLoader create(SqlitePaxosStateLogHistory sqlitePaxosSt
        return new LocalHistoryLoader(sqlitePaxosStateLogHistory);
    }

    public PaxosHistoryOnSingleNode getLocalPaxosHistory(Map<NamespaceAndUseCase, Long> lastVerifiedSequences) {
    public PaxosHistoryOnSingleNode getLocalPaxosHistory(
            Map<NamespaceAndUseCase, SequenceBounds> lastVerifiedSequences) {

The name is no longer correct.

    return KeyedStream.stream(lastVerifiedSequences)
            .mapEntries(this::buildHistoryQuery)
            .values()
            .collect(Collectors.toList());
}

private Map<NamespaceAndUseCase, Long> getNamespaceAndUseCaseToLastVerifiedSeqMap() {
private Map<NamespaceAndUseCase, SequenceBounds> getNamespaceAndUseCaseToLastVerifiedSeqMap() {

nit: The name of this method is no longer correct though. As a user who's not familiar, how do I know what the elements of SequenceBounds are?

    return ImmutableSequenceBounds.builder();
}

static SequenceBounds getBoundsSinceLastVerified(long lastVerified) {

What does this mean?
I might be missing something, but I'm starting to be convinced that this class has no reason to exist: we should just operate on HistoryQuery objects.

    this.sqlitePaxosStateLogHistory = sqlitePaxosStateLogHistory;
}

public SequenceBounds getPaxosLogSequenceBounds(NamespaceAndUseCase namespaceAndUseCase) {

What does this method name mean? (It gets the bounds for the next history query; is there a reason the progress tracker shouldn't be creating history queries in and of itself?)

@Value.Immutable
public interface SequenceBounds {
    int MAX_ROWS_ALLOWED = 500;

It feels kind of dubious that this is here; it seems this is just a history query without a namespace and use case. If possible I'd urge you to try and kill this class altogether, but if that's not possible I would strongly prefer naming this class so that it's obvious that it is specific to history queries (e.g. HistoryQuerySequenceRange).

import com.palantir.paxos.NamespaceAndUseCase;
import com.palantir.timelock.history.HistoryQuery;

public class HistoryQueryUtils {

nit: the standard form for factory classes of this kind is the plural of the class name i.e. HistoryQueries
👍 Checkpointing of multiple values can be messy, but discussed offline - we think it's not unreasonable to remove it and use in memory state for remembering when we should reset.
There is a degenerate condition where you might never verify old entries if the server is restarted extremely frequently, so that there is always one paxos round in between the verification task reading the bound and processing a batch, but I don't think we need to worry about that.
Goals (and why):
Adds a ProgressTracker to keep track of last verified sequence numbers, and to update the last verified one once history up to that point has been loaded from all servers.
Implementation Description (bullets):
- Add a ProgressTracker to maintain local cache, get lastVerified seq numbers and update lastVerified
- Maintain local cache of (lastVerifiedSeq, greatestSeqNumberToBeVerified) for each (namespace, useCase) pair
- Init state => lastVerifiedSeq = -1, greatestSeqNumberToBeVerified = latest learned value for respective (namespace, useCase) pair
- Each iteration will do two things:
  - get seq bounds [lowerInclusive, upperInclusive] to load logs for corruption verification. If the lastVerifiedSeq > greatestSeqNumberToBeVerified, reset to init state
  - update the lastVerified with upperInclusive after logs have been loaded
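
The steps above can be sketched roughly as follows. This is a simplified in-memory model under stated assumptions, not the code in PaxosLogHistoryProgressTracker.java: keys are plain strings, the latest learned sequence comes from a hypothetical lookup function, the batch width reuses the MAX_ROWS_ALLOWED = 500 constant seen in the review, and nextBounds is not atomic across threads.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.ToLongFunction;

// Hypothetical model of the tracker described in the bullets above.
final class ProgressTrackerSketch {
    static final int MAX_ROWS_ALLOWED = 500; // assumed batch width

    // (lastVerifiedSeq, greatestSeqToBeVerified) for one (namespace, useCase) pair.
    static final class Progress {
        final long lastVerifiedSeq;
        final long greatestSeqToBeVerified;

        Progress(long lastVerifiedSeq, long greatestSeqToBeVerified) {
            this.lastVerifiedSeq = lastVerifiedSeq;
            this.greatestSeqToBeVerified = greatestSeqToBeVerified;
        }
    }

    private final Map<String, Progress> cache = new ConcurrentHashMap<>();
    private final ToLongFunction<String> latestLearnedSeq; // hypothetical lookup

    ProgressTrackerSketch(ToLongFunction<String> latestLearnedSeq) {
        this.latestLearnedSeq = latestLearnedSeq;
    }

    // Init state: nothing verified yet, limit is the latest learned sequence.
    private Progress initialState(String key) {
        return new Progress(-1L, latestLearnedSeq.applyAsLong(key));
    }

    // Returns {lowerInclusive, upperInclusive} for the next load,
    // resetting to the init state once the limit has been passed.
    long[] nextBounds(String key) {
        Progress progress = cache.computeIfAbsent(key, this::initialState);
        if (progress.lastVerifiedSeq > progress.greatestSeqToBeVerified) {
            progress = initialState(key);
            cache.put(key, progress);
        }
        long lower = progress.lastVerifiedSeq + 1;
        return new long[] {lower, lower + MAX_ROWS_ALLOWED - 1};
    }

    // Called after logs for the range have been loaded from all servers.
    void markVerified(String key, long upperInclusive) {
        cache.computeIfPresent(
                key, (unused, progress) -> new Progress(upperInclusive, progress.greatestSeqToBeVerified));
    }
}
```

With a latest learned sequence of 700, successive iterations would request [0, 499], then [500, 999], and then start over from [0, 499] because 999 exceeds the limit.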
Testing (What was existing testing like? What have you done to improve it?):
Added tests
Concerns (what feedback would you like?):
- Did I miss any edge cases in the ProgressTracker?
- No batching while making DB calls
- No mechanism for detecting inactive series. In the worst case, inactive series can get their progressState refreshed on each iteration.
Where should we start reviewing?:
PaxosLogHistoryProgressTracker.java
Priority (whenever / two weeks / yesterday):
ASAP 🏃♀️