Skip to content

Commit

Permalink
Flush old indices on primary promotion and relocation (#27580)
Browse files Browse the repository at this point in the history
During a recovery the target shard may process both new indexing operations and old ones concurrently. When the primary is on a 6.0 node, the new indexing operations are guaranteed to have sequence numbers but we don't have that guarantee for the old operations as they may come from a period when the primary was on a pre 6.0 node. Have this mixture of old and new is something we do not support and it triggers exceptions.

This PR adds a flush on primary promotion and primary relocations to make sure that any recoveries from a primary on a 6.0 will be guaranteed to only need operations with sequence numbers. A recovery from store already flushes when we start the engine if there were any ops in the translog.

With this extra flushes in place we can now actively filter out operations that have no sequence numbers during recovery. Since filtering out operations is risky, I have opted to harden the logic in the recovery source handler to verify that all operations in the required sequence number range (from the local checkpoint in the commit onwards) are not missed. This comes at an extra complexity for this PR but I think it's worth it.

Finally I added two tests that reproduce the problems.

Closes #27536s
  • Loading branch information
bleskes committed Nov 30, 2017
1 parent 1eb5cb1 commit 8732156
Show file tree
Hide file tree
Showing 11 changed files with 387 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -428,9 +428,11 @@ private void recoverFromTranslogInternal() throws IOException {
} else if (translog.isCurrent(translogGeneration) == false) {
commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID));
refreshLastCommittedSegmentInfos();
} else if (lastCommittedSegmentInfos.getUserData().containsKey(HISTORY_UUID_KEY) == false) {
assert historyUUID != null;
// put the history uuid into the index
} else if (lastCommittedSegmentInfos.getUserData().containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) == false) {
assert engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_alpha1) :
"index was created on version " + engineConfig.getIndexSettings().getIndexVersionCreated() + "but has "
+ "no sequence numbers info in commit";

commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID));
refreshLastCommittedSegmentInfos();
}
Expand Down
46 changes: 37 additions & 9 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,13 @@ public void updateShardState(final ShardRouting newRouting,
final DiscoveryNode recoverySourceNode = recoveryState.getSourceNode();
if (currentRouting.isRelocationTarget() == false || recoverySourceNode.getVersion().before(Version.V_6_0_0_alpha1)) {
// there was no primary context hand-off in < 6.0.0, need to manually activate the shard
getEngine().seqNoService().activatePrimaryMode(getEngine().seqNoService().getLocalCheckpoint());
final Engine engine = getEngine();
engine.seqNoService().activatePrimaryMode(getEngine().seqNoService().getLocalCheckpoint());
// Flush the translog as it may contain operations with no sequence numbers. We want to make sure those
// operations will never be replayed as part of peer recovery to avoid an arbitrary mixture of operations with seq#
// (due to active indexing) and operations without a seq# coming from the translog. We therefore flush
// to create a lucene commit point to an empty translog file.
engine.flush(false, true);
}
}

