-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Flush old indices on primary promotion and relocation #27580
Changes from 27 commits
41d27c8
e99de20
277c87a
d77e00a
af3f815
04cbccf
eb3bd72
0a75735
4c4a1bc
a29acf2
895b78b
8782377
b77d4e8
22d5bfa
b2082b0
9935225
10fbb3e
fb8a105
c1a0cc7
e8f65f3
6c5140c
dce71ab
6989d52
c09e419
53869b3
9467758
09f7133
1501d4a
e5a734c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -148,23 +148,26 @@ public RecoveryResponse recoverToTarget() throws IOException { | |
final Translog translog = shard.getTranslog(); | ||
|
||
final long startingSeqNo; | ||
final long requiredSeqNoRangeStart; | ||
final boolean isSequenceNumberBasedRecoveryPossible = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && | ||
isTargetSameHistory() && isTranslogReadyForSequenceNumberBasedRecovery(); | ||
|
||
if (isSequenceNumberBasedRecoveryPossible) { | ||
logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo()); | ||
startingSeqNo = request.startingSeqNo(); | ||
requiredSeqNoRangeStart = startingSeqNo; | ||
} else { | ||
final Engine.IndexCommitRef phase1Snapshot; | ||
try { | ||
phase1Snapshot = shard.acquireIndexCommit(false); | ||
} catch (final Exception e) { | ||
throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e); | ||
} | ||
// we set this to unassigned to create a translog roughly according to the retention policy | ||
// on the target | ||
startingSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; | ||
|
||
// we set this to 0 to create a translog roughly according to the retention policy | ||
// on the target. Note that it will still filter out legacy operations with no sequence numbers | ||
startingSeqNo = 0; | ||
// but we must have everything above the local checkpoint in the commit | ||
requiredSeqNoRangeStart = | ||
Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1; | ||
try { | ||
phase1(phase1Snapshot.getIndexCommit(), translog::totalOperations); | ||
} catch (final Exception e) { | ||
|
@@ -177,6 +180,9 @@ public RecoveryResponse recoverToTarget() throws IOException { | |
} | ||
} | ||
} | ||
assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo; | ||
assert requiredSeqNoRangeStart >= startingSeqNo : "requiredSeqNoRangeStart [" + requiredSeqNoRangeStart + "] is lower than [" | ||
+ startingSeqNo + "]"; | ||
|
||
runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId())); | ||
|
||
|
@@ -186,10 +192,19 @@ public RecoveryResponse recoverToTarget() throws IOException { | |
throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e); | ||
} | ||
|
||
final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); | ||
/* | ||
* We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all | ||
* operations in the required range will be available for replaying from the translog of the source. | ||
*/ | ||
cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo)); | ||
|
||
logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo); | ||
|
||
logger.trace("snapshot translog for recovery; current size is [{}]", translog.estimateTotalOperationsFromMinSeq(startingSeqNo)); | ||
final long targetLocalCheckpoint; | ||
try(Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(startingSeqNo)) { | ||
targetLocalCheckpoint = phase2(startingSeqNo, snapshot); | ||
targetLocalCheckpoint = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot); | ||
} catch (Exception e) { | ||
throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e); | ||
} | ||
|
@@ -223,26 +238,19 @@ private void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable) { | |
|
||
/** | ||
* Determines if the source translog is ready for a sequence-number-based peer recovery. The main condition here is that the source | ||
* translog contains all operations between the local checkpoint on the target and the current maximum sequence number on the source. | ||
* translog contains all operations above the local checkpoint on the target. We already know that the translog contains or will contain | ||
* all ops above the source local checkpoint, so we can stop checking there. | ||
* | ||
* @return {@code true} if the source is ready for a sequence-number-based recovery | ||
* @throws IOException if an I/O exception occurred reading the translog snapshot | ||
*/ | ||
boolean isTranslogReadyForSequenceNumberBasedRecovery() throws IOException { | ||
final long startingSeqNo = request.startingSeqNo(); | ||
assert startingSeqNo >= 0; | ||
final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); | ||
logger.trace("testing sequence numbers in range: [{}, {}]", startingSeqNo, endingSeqNo); | ||
final long localCheckpoint = shard.getLocalCheckpoint(); | ||
logger.trace("testing sequence numbers in range: [{}, {}]", startingSeqNo, localCheckpoint); | ||
// the start recovery request is initialized with the starting sequence number set to the target shard's local checkpoint plus one | ||
if (startingSeqNo - 1 <= endingSeqNo) { | ||
/* | ||
* We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all | ||
* operations in the required range will be available for replaying from the translog of the source. | ||
*/ | ||
cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo)); | ||
|
||
logger.trace("all operations up to [{}] completed, checking translog content", endingSeqNo); | ||
|
||
if (startingSeqNo - 1 <= localCheckpoint) { | ||
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1); | ||
try (Translog.Snapshot snapshot = shard.getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) { | ||
Translog.Operation operation; | ||
|
@@ -252,7 +260,7 @@ boolean isTranslogReadyForSequenceNumberBasedRecovery() throws IOException { | |
} | ||
} | ||
} | ||
return tracker.getCheckpoint() >= endingSeqNo; | ||
return tracker.getCheckpoint() >= localCheckpoint; | ||
} else { | ||
return false; | ||
} | ||
|
@@ -432,24 +440,27 @@ void prepareTargetForTranslog(final int totalTranslogOps) throws IOException { | |
* point-in-time view of the translog). It then sends each translog operation to the target node so it can be replayed into the new | ||
* shard. | ||
* | ||
* @param startingSeqNo the sequence number to start recovery from, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if all | ||
* ops should be sent | ||
* @param snapshot a snapshot of the translog | ||
* | ||
* @param startingSeqNo the sequence number to start recovery from, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if all | ||
* ops should be sent | ||
* @param requiredSeqNoRangeStart the lower sequence number of the required range (ending with endingSeqNo) | ||
* @param endingSeqNo the highest sequence number that should be sent | ||
* @param snapshot a snapshot of the translog | ||
* @return the local checkpoint on the target | ||
*/ | ||
long phase2(final long startingSeqNo, final Translog.Snapshot snapshot) throws IOException { | ||
long phase2(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, final Translog.Snapshot snapshot) | ||
throws IOException { | ||
if (shard.state() == IndexShardState.CLOSED) { | ||
throw new IndexShardClosedException(request.shardId()); | ||
} | ||
cancellableThreads.checkForCancel(); | ||
|
||
final StopWatch stopWatch = new StopWatch().start(); | ||
|
||
logger.trace("recovery [phase2]: sending transaction log operations"); | ||
logger.trace("recovery [phase2]: sending transaction log operations (seq# from [" + startingSeqNo + "], " + | ||
"required [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "]"); | ||
|
||
// send all the snapshot's translog operations to the target | ||
final SendSnapshotResult result = sendSnapshot(startingSeqNo, snapshot); | ||
final SendSnapshotResult result = sendSnapshot(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot); | ||
|
||
stopWatch.stop(); | ||
logger.trace("recovery [phase2]: took [{}]", stopWatch.totalTime()); | ||
|
@@ -510,18 +521,26 @@ static class SendSnapshotResult { | |
* <p> | ||
* Operations are bulked into a single request depending on an operation count limit or size-in-bytes limit. | ||
* | ||
* @param startingSeqNo the sequence number for which only operations with a sequence number greater than this will be sent | ||
* @param snapshot the translog snapshot to replay operations from | ||
* @return the local checkpoint on the target and the total number of operations sent | ||
* @param startingSeqNo the sequence number for which only operations with a sequence number greater than or equal to this will be sent | ||
* @param requiredSeqNoRangeStart the lower sequence number of the required range | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nit: we may need to reformat this Javadoc |
||
* @param endingSeqNo the upper bound of the sequence number range to be sent (inclusive) | ||
* @param snapshot the translog snapshot to replay operations from | ||
* @return the local checkpoint on the target and the total number of operations sent | ||
* @throws IOException if an I/O exception occurred reading the translog snapshot | ||
*/ | ||
protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Translog.Snapshot snapshot) throws IOException { | ||
protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, | ||
final Translog.Snapshot snapshot) throws IOException { | ||
assert requiredSeqNoRangeStart <= endingSeqNo + 1: | ||
"requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo; | ||
assert startingSeqNo <= requiredSeqNoRangeStart : | ||
"startingSeqNo " + startingSeqNo + " is larger than requiredSeqNoRangeStart " + requiredSeqNoRangeStart; | ||
int ops = 0; | ||
long size = 0; | ||
int skippedOps = 0; | ||
int totalSentOps = 0; | ||
final AtomicLong targetLocalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); | ||
final List<Translog.Operation> operations = new ArrayList<>(); | ||
final LocalCheckpointTracker requiredOpsTracker = new LocalCheckpointTracker(endingSeqNo, requiredSeqNoRangeStart - 1); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If |
||
|
||
final int expectedTotalOps = snapshot.totalOperations(); | ||
if (expectedTotalOps == 0) { | ||
|
@@ -538,19 +557,17 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Transl | |
throw new IndexShardClosedException(request.shardId()); | ||
} | ||
cancellableThreads.checkForCancel(); | ||
/* | ||
* If we are doing a sequence-number-based recovery, we have to skip older ops for which no sequence number was assigned, and | ||
* any ops before the starting sequence number. | ||
*/ | ||
|
||
final long seqNo = operation.seqNo(); | ||
if (startingSeqNo >= 0 && (seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || seqNo < startingSeqNo)) { | ||
if (seqNo < startingSeqNo || seqNo > endingSeqNo) { | ||
skippedOps++; | ||
continue; | ||
} | ||
operations.add(operation); | ||
ops++; | ||
size += operation.estimateSize(); | ||
totalSentOps++; | ||
requiredOpsTracker.markSeqNoAsCompleted(seqNo); | ||
|
||
// check if this request is past bytes threshold, and if so, send it off | ||
if (size >= chunkSizeInBytes) { | ||
|
@@ -567,6 +584,12 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Transl | |
cancellableThreads.executeIO(sendBatch); | ||
} | ||
|
||
if (requiredOpsTracker.getCheckpoint() < endingSeqNo) { | ||
throw new IllegalStateException("translog replay failed to cover required sequence numbers" + | ||
" (required range [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "). first missing op is [" | ||
+ (requiredOpsTracker.getCheckpoint() + 1) + "]"); | ||
} | ||
|
||
assert expectedTotalOps == skippedOps + totalSentOps | ||
: "expected total [" + expectedTotalOps + "], skipped [" + skippedOps + "], total sent [" + totalSentOps + "]"; | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we may need to reformat this Javadoc