-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Flush old indices on primary promotion and relocation #27580
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall change LGTM, I've made some suggestions around code structure though.
// but we must have everything above the local checkpoint in the commit | ||
requiredSeqNoRangeStart = | ||
Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1; | ||
assert requiredSeqNoRangeStart >= 0 : |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would instead add the following two assertions after the if (isSequenceNumberBasedRecoveryPossible) { ... } else { ... }
as that's what we actually want to hold for both branches:
assert startingSeqNo >= 0;
assert requiredSeqNoRangeStart >= start;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure
@@ -221,28 +227,33 @@ private void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable) { | |||
}); | |||
} | |||
|
|||
private long determineEndingSeqNo() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't like the name of this method. I would prefer to not have a separate method, and just have:
final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo));
and then use endingSeqNo
as an inclusive bound instead of an exclusive one in the remaining calculations.
@@ -434,22 +445,25 @@ void prepareTargetForTranslog(final int totalTranslogOps) throws IOException { | |||
* | |||
* @param startingSeqNo the sequence number to start recovery from, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if all | |||
* ops should be sent | |||
* @param requiredSeqNoRangeStart the lower sequence number of the required range (ending with endingSeqNo) | |||
* @param endingSeqNo the highest sequence number that should be sent |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here it's defined as inclusive, below in the Javadocs of finalizeRecovery, it's defined as exclusive...
Let's use the inclusive version.
int ops = 0; | ||
long size = 0; | ||
int skippedOps = 0; | ||
int totalSentOps = 0; | ||
final AtomicLong targetLocalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); | ||
final List<Translog.Operation> operations = new ArrayList<>(); | ||
final LocalCheckpointTracker requiredOpsTracker = new LocalCheckpointTracker(endingSeqNo, requiredSeqNoRangeStart - 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If endingSeqNo
is exclusive, then this should probably be endingSeqNo - 1
. I know that it does not really matter as we only use the markSeqNoAsCompleted
method, we might as well initialize this to
new LocalCheckpointTracker(requiredSeqNoRangeStart - 1, requiredSeqNoRangeStart - 1)
, but yeah, let's use inclusive bounds for endingSeqNo
;-)
@@ -567,6 +587,12 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Transl | |||
cancellableThreads.executeIO(sendBatch); | |||
} | |||
|
|||
if (requiredOpsTracker.getCheckpoint() < endingSeqNo - 1) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
again having to use -1 here, easier to have endingSeqNo be inclusive.
@@ -7,6 +7,7 @@ | |||
# wait for long enough that we give delayed unassigned shards to stop being delayed | |||
timeout: 70s | |||
level: shards | |||
index: test_index, index_with_replicas, multi_type_index |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need to wait here? Why not directly in the corresponding tests? This makes for example the RecoveryIT test dependent on a specific yml file, which is ugly IMO.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I misread, ignore this comment.
thx @ywelsch . I addressed your feedback. can you take another look? |
@ywelsch I run into another edge case. Can you please take another look at the last few commits? |
* Tests that a single empty shard index is correctly recovered. Empty shards are often an edge case. | ||
*/ | ||
public void testEmptyShard() throws IOException { | ||
final String index = "test_empty_hard"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hard -> shard
… info once recovery is done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks @bleskes
During a recovery the target shard may process both new indexing operations and old ones concurrently. When the primary is on a 6.0 node, the new indexing operations are guaranteed to have sequence numbers but we don't have that guarantee for the old operations as they may come from a period when the primary was on a pre 6.0 node. Have this mixture of old and new is something we do not support and it triggers exceptions. This PR adds a flush on primary promotion and primary relocations to make sure that any recoveries from a primary on a 6.0 will be guaranteed to only need operations with sequence numbers. A recovery from store already flushes when we start the engine if there were any ops in the translog. With this extra flushes in place we can now actively filter out operations that have no sequence numbers during recovery. Since filtering out operations is risky, I have opted to harden the logic in the recovery source handler to verify that all operations in the required sequence number range (from the local checkpoint in the commit onwards) are not missed. This comes at an extra complexity for this PR but I think it's worth it. Finally I added two tests that reproduce the problems. Closes #27536
During a recovery the target shard may process both new indexing operations and old ones concurrently. When the primary is on a 6.0 node, the new indexing operations are guaranteed to have sequence numbers but we don't have that guarantee for the old operations as they may come from a period when the primary was on a pre 6.0 node. Have this mixture of old and new is something we do not support and it triggers exceptions. This PR adds a flush on primary promotion and primary relocations to make sure that any recoveries from a primary on a 6.0 will be guaranteed to only need operations with sequence numbers. A recovery from store already flushes when we start the engine if there were any ops in the translog. With this extra flushes in place we can now actively filter out operations that have no sequence numbers during recovery. Since filtering out operations is risky, I have opted to harden the logic in the recovery source handler to verify that all operations in the required sequence number range (from the local checkpoint in the commit onwards) are not missed. This comes at an extra complexity for this PR but I think it's worth it. Finally I added two tests that reproduce the problems. Closes #27536s
Before we use to ship anything in the translog above a certain point. #27580 changed to have a strict upper bound.
Before we use to ship anything in the translog above a certain point. #27580 changed to have a strict upper bound.
Before we use to ship anything in the translog above a certain point. #27580 changed to have a strict upper bound.
Before we use to ship anything in the translog above a certain point. #27580 changed to have a strict upper bound.
…rget from an old node #27580 added extra flushes when a shard transitions to primary to make sure that we never replay translog ops without seq# during recovery. The current logic causes an extra flush when a primary starts when it's recovering from the store. This is not needed as we also flush in the engine itself (to add sequence numbers info into the commit). This double flushing confuses tests and is unneeded. Fixes #27649
…rget from an old node #27580 added extra flushes when a shard transitions to primary to make sure that we never replay translog ops without seq# during recovery. The current logic causes an extra flush when a primary starts when it's recovering from the store. This is not needed as we also flush in the engine itself (to add sequence numbers info into the commit). This double flushing confuses tests and is unneeded. Fixes #27649
…rget from an old node #27580 added extra flushes when a shard transitions to primary to make sure that we never replay translog ops without seq# during recovery. The current logic causes an extra flush when a primary starts when it's recovering from the store. This is not needed as we also flush in the engine itself (to add sequence numbers info into the commit). This double flushing confuses tests and is unneeded. Fixes #27649 # Conflicts: # core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
During a recovery the target shard may process both new indexing operations and old ones concurrently. When the primary is on a 6.0 node, the new indexing operations are guaranteed to have sequence numbers but we don't have that guarantee for the old operations as they may come from a period when the primary was on a pre 6.0 node. Have this mixture of old and new is something we do not support and it triggers exceptions.
This PR adds a flush on primary promotion and primary relocations to make sure that any recoveries from a primary on a 6.0 will be guaranteed to only need operations with sequence numbers. A recovery from store already flushes when we start the engine if there were any ops in the translog.
With this extra flushes in place we can now actively filter out operations that have no sequence numbers during recovery. Since filtering out operations is risky, I have opted to harden the logic in the recovery source handler to verify that all operations in the required sequence number range (from the local checkpoint in the commit onwards) are not missed. This comes at an extra complexity for this PR but I think it's worth it.
Finally I added two tests that reproduce the problems.
Closes #27536
PS. I still need to add unit tests but since there's some time pressure on this one I think we can start reviewing.