Expand Down Expand Up @@ -475,15 +481,26 @@ public void updateShardState(final ShardRouting newRouting,
* subsequently fails before the primary/replica re-sync completes successfully and we are now being
* promoted, the local checkpoint tracker here could be left in a state where it would re-issue sequence
* numbers. To ensure that this is not the case, we restore the state of the local checkpoint tracker by
* replaying the translog and marking any operations there are completed. Rolling the translog generation is
* not strictly needed here (as we will never have collisions between sequence numbers in a translog
* generation in a new primary as it takes the last known sequence number as a starting point), but it
* simplifies reasoning about the relationship between primary terms and translog generations.
* replaying the translog and marking any operations there are completed.
*/
getEngine().rollTranslogGeneration();
getEngine().restoreLocalCheckpointFromTranslog();
getEngine().fillSeqNoGaps(newPrimaryTerm);
getEngine().seqNoService().updateLocalCheckpointForShard(currentRouting.allocationId().getId(),
final Engine engine = getEngine();
engine.restoreLocalCheckpointFromTranslog();
if (indexSettings.getIndexVersionCreated().onOrBefore(Version.V_6_0_0_alpha1)) {
// an index that was created before sequence numbers were introduced may contain operations in its
// translog that do not have a sequence numbers. We want to make sure those operations will never
// be replayed as part of peer recovery to avoid an arbitrary mixture of operations with seq# (due
// to active indexing) and operations without a seq# coming from the translog. We therefore flush
// to create a lucene commit point to an empty translog file.
engine.flush(false, true);
}
/* Rolling the translog generation is not strictly needed here (as we will never have collisions between
* sequence numbers in a translog generation in a new primary as it takes the last known sequence number
* as a starting point), but it simplifies reasoning about the relationship between primary terms and
* translog generations.
*/
engine.rollTranslogGeneration();
engine.fillSeqNoGaps(newPrimaryTerm);
engine.seqNoService().updateLocalCheckpointForShard(currentRouting.allocationId().getId(),
getEngine().seqNoService().getLocalCheckpoint());
primaryReplicaSyncer.accept(this, new ActionListener<ResyncTask>() {
@Override
Expand Down Expand Up @@ -1323,6 +1340,17 @@ private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boole
active.set(true);
newEngine.recoverFromTranslog();
}
assertSequenceNumbersInCommit();
}

private boolean assertSequenceNumbersInCommit() throws IOException {
final Map<String, String> userData = SegmentInfos.readLatestCommit(store.directory()).getUserData();
assert userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) : "commit point doesn't contains a local checkpoint";
assert userData.containsKey(SequenceNumbers.MAX_SEQ_NO) : "commit point doesn't contains a maximum sequence number";
assert userData.containsKey(Engine.HISTORY_UUID_KEY) : "commit point doesn't contains a history uuid";
assert userData.get(Engine.HISTORY_UUID_KEY).equals(getHistoryUUID()) : "commit point history uuid ["
+ userData.get(Engine.HISTORY_UUID_KEY) + "] is different than engine [" + getHistoryUUID() + "]";
return true;
}

private boolean assertMaxUnsafeAutoIdInCommit() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,23 +149,26 @@ public RecoveryResponse recoverToTarget() throws IOException {
final Translog translog = shard.getTranslog();

final long startingSeqNo;
final long requiredSeqNoRangeStart;
final boolean isSequenceNumberBasedRecoveryPossible = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO &&
isTargetSameHistory() && isTranslogReadyForSequenceNumberBasedRecovery();

if (isSequenceNumberBasedRecoveryPossible) {
logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
startingSeqNo = request.startingSeqNo();
requiredSeqNoRangeStart = startingSeqNo;
} else {
final Engine.IndexCommitRef phase1Snapshot;
try {
phase1Snapshot = shard.acquireIndexCommit(false);
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
}
// we set this to unassigned to create a translog roughly according to the retention policy
// on the target
startingSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;

// we set this to 0 to create a translog roughly according to the retention policy
// on the target. Note that it will still filter out legacy operations with no sequence numbers
startingSeqNo = 0;
// but we must have everything above the local checkpoint in the commit
requiredSeqNoRangeStart =
Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1;
try {
phase1(phase1Snapshot.getIndexCommit(), translog::totalOperations);
} catch (final Exception e) {
Expand All @@ -178,6 +181,9 @@ public RecoveryResponse recoverToTarget() throws IOException {
}
}
}
assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo;
assert requiredSeqNoRangeStart >= startingSeqNo : "requiredSeqNoRangeStart [" + requiredSeqNoRangeStart + "] is lower than ["
+ startingSeqNo + "]";

runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()));

Expand All @@ -187,10 +193,19 @@ public RecoveryResponse recoverToTarget() throws IOException {
throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e);
}

final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
/*
* We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all
* operations in the required range will be available for replaying from the translog of the source.
*/
cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo));

logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo);

