Flush old indices on primary promotion and relocation #27580

Merged Nov 30, 2017 (29 commits; changes shown from 20 commits)

Commits:
41d27c8 add testRecoveryWithConcurrentIndexing (bleskes, Nov 28, 2017)
e99de20 fix for promotion (bleskes, Nov 28, 2017)
277c87a relax. You can't guarantee what you want (bleskes, Nov 28, 2017)
d77e00a assert we ship what we want to ship (bleskes, Nov 28, 2017)
af3f815 verify we ship the right ops (bleskes, Nov 28, 2017)
04cbccf logging (bleskes, Nov 28, 2017)
eb3bd72 doh (bleskes, Nov 28, 2017)
0a75735 lint (bleskes, Nov 28, 2017)
4c4a1bc more intuitive range indication (bleskes, Nov 28, 2017)
a29acf2 fix testSendSnapshotSendsOps (bleskes, Nov 28, 2017)
895b78b add primary relocation test (bleskes, Nov 29, 2017)
8782377 index specific ensure green (bleskes, Nov 29, 2017)
b77d4e8 fix counts (bleskes, Nov 29, 2017)
22d5bfa tighten testRelocationWithConcurrentIndexing (bleskes, Nov 29, 2017)
b2082b0 flush on relocation (bleskes, Nov 29, 2017)
9935225 simplify relation ship between flush and roll (bleskes, Nov 29, 2017)
10fbb3e add explicit index names to health check (bleskes, Nov 29, 2017)
fb8a105 beef up testSendSnapshotSendsOps (bleskes, Nov 29, 2017)
c1a0cc7 fix testWaitForPendingSeqNo (bleskes, Nov 29, 2017)
e8f65f3 feedback (bleskes, Nov 29, 2017)
6c5140c more feedback (bleskes, Nov 29, 2017)
dce71ab last feedback round (bleskes, Nov 29, 2017)
6989d52 reduce the ensure green (bleskes, Nov 29, 2017)
c09e419 fix testSendSnapshotSendsOps as we always send at least one (potentia… (bleskes, Nov 29, 2017)
53869b3 extra space? (bleskes, Nov 29, 2017)
9467758 add empty shard test (bleskes, Nov 30, 2017)
09f7133 make sure seq no info is in commit if recovering an old index (bleskes, Nov 30, 2017)
1501d4a add assertions that commit point in store always has sequence numbers… (bleskes, Nov 30, 2017)
e5a734c hard -> shard (bleskes, Nov 30, 2017)
35 changes: 26 additions & 9 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -422,7 +422,13 @@ public void updateShardState(final ShardRouting newRouting,
final DiscoveryNode recoverySourceNode = recoveryState.getSourceNode();
if (currentRouting.isRelocationTarget() == false || recoverySourceNode.getVersion().before(Version.V_6_0_0_alpha1)) {
// there was no primary context hand-off in < 6.0.0, need to manually activate the shard
getEngine().seqNoService().activatePrimaryMode(getEngine().seqNoService().getLocalCheckpoint());
final Engine engine = getEngine();
engine.seqNoService().activatePrimaryMode(getEngine().seqNoService().getLocalCheckpoint());
// Flush the translog as it may contain operations with no sequence numbers. We want to make sure those
// operations will never be replayed as part of peer recovery to avoid an arbitrary mixture of operations with seq#
// (due to active indexing) and operations without a seq# coming from the translog. We therefore flush
// to create a lucene commit point to an empty translog file.
engine.flush(false, true);
}
}

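The flush-on-promotion reasoning in the hunk above can be sketched outside Elasticsearch. The class below is a toy model, not code from the ES codebase (all names are illustrative): a translog mixing legacy operations without sequence numbers and operations carrying seq#s is unsafe to replay during peer recovery, and flushing, which commits and starts an empty translog file, removes the legacy operations from any future replay.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not Elasticsearch code): a translog holding operations, some of
// which predate sequence numbers (seqNo == UNASSIGNED). "Flushing" commits the
// current state and starts an empty translog file, so a later peer recovery
// can never replay the legacy operations.
class TranslogSketch {
    static final long UNASSIGNED = -2; // mirrors SequenceNumbers.UNASSIGNED_SEQ_NO

    final List<Long> ops = new ArrayList<>(); // each entry is an operation's seqNo

    void add(long seqNo) { ops.add(seqNo); }

    // In this sketch a flush simply drops everything currently in the translog;
    // the real flush also writes a Lucene commit point capturing that state.
    void flush() { ops.clear(); }

    // true if replaying this translog could mix legacy (no-seq#) operations
    // with seq#-carrying operations from active indexing
    boolean replayWouldMixHistories() {
        return ops.contains(UNASSIGNED) && ops.stream().anyMatch(s -> s >= 0);
    }

    public static void main(String[] args) {
        TranslogSketch t = new TranslogSketch();
        t.add(UNASSIGNED); // op written by a pre-6.0 primary
        t.add(0);          // op from active indexing under the new primary
        System.out.println(t.replayWouldMixHistories()); // true: unsafe to replay
        t.flush();         // what updateShardState now does on promotion/relocation
        t.add(1);          // subsequent indexing is purely seq#-based
        System.out.println(t.replayWouldMixHistories()); // false
    }
}
```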
@@ -487,15 +493,26 @@ public void updateShardState(final ShardRouting newRouting,
* subsequently fails before the primary/replica re-sync completes successfully and we are now being
* promoted, the local checkpoint tracker here could be left in a state where it would re-issue sequence
* numbers. To ensure that this is not the case, we restore the state of the local checkpoint tracker by
* replaying the translog and marking any operations there are completed. Rolling the translog generation is
* not strictly needed here (as we will never have collisions between sequence numbers in a translog
* generation in a new primary as it takes the last known sequence number as a starting point), but it
* simplifies reasoning about the relationship between primary terms and translog generations.
* replaying the translog and marking any operations there are completed.
*/
getEngine().rollTranslogGeneration();
getEngine().restoreLocalCheckpointFromTranslog();
getEngine().fillSeqNoGaps(newPrimaryTerm);
getEngine().seqNoService().updateLocalCheckpointForShard(currentRouting.allocationId().getId(),
final Engine engine = getEngine();
engine.restoreLocalCheckpointFromTranslog();
if (indexSettings.getIndexVersionCreated().onOrBefore(Version.V_6_0_0_alpha1)) {
// an index that was created before sequence numbers were introduce may contain operations in its
[Review comment (Member)] nit: introduce -> introduced
// translog that do not have a sequence numbers. We want to make sure those operations will never
// be replayed as part of peer recovery to avoid an arbitrary mixture of operations with seq# (due
// to active indexing) and operations without a seq# coming from the translog. We therefore flush
// to create a lucene commit point to an empty translog file.
engine.flush(false, true);
}
/* Rolling the translog generation is not strictly needed here (as we will never have collisions between
* sequence numbers in a translog generation in a new primary as it takes the last known sequence number
* as a starting point), but it simplifies reasoning about the relationship between primary terms and
* translog generations.
*/
engine.rollTranslogGeneration();
engine.fillSeqNoGaps(newPrimaryTerm);
engine.seqNoService().updateLocalCheckpointForShard(currentRouting.allocationId().getId(),
getEngine().seqNoService().getLocalCheckpoint());
primaryReplicaSyncer.accept(this, new ActionListener<ResyncTask>() {
@Override
@@ -148,23 +148,26 @@ public RecoveryResponse recoverToTarget() throws IOException {
final Translog translog = shard.getTranslog();

final long startingSeqNo;
final long requiredSeqNoRangeStart;
final boolean isSequenceNumberBasedRecoveryPossible = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO &&
isTargetSameHistory() && isTranslogReadyForSequenceNumberBasedRecovery();

if (isSequenceNumberBasedRecoveryPossible) {
logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
startingSeqNo = request.startingSeqNo();
requiredSeqNoRangeStart = startingSeqNo;
} else {
final Engine.IndexCommitRef phase1Snapshot;
try {
phase1Snapshot = shard.acquireIndexCommit(false);
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
}
// we set this to unassigned to create a translog roughly according to the retention policy
// on the target
startingSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;

// we set this to 0 to create a translog roughly according to the retention policy
// on the target. Note that it will still filter out legacy operations with no sequence numbers
startingSeqNo = 0;
// but we must have everything above the local checkpoint in the commit
requiredSeqNoRangeStart =
Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1;
try {
phase1(phase1Snapshot.getIndexCommit(), translog::totalOperations);
} catch (final Exception e) {
@@ -177,6 +180,9 @@ public RecoveryResponse recoverToTarget() throws IOException {
}
}
}
assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo;
assert requiredSeqNoRangeStart >= startingSeqNo : "requiredSeqNoRangeStart [" + requiredSeqNoRangeStart + "] is lower than ["
+ startingSeqNo + "]";

runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()));

@@ -186,10 +192,19 @@
throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e);
}

final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
/*
* We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all
* operations in the required range will be available for replaying from the translog of the source.
*/
cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo));

logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo);

logger.trace("snapshot translog for recovery; current size is [{}]", translog.estimateTotalOperationsFromMinSeq(startingSeqNo));
final long targetLocalCheckpoint;
try(Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(startingSeqNo)) {
targetLocalCheckpoint = phase2(startingSeqNo, snapshot);
targetLocalCheckpoint = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot);
} catch (Exception e) {
throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e);
}
@@ -223,26 +238,19 @@ private void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable) {

/**
* Determines if the source translog is ready for a sequence-number-based peer recovery. The main condition here is that the source
* translog contains all operations between the local checkpoint on the target and the current maximum sequence number on the source.
* translog contains all operations above the local checkpoint on the target. We already know that the translog
* contains or will contain all ops above the source's local checkpoint, so we can stop checking there.
*
* @return {@code true} if the source is ready for a sequence-number-based recovery
* @throws IOException if an I/O exception occurred reading the translog snapshot
*/
boolean isTranslogReadyForSequenceNumberBasedRecovery() throws IOException {
final long startingSeqNo = request.startingSeqNo();
assert startingSeqNo >= 0;
final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
logger.trace("testing sequence numbers in range: [{}, {}]", startingSeqNo, endingSeqNo);
final long localCheckpoint = shard.getLocalCheckpoint();
logger.trace("testing sequence numbers in range: [{}, {}]", startingSeqNo, localCheckpoint);
// the start recovery request is initialized with the starting sequence number set to the target shard's local checkpoint plus one
if (startingSeqNo - 1 <= endingSeqNo) {
/*
* We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all
* operations in the required range will be available for replaying from the translog of the source.
*/
cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo));

logger.trace("all operations up to [{}] completed, checking translog content", endingSeqNo);

if (startingSeqNo - 1 <= localCheckpoint) {
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1);
try (Translog.Snapshot snapshot = shard.getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) {
Translog.Operation operation;
@@ -252,7 +260,7 @@ boolean isTranslogReadyForSequenceNumberBasedRecovery() throws IOException {
}
}
}
return tracker.getCheckpoint() >= endingSeqNo;
return tracker.getCheckpoint() >= localCheckpoint;
} else {
return false;
}
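The readiness check above can be illustrated with a toy checkpoint tracker. This is a sketch under stated assumptions, not the real `LocalCheckpointTracker`: sequence-number-based recovery is possible only when the source translog contains every operation in the range from the requested starting seq# up to the source's local checkpoint.

```java
import java.util.Arrays;

// Toy version (not Elasticsearch code) of the readiness check: recovery can be
// sequence-number based iff the source translog contains every operation in
// [startingSeqNo, localCheckpoint]. A tiny tracker advances a contiguous
// checkpoint as operations are observed.
class SeqNoRecoveryCheck {

    // highest seqNo such that every op in [startingSeqNo..result] is present
    static long checkpoint(long startingSeqNo, long[] translogSeqNos) {
        long checkpoint = startingSeqNo - 1;
        long[] sorted = translogSeqNos.clone();
        Arrays.sort(sorted);
        for (long seqNo : sorted) {
            if (seqNo == checkpoint + 1) {
                checkpoint++; // contiguous: advance the checkpoint
            }
        }
        return checkpoint;
    }

    static boolean isReadyForSeqNoRecovery(long startingSeqNo, long localCheckpoint, long[] translogSeqNos) {
        // startingSeqNo is the target's local checkpoint + 1, so an empty
        // required range is trivially ready
        return startingSeqNo - 1 <= localCheckpoint
            && checkpoint(startingSeqNo, translogSeqNos) >= localCheckpoint;
    }

    public static void main(String[] args) {
        // source local checkpoint is 5; target asks to start at 3
        System.out.println(isReadyForSeqNoRecovery(3, 5, new long[]{3, 4, 5})); // true
        System.out.println(isReadyForSeqNoRecovery(3, 5, new long[]{3, 5}));    // false: op 4 is missing
    }
}
```

Note how this mirrors the change in the hunk: the upper bound checked is the source's local checkpoint rather than its max seq#, since ops above the local checkpoint are guaranteed to reach the translog anyway.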
@@ -434,22 +442,25 @@ void prepareTargetForTranslog(final int totalTranslogOps) throws IOException {
*
* @param startingSeqNo the sequence number to start recovery from, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if all
* ops should be sent
* @param requiredSeqNoRangeStart the lower sequence number of the required range (ending with endingSeqNo)
[Review comment (Member)] nit: we may need to reformat this javadocs
* @param endingSeqNo the highest sequence number that should be sent
[Review comment (Contributor)] here it's defined as inclusive, below in the Javadocs of finalizeRecovery, it's defined as exclusive... Let's use the inclusive version.
* @param snapshot a snapshot of the translog
*
* @return the local checkpoint on the target
*/
long phase2(final long startingSeqNo, final Translog.Snapshot snapshot) throws IOException {
long phase2(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, final Translog.Snapshot snapshot)
throws IOException {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
cancellableThreads.checkForCancel();

final StopWatch stopWatch = new StopWatch().start();

logger.trace("recovery [phase2]: sending transaction log operations");
logger.trace("recovery [phase2]: sending transaction log operations (seq# from [" + startingSeqNo + "], " +
"required [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "]");

// send all the snapshot's translog operations to the target
final SendSnapshotResult result = sendSnapshot(startingSeqNo, snapshot);
final SendSnapshotResult result = sendSnapshot(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot);

stopWatch.stop();
logger.trace("recovery [phase2]: took [{}]", stopWatch.totalTime());
@@ -511,17 +522,25 @@ static class SendSnapshotResult {
* Operations are bulked into a single request depending on an operation count limit or size-in-bytes limit.
*
* @param startingSeqNo the sequence number for which only operations with a sequence number greater than this will be sent
* @param snapshot the translog snapshot to replay operations from
* @return the local checkpoint on the target and the total number of operations sent
* @param requiredSeqNoRangeStart the lower sequence number of the required range
[Review comment (Member)] nit: we may need to reformat this javadocs
* @param endingSeqNo the upper bound of the sequence number range to be sent (inclusive)
* @param snapshot the translog snapshot to replay operations from @return the local checkpoint on the target and the total
* number of operations sent
* @throws IOException if an I/O exception occurred reading the translog snapshot
*/
protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Translog.Snapshot snapshot) throws IOException {
protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo,
final Translog.Snapshot snapshot) throws IOException {
assert requiredSeqNoRangeStart <= endingSeqNo + 1:
"requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo;
assert startingSeqNo <= requiredSeqNoRangeStart :
"startingSeqNo " + startingSeqNo + " is larger than requiredSeqNoRangeStart " + requiredSeqNoRangeStart;
int ops = 0;
long size = 0;
int skippedOps = 0;
int totalSentOps = 0;
final AtomicLong targetLocalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
final List<Translog.Operation> operations = new ArrayList<>();
final LocalCheckpointTracker requiredOpsTracker = new LocalCheckpointTracker(endingSeqNo, requiredSeqNoRangeStart - 1);
[Review comment (Contributor)] If endingSeqNo is exclusive, then this should probably be endingSeqNo - 1. I know that it does not really matter as we only use the markSeqNoAsCompleted method, we might as well initialize this to new LocalCheckpointTracker(requiredSeqNoRangeStart - 1, requiredSeqNoRangeStart - 1), but yeah, let's use inclusive bounds for endingSeqNo ;-)
final int expectedTotalOps = snapshot.totalOperations();
if (expectedTotalOps == 0) {
@@ -538,19 +557,17 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Transl
throw new IndexShardClosedException(request.shardId());
}
cancellableThreads.checkForCancel();
/*
* If we are doing a sequence-number-based recovery, we have to skip older ops for which no sequence number was assigned, and
* any ops before the starting sequence number.
*/

final long seqNo = operation.seqNo();
if (startingSeqNo >= 0 && (seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || seqNo < startingSeqNo)) {
if (seqNo < startingSeqNo || seqNo > endingSeqNo) {
skippedOps++;
continue;
}
operations.add(operation);
ops++;
size += operation.estimateSize();
totalSentOps++;
requiredOpsTracker.markSeqNoAsCompleted(seqNo);

// check if this request is past bytes threshold, and if so, send it off
if (size >= chunkSizeInBytes) {
@@ -567,6 +584,12 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Transl
cancellableThreads.executeIO(sendBatch);
}

if (requiredOpsTracker.getCheckpoint() < endingSeqNo) {
throw new IllegalStateException("translog replay failed to covered required sequence numbers" +
[Review comment (Contributor)] covered -> cover
" (required range [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "). first missing op is ["
+ (requiredOpsTracker.getCheckpoint() + 1) + "]");
}

assert expectedTotalOps == skippedOps + totalSentOps
: "expected total [" + expectedTotalOps + "], skipped [" + skippedOps + "], total sent [" + totalSentOps + "]";

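The sendSnapshot changes above, skip operations outside the shipped range, then verify that every seq# in the required range was actually sent, can be sketched as a small self-contained model. This is a toy illustration (names and the boolean-array tracker are illustrative, not the real LocalCheckpointTracker):

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not Elasticsearch code) of the filtering in sendSnapshot: ops
// outside [startingSeqNo, endingSeqNo] are skipped, and after replay we check
// that every seqNo in the required range [requiredStart, endingSeqNo] was
// shipped, failing loudly with the first missing op otherwise.
class SendSnapshotSketch {

    static List<Long> send(long startingSeqNo, long requiredStart, long endingSeqNo, List<Long> snapshotSeqNos) {
        List<Long> sent = new ArrayList<>();
        boolean[] seen = new boolean[(int) (endingSeqNo - requiredStart + 1)];
        for (long seqNo : snapshotSeqNos) {
            if (seqNo < startingSeqNo || seqNo > endingSeqNo) {
                continue; // mirrors skippedOps++ in the real handler
            }
            sent.add(seqNo);
            if (seqNo >= requiredStart) {
                seen[(int) (seqNo - requiredStart)] = true; // mirrors requiredOpsTracker
            }
        }
        for (int i = 0; i < seen.length; i++) {
            if (!seen[i]) {
                throw new IllegalStateException("translog replay failed to cover required sequence numbers;"
                    + " first missing op is [" + (requiredStart + i) + "]");
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        // ops 0..5 in the snapshot, required range [4, 5], shipping from 0
        System.out.println(send(0, 4, 5, List.of(0L, 1L, 2L, 3L, 4L, 5L))); // [0, 1, 2, 3, 4, 5]
        try {
            send(0, 4, 5, List.of(0L, 1L, 2L, 3L, 5L)); // op 4 missing from the required range
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The design point the PR makes is visible here: ops below the required range may legitimately be skipped or absent, but a gap inside the required range means the target would end up with an incomplete history, so it must be a hard failure.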
@@ -374,15 +374,15 @@ protected EngineFactory getEngineFactory(ShardRouting routing) {
IndexShard newReplica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId());

CountDownLatch recoveryStart = new CountDownLatch(1);
AtomicBoolean preparedForTranslog = new AtomicBoolean(false);
AtomicBoolean opsSent = new AtomicBoolean(false);
final Future<Void> recoveryFuture = shards.asyncRecoverReplica(newReplica, (indexShard, node) -> {
recoveryStart.countDown();
return new RecoveryTarget(indexShard, node, recoveryListener, l -> {
}) {
@Override
public void prepareForTranslogOperations(int totalTranslogOps) throws IOException {
preparedForTranslog.set(true);
super.prepareForTranslogOperations(totalTranslogOps);
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) throws IOException {
opsSent.set(true);
return super.indexTranslogOperations(operations, totalTranslogOps);
}
};
});
@@ -392,7 +392,7 @@ public void prepareForTranslogOperations(int totalTranslogOps) throws IOExceptio
// index some more
docs += shards.indexDocs(randomInt(5));

assertFalse("recovery should wait on pending docs", preparedForTranslog.get());
assertFalse("recovery should wait on pending docs", opsSent.get());

primaryEngineFactory.releaseLatchedIndexers();
pendingDocsDone.await();