-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Flush old indices on primary promotion and relocation #27580
Changes from 1 commit
41d27c8
e99de20
277c87a
d77e00a
af3f815
04cbccf
eb3bd72
0a75735
4c4a1bc
a29acf2
895b78b
8782377
b77d4e8
22d5bfa
b2082b0
9935225
10fbb3e
fb8a105
c1a0cc7
e8f65f3
6c5140c
dce71ab
6989d52
c09e419
53869b3
9467758
09f7133
1501d4a
e5a734c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -168,8 +168,6 @@ public RecoveryResponse recoverToTarget() throws IOException { | |
// but we must have everything above the local checkpoint in the commit | ||
requiredSeqNoRangeStart = | ||
Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1; | ||
assert requiredSeqNoRangeStart >= 0 : | ||
"base commit contains an illegal local checkpoint " + (requiredSeqNoRangeStart - 1); | ||
try { | ||
phase1(phase1Snapshot.getIndexCommit(), translog::totalOperations); | ||
} catch (final Exception e) { | ||
|
@@ -182,6 +180,9 @@ public RecoveryResponse recoverToTarget() throws IOException { | |
} | ||
} | ||
} | ||
assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo; | ||
assert requiredSeqNoRangeStart >= startingSeqNo : "requiredSeqNoRangeStart [" + requiredSeqNoRangeStart + "] is lower than [" | ||
+ startingSeqNo + "]"; | ||
|
||
runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId())); | ||
|
||
|
@@ -190,7 +191,15 @@ public RecoveryResponse recoverToTarget() throws IOException { | |
} catch (final Exception e) { | ||
throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e); | ||
} | ||
final long endingSeqNo = determineEndingSeqNo(); | ||
|
||
final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); | ||
/* | ||
* We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all | ||
* operations in the required range will be available for replaying from the translog of the source. | ||
*/ | ||
cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo)); | ||
|
||
logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo); | ||
|
||
logger.trace("snapshot translog for recovery; current size is [{}]", translog.estimateTotalOperationsFromMinSeq(startingSeqNo)); | ||
final long targetLocalCheckpoint; | ||
|
@@ -227,18 +236,6 @@ private void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable) { | |
}); | ||
} | ||
|
||
private long determineEndingSeqNo() { | ||
final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); | ||
/* | ||
* We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all | ||
* operations in the required range will be available for replaying from the translog of the source. | ||
*/ | ||
cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo)); | ||
|
||
logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo); | ||
return endingSeqNo + 1; | ||
} | ||
|
||
/** | ||
* Determines if the source translog is ready for a sequence-number-based peer recovery. The main condition here is that the source | ||
* translog contains all operations above the local checkpoint on the target. We already know that the translog contains or will contain | ||
|
@@ -525,16 +522,16 @@ static class SendSnapshotResult { | |
* Operations are bulked into a single request depending on an operation count limit or size-in-bytes limit. | ||
* | ||
* @param startingSeqNo the sequence number for which only operations with a sequence number greater than this will be sent | ||
* @param requiredSeqNoRangeStart the lower sequence number of the required range (ending with endingSeqNo - 1) | ||
* @param endingSeqNo the upper bound of the sequence number range to be sent (exclusive) | ||
* @param requiredSeqNoRangeStart the lower sequence number of the required range | ||
* @param endingSeqNo the upper bound of the sequence number range to be sent (inclusive) | ||
* @param snapshot the translog snapshot to replay operations from | ||
* @return the local checkpoint on the target and the total number of operations sent | ||
* @throws IOException if an I/O exception occurred reading the translog snapshot | ||
*/ | ||
protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, | ||
final Translog.Snapshot snapshot) throws IOException { | ||
assert requiredSeqNoRangeStart <= endingSeqNo : | ||
"requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo" + endingSeqNo; | ||
assert requiredSeqNoRangeStart <= endingSeqNo + 1: | ||
"requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo; | ||
assert startingSeqNo <= requiredSeqNoRangeStart : | ||
"startingSeqNo " + startingSeqNo + " is larger than requiredSeqNoRangeStart " + requiredSeqNoRangeStart; | ||
int ops = 0; | ||
|
@@ -562,7 +559,7 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long require | |
cancellableThreads.checkForCancel(); | ||
|
||
final long seqNo = operation.seqNo(); | ||
if (seqNo < startingSeqNo || seqNo >= endingSeqNo) { | ||
if (seqNo < startingSeqNo || seqNo > endingSeqNo) { | ||
skippedOps++; | ||
continue; | ||
} | ||
|
@@ -587,7 +584,7 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long require | |
cancellableThreads.executeIO(sendBatch); | ||
} | ||
|
||
if (requiredOpsTracker.getCheckpoint() < endingSeqNo - 1) { | ||
if (requiredOpsTracker.getCheckpoint() < endingSeqNo) { | ||
throw new IllegalStateException("translog replay failed to covered required sequence numbers" + | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Typo in the exception message: "covered" -> "cover" |
||
" (required range [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "). first missing op is [" | ||
+ (requiredOpsTracker.getCheckpoint() + 1) + "]"); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -188,7 +188,7 @@ public void testSendSnapshotSendsOps() throws IOException { | |
operations.add(null); | ||
final long startingSeqNo = randomIntBetween(0, numberOfDocsWithValidSequenceNumbers - 1); | ||
final long requiredStartingSeqNo = randomIntBetween((int) startingSeqNo, numberOfDocsWithValidSequenceNumbers - 1); | ||
final long endingSeqNo = randomIntBetween((int) requiredStartingSeqNo, numberOfDocsWithValidSequenceNumbers); | ||
final long endingSeqNo = randomIntBetween((int) requiredStartingSeqNo, numberOfDocsWithValidSequenceNumbers - 1); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It can start from [inline value lost in extraction; presumably requiredStartingSeqNo - 1, given the assertion requiredSeqNoRangeStart <= endingSeqNo + 1]. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. yes. |
||
// todo add proper tests | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this proper enough now? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. :) |
||
RecoverySourceHandler.SendSnapshotResult result = handler.sendSnapshot(startingSeqNo, requiredStartingSeqNo, | ||
endingSeqNo, new Translog.Snapshot() { | ||
|
@@ -209,7 +209,7 @@ public Translog.Operation next() throws IOException { | |
return operations.get(counter++); | ||
} | ||
}); | ||
final int expectedOps = (int) (endingSeqNo - startingSeqNo); | ||
final int expectedOps = (int) (endingSeqNo - startingSeqNo + 1); | ||
assertThat(result.totalOperations, equalTo(expectedOps)); | ||
final ArgumentCaptor<List> shippedOpsCaptor = ArgumentCaptor.forClass(List.class); | ||
if (expectedOps > 0) { | ||
|
@@ -227,7 +227,7 @@ public Translog.Operation next() throws IOException { | |
if (endingSeqNo >= requiredStartingSeqNo + 1) { | ||
// check that missing ops blows up | ||
List<Translog.Operation> requiredOps = operations.subList(0, operations.size() - 1).stream() // remove last null marker | ||
.filter(o -> o.seqNo() >= requiredStartingSeqNo && o.seqNo() < endingSeqNo).collect(Collectors.toList()); | ||
.filter(o -> o.seqNo() >= requiredStartingSeqNo && o.seqNo() <= endingSeqNo).collect(Collectors.toList()); | ||
List<Translog.Operation> opsToSkip = randomSubsetOf(randomIntBetween(1, requiredOps.size()), requiredOps); | ||
expectThrows(IllegalStateException.class, () -> | ||
handler.sendSnapshot(startingSeqNo, requiredStartingSeqNo, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we may need to reformat this Javadoc