logger.trace("snapshot translog for recovery; current size is [{}]", translog.estimateTotalOperationsFromMinSeq(startingSeqNo));
final long targetLocalCheckpoint;
try(Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(startingSeqNo)) {
targetLocalCheckpoint = phase2(startingSeqNo, snapshot);
targetLocalCheckpoint = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot);
} catch (Exception e) {
throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e);
}
Expand Down Expand Up @@ -224,26 +239,19 @@ private void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable) {

/**
* Determines if the source translog is ready for a sequence-number-based peer recovery. The main condition here is that the source
* translog contains all operations between the local checkpoint on the target and the current maximum sequence number on the source.
* translog contains all operations above the local checkpoint on the target. We already know the that translog contains or will contain
* all ops above the source local checkpoint, so we can stop check there.
*
* @return {@code true} if the source is ready for a sequence-number-based recovery
* @throws IOException if an I/O exception occurred reading the translog snapshot
*/
boolean isTranslogReadyForSequenceNumberBasedRecovery() throws IOException {
final long startingSeqNo = request.startingSeqNo();
assert startingSeqNo >= 0;
final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
logger.trace("testing sequence numbers in range: [{}, {}]", startingSeqNo, endingSeqNo);
final long localCheckpoint = shard.getLocalCheckpoint();
logger.trace("testing sequence numbers in range: [{}, {}]", startingSeqNo, localCheckpoint);
// the start recovery request is initialized with the starting sequence number set to the target shard's local checkpoint plus one
if (startingSeqNo - 1 <= endingSeqNo) {
/*
* We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all
* operations in the required range will be available for replaying from the translog of the source.
*/
cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo));

logger.trace("all operations up to [{}] completed, checking translog content", endingSeqNo);

if (startingSeqNo - 1 <= localCheckpoint) {
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1);
try (Translog.Snapshot snapshot = shard.getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) {
Translog.Operation operation;
Expand All @@ -253,7 +261,7 @@ boolean isTranslogReadyForSequenceNumberBasedRecovery() throws IOException {
}
}
}
return tracker.getCheckpoint() >= endingSeqNo;
return tracker.getCheckpoint() >= localCheckpoint;
} else {
return false;
}
Expand Down Expand Up @@ -433,24 +441,27 @@ void prepareTargetForTranslog(final int totalTranslogOps) throws IOException {
* point-in-time view of the translog). It then sends each translog operation to the target node so it can be replayed into the new
* shard.
*
* @param startingSeqNo the sequence number to start recovery from, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if all
* ops should be sent
* @param snapshot a snapshot of the translog
*
* @param startingSeqNo the sequence number to start recovery from, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if all
* ops should be sent
* @param requiredSeqNoRangeStart the lower sequence number of the required range (ending with endingSeqNo)
* @param endingSeqNo the highest sequence number that should be sent
* @param snapshot a snapshot of the translog
* @return the local checkpoint on the target
*/
long phase2(final long startingSeqNo, final Translog.Snapshot snapshot) throws IOException {
long phase2(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, final Translog.Snapshot snapshot)
throws IOException {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
cancellableThreads.checkForCancel();

final StopWatch stopWatch = new StopWatch().start();

logger.trace("recovery [phase2]: sending transaction log operations");
logger.trace("recovery [phase2]: sending transaction log operations (seq# from [" + startingSeqNo + "], " +
"required [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "]");

// send all the snapshot's translog operations to the target
final SendSnapshotResult result = sendSnapshot(startingSeqNo, snapshot);
final SendSnapshotResult result = sendSnapshot(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot);

stopWatch.stop();
logger.trace("recovery [phase2]: took [{}]", stopWatch.totalTime());
Expand Down Expand Up @@ -511,18 +522,26 @@ static class SendSnapshotResult {
* <p>
* Operations are bulked into a single request depending on an operation count limit or size-in-bytes limit.
*
* @param startingSeqNo the sequence number for which only operations with a sequence number greater than this will be sent
* @param snapshot the translog snapshot to replay operations from
* @return the local checkpoint on the target and the total number of operations sent
* @param startingSeqNo the sequence number for which only operations with a sequence number greater than this will be sent
* @param requiredSeqNoRangeStart the lower sequence number of the required range
* @param endingSeqNo the upper bound of the sequence number range to be sent (inclusive)
* @param snapshot the translog snapshot to replay operations from @return the local checkpoint on the target and the
* total number of operations sent
* @throws IOException if an I/O exception occurred reading the translog snapshot
*/
protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Translog.Snapshot snapshot) throws IOException {
protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo,
final Translog.Snapshot snapshot) throws IOException {
assert requiredSeqNoRangeStart <= endingSeqNo + 1:
"requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo;
assert startingSeqNo <= requiredSeqNoRangeStart :
"startingSeqNo " + startingSeqNo + " is larger than requiredSeqNoRangeStart " + requiredSeqNoRangeStart;
int ops = 0;
long size = 0;
int skippedOps = 0;
int totalSentOps = 0;
final AtomicLong targetLocalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
final List<Translog.Operation> operations = new ArrayList<>();
final LocalCheckpointTracker requiredOpsTracker = new LocalCheckpointTracker(endingSeqNo, requiredSeqNoRangeStart - 1);

final int expectedTotalOps = snapshot.totalOperations();
if (expectedTotalOps == 0) {
Expand All @@ -539,19 +558,17 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Transl
throw new IndexShardClosedException(request.shardId());
}
cancellableThreads.checkForCancel();
/*
* If we are doing a sequence-number-based recovery, we have to skip older ops for which no sequence number was assigned, and
* any ops before the starting sequence number.
*/

final long seqNo = operation.seqNo();
if (startingSeqNo >= 0 && (seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || seqNo < startingSeqNo)) {
if (seqNo < startingSeqNo || seqNo > endingSeqNo) {
skippedOps++;
continue;
}
operations.add(operation);
ops++;
size += operation.estimateSize();
totalSentOps++;
requiredOpsTracker.markSeqNoAsCompleted(seqNo);

// check if this request is past bytes threshold, and if so, send it off
if (size >= chunkSizeInBytes) {
Expand All @@ -569,8 +586,14 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Transl
}

assert expectedTotalOps == snapshot.overriddenOperations() + skippedOps + totalSentOps
: String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]",
expectedTotalOps, snapshot.overriddenOperations(), skippedOps, totalSentOps);
: String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]",
expectedTotalOps, snapshot.overriddenOperations(), skippedOps, totalSentOps);

if (requiredOpsTracker.getCheckpoint() < endingSeqNo) {
throw new IllegalStateException("translog replay failed to cover required sequence numbers" +
" (required range [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "). first missing op is ["
+ (requiredOpsTracker.getCheckpoint() + 1) + "]");
}

logger.trace("sent final batch of [{}][{}] (total: [{}]) translog operations", ops, new ByteSizeValue(size), expectedTotalOps);

Expand Down
Loading

0 comments on commit 8732156

Please sign in to comment.