elastic#27580 the good parts (non bwc)
bleskes committed Nov 30, 2017
1 parent 1f89e9d commit f046603
Showing 11 changed files with 367 additions and 106 deletions.
@@ -426,11 +426,6 @@ private void recoverFromTranslogInternal() throws IOException {
} else if (translog.isCurrent(translogGeneration) == false) {
commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID));
refreshLastCommittedSegmentInfos();
} else if (lastCommittedSegmentInfos.getUserData().containsKey(HISTORY_UUID_KEY) == false) {
assert historyUUID != null;
// put the history uuid into the index
commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID));
refreshLastCommittedSegmentInfos();
}
// clean up what's not needed
translog.trimUnreferencedReaders();
40 changes: 22 additions & 18 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -48,7 +48,6 @@
import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
@@ -66,7 +65,6 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AsyncIOProcessor;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexModule;
@@ -411,14 +409,6 @@ public void updateShardState(final ShardRouting newRouting,
logger.debug("failed to refresh due to move to cluster wide started", e);
}

if (newRouting.primary()) {
final DiscoveryNode recoverySourceNode = recoveryState.getSourceNode();
if (currentRouting.isRelocationTarget() == false || recoverySourceNode.getVersion().before(Version.V_6_0_0_alpha1)) {
// there was no primary context hand-off in < 6.0.0, need to manually activate the shard
getEngine().seqNoService().activatePrimaryMode(getEngine().seqNoService().getLocalCheckpoint());
}
}

changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
} else if (state == IndexShardState.RELOCATED &&
(newRouting.relocating() == false || newRouting.equalsIgnoringMetaData(currentRouting) == false)) {
@@ -480,15 +470,18 @@ public void updateShardState(final ShardRouting newRouting,
* subsequently fails before the primary/replica re-sync completes successfully and we are now being
* promoted, the local checkpoint tracker here could be left in a state where it would re-issue sequence
* numbers. To ensure that this is not the case, we restore the state of the local checkpoint tracker by
* replaying the translog and marking any operations there are completed. Rolling the translog generation is
* not strictly needed here (as we will never have collisions between sequence numbers in a translog
* generation in a new primary as it takes the last known sequence number as a starting point), but it
* simplifies reasoning about the relationship between primary terms and translog generations.
* replaying the translog and marking any operations there are completed.
*/
final Engine engine = getEngine();
engine.restoreLocalCheckpointFromTranslog();
/* Rolling the translog generation is not strictly needed here (as we will never have collisions between
* sequence numbers in a translog generation in a new primary as it takes the last known sequence number
* as a starting point), but it simplifies reasoning about the relationship between primary terms and
* translog generations.
*/
getEngine().rollTranslogGeneration();
getEngine().restoreLocalCheckpointFromTranslog();
getEngine().fillSeqNoGaps(newPrimaryTerm);
getEngine().seqNoService().updateLocalCheckpointForShard(currentRouting.allocationId().getId(),
engine.rollTranslogGeneration();
engine.fillSeqNoGaps(newPrimaryTerm);
engine.seqNoService().updateLocalCheckpointForShard(currentRouting.allocationId().getId(),
getEngine().seqNoService().getLocalCheckpoint());
primaryReplicaSyncer.accept(this, new ActionListener<ResyncTask>() {
@Override
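The promotion path above first restores the local checkpoint tracker by replaying the shard's own translog and then fills any remaining sequence-number gaps under the new primary term. As a rough, standalone illustration (not part of this commit; the tiny Tracker class and hard-coded seq#s are stand-ins for LocalCheckpointTracker and the real translog replay), the sketch below shows why the checkpoint stops at the first gap and how filling that gap advances it:

// Illustrative sketch only: simplified stand-ins for Engine#restoreLocalCheckpointFromTranslog
// and Engine#fillSeqNoGaps, not the Elasticsearch implementation.
import java.util.List;
import java.util.TreeSet;

public class PromotionSketch {

    /** Minimal tracker: the checkpoint is the highest seq# below which every op has completed. */
    static final class Tracker {
        private final TreeSet<Long> completed = new TreeSet<>();
        private long checkpoint = -1; // nothing completed yet

        void markSeqNoAsCompleted(long seqNo) {
            completed.add(seqNo);
            while (completed.contains(checkpoint + 1)) {
                checkpoint++;
            }
        }

        long getCheckpoint() {
            return checkpoint;
        }
    }

    public static void main(String[] args) {
        // Pretend these seq#s were observed while replaying the local translog after promotion.
        List<Long> translogSeqNos = List.of(0L, 1L, 2L, 4L, 5L);

        Tracker tracker = new Tracker();
        translogSeqNos.forEach(tracker::markSeqNoAsCompleted); // restoreLocalCheckpointFromTranslog analogue

        // The checkpoint stops at 2 because seq# 3 was never written locally.
        System.out.println("local checkpoint after replay: " + tracker.getCheckpoint()); // 2

        // fillSeqNoGaps analogue: the new primary plugs the hole with a no-op under its new term.
        tracker.markSeqNoAsCompleted(3);
        System.out.println("local checkpoint after filling gaps: " + tracker.getCheckpoint()); // 5
    }
}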
@@ -1332,6 +1325,17 @@ private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boole
active.set(true);
newEngine.recoverFromTranslog();
}
assertSequenceNumbersInCommit();
}

private boolean assertSequenceNumbersInCommit() throws IOException {
final Map<String, String> userData = SegmentInfos.readLatestCommit(store.directory()).getUserData();
assert userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) : "commit point doesn't contain a local checkpoint";
assert userData.containsKey(SequenceNumbers.MAX_SEQ_NO) : "commit point doesn't contain a maximum sequence number";
assert userData.containsKey(Engine.HISTORY_UUID_KEY) : "commit point doesn't contain a history uuid";
assert userData.get(Engine.HISTORY_UUID_KEY).equals(getHistoryUUID()) : "commit point history uuid ["
+ userData.get(Engine.HISTORY_UUID_KEY) + "] is different than engine [" + getHistoryUUID() + "]";
return true;
}

private boolean assertMaxUnsafeAutoIdInCommit() throws IOException {
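The new assertSequenceNumbersInCommit above encodes the invariant that every Lucene commit now carries a local checkpoint, a maximum sequence number, and the engine's history UUID in its user data. A standalone sketch of that invariant (illustrative only; the literal keys and the sample UUID stand in for SequenceNumbers.LOCAL_CHECKPOINT_KEY, SequenceNumbers.MAX_SEQ_NO, and Engine.HISTORY_UUID_KEY):

// Illustrative sketch, not the commit's code: the shape of commit user data being asserted on.
import java.util.Map;

public class CommitUserDataSketch {
    public static void main(String[] args) {
        Map<String, String> commitUserData = Map.of(
            "local_checkpoint", "41",
            "max_seq_no", "47",
            "history_uuid", "sample-history-uuid");

        String engineHistoryUuid = "sample-history-uuid";

        assert commitUserData.containsKey("local_checkpoint") : "commit point doesn't contain a local checkpoint";
        assert commitUserData.containsKey("max_seq_no") : "commit point doesn't contain a maximum sequence number";
        assert commitUserData.get("history_uuid").equals(engineHistoryUuid)
            : "commit history uuid differs from the engine's history uuid";

        System.out.println("commit metadata is consistent with the engine");
    }
}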
@@ -149,23 +149,26 @@ public RecoveryResponse recoverToTarget() throws IOException {
final Translog translog = shard.getTranslog();

final long startingSeqNo;
final long requiredSeqNoRangeStart;
final boolean isSequenceNumberBasedRecoveryPossible = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO &&
isTargetSameHistory() && isTranslogReadyForSequenceNumberBasedRecovery();

if (isSequenceNumberBasedRecoveryPossible) {
logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
startingSeqNo = request.startingSeqNo();
requiredSeqNoRangeStart = startingSeqNo;
} else {
final Engine.IndexCommitRef phase1Snapshot;
try {
phase1Snapshot = shard.acquireIndexCommit(false);
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
}
// we set this to unassigned to create a translog roughly according to the retention policy
// on the target
startingSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;

// we set this to 0 to create a translog roughly according to the retention policy
// on the target. Note that it will still filter out legacy operations with no sequence numbers
startingSeqNo = 0;
// but we must have everything above the local checkpoint in the commit
requiredSeqNoRangeStart =
Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1;
try {
phase1(phase1Snapshot.getIndexCommit(), translog::totalOperations);
} catch (final Exception e) {
@@ -178,6 +181,9 @@ public RecoveryResponse recoverToTarget() throws IOException {
}
}
}
assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo;
assert requiredSeqNoRangeStart >= startingSeqNo : "requiredSeqNoRangeStart [" + requiredSeqNoRangeStart + "] is lower than ["
+ startingSeqNo + "]";

runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()));
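In the file-based branch above, startingSeqNo is pinned to 0 so that every operation carrying a sequence number can still be replayed on the target, while requiredSeqNoRangeStart is derived from the local checkpoint recorded in the copied commit, because operations above that checkpoint are not guaranteed to be reflected in the shipped segments. A minimal standalone sketch of that arithmetic (the user-data map and literal key are stand-ins for the snapshot commit's getUserData()):

// Illustrative sketch of how the recovery seq# bounds relate to the copied commit.
import java.util.Map;

public class RecoveryRangeSketch {
    public static void main(String[] args) {
        // Stand-in for phase1Snapshot.getIndexCommit().getUserData()
        Map<String, String> commitUserData = Map.of("local_checkpoint", "41", "max_seq_no", "47");

        // File-based recovery: replay everything that carries a sequence number ...
        long startingSeqNo = 0;
        // ... but every op above the commit's local checkpoint *must* make it to the target,
        // since the shipped segments may not contain those operations.
        long requiredSeqNoRangeStart = Long.parseLong(commitUserData.get("local_checkpoint")) + 1; // 42

        assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo;
        assert requiredSeqNoRangeStart >= startingSeqNo;

        System.out.println("replay from seq# " + startingSeqNo + ", required range starts at " + requiredSeqNoRangeStart);
    }
}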

@@ -187,10 +193,19 @@ public RecoveryResponse recoverToTarget() throws IOException {
throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e);
}

final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
/*
* We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all
* operations in the required range will be available for replaying from the translog of the source.
*/
cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo));

logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo);

logger.trace("snapshot translog for recovery; current size is [{}]", translog.estimateTotalOperationsFromMinSeq(startingSeqNo));
final long targetLocalCheckpoint;
try(Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(startingSeqNo)) {
targetLocalCheckpoint = phase2(startingSeqNo, snapshot);
targetLocalCheckpoint = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot);
} catch (Exception e) {
throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e);
}
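The wait introduced above matters because the maximum sequence number can belong to an operation that is still in flight and therefore not yet written to the translog; only once the local checkpoint has caught up to endingSeqNo is every operation in the required range guaranteed to be replayable from the source. A conceptual standalone sketch of that guarantee (polling is used only for brevity; the real waitForOpsToComplete blocks on the sequence-number service rather than spinning):

// Conceptual sketch, not the Elasticsearch implementation.
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

public class WaitForOpsSketch {

    // Once the local checkpoint reaches endingSeqNo, every op with seq# <= endingSeqNo has completed.
    static void waitForOpsToComplete(LongSupplier localCheckpoint, long endingSeqNo) throws InterruptedException {
        while (localCheckpoint.getAsLong() < endingSeqNo) {
            TimeUnit.MILLISECONDS.sleep(10);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicLong localCheckpoint = new AtomicLong(7);
        long endingSeqNo = 9; // max seq# on the source when the recovery snapshot is taken

        // Simulate the in-flight operations 8 and 9 completing on an indexing thread.
        new Thread(() -> { localCheckpoint.set(8); localCheckpoint.set(9); }).start();

        waitForOpsToComplete(localCheckpoint::get, endingSeqNo);
        System.out.println("ops up to " + endingSeqNo + " are now in the translog and can be replayed");
    }
}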
@@ -224,26 +239,19 @@ private void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable) {

/**
* Determines if the source translog is ready for a sequence-number-based peer recovery. The main condition here is that the source
* translog contains all operations between the local checkpoint on the target and the current maximum sequence number on the source.
* translog contains all operations above the local checkpoint on the target. We already know that the translog contains or will contain
* all ops above the source local checkpoint, so we can stop checking there.
*
* @return {@code true} if the source is ready for a sequence-number-based recovery
* @throws IOException if an I/O exception occurred reading the translog snapshot
*/
boolean isTranslogReadyForSequenceNumberBasedRecovery() throws IOException {
final long startingSeqNo = request.startingSeqNo();
assert startingSeqNo >= 0;
final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
logger.trace("testing sequence numbers in range: [{}, {}]", startingSeqNo, endingSeqNo);
final long localCheckpoint = shard.getLocalCheckpoint();
logger.trace("testing sequence numbers in range: [{}, {}]", startingSeqNo, localCheckpoint);
// the start recovery request is initialized with the starting sequence number set to the target shard's local checkpoint plus one
if (startingSeqNo - 1 <= endingSeqNo) {
/*
* We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all
* operations in the required range will be available for replaying from the translog of the source.
*/
cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo));

logger.trace("all operations up to [{}] completed, checking translog content", endingSeqNo);

if (startingSeqNo - 1 <= localCheckpoint) {
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1);
try (Translog.Snapshot snapshot = shard.getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) {
Translog.Operation operation;
@@ -253,7 +261,7 @@ boolean isTranslogReadyForSequenceNumberBasedRecovery() throws IOException {
}
}
}
return tracker.getCheckpoint() >= endingSeqNo;
return tracker.getCheckpoint() >= localCheckpoint;
} else {
return false;
}
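A concrete reading of the check above: if the target's local checkpoint was 9, the request's startingSeqNo is 10, and with a source local checkpoint of 14 the source translog must contain every operation in [10, 14] for an ops-based recovery to be safe. The standalone sketch below walks through that example (hard-coded numbers and a plain loop instead of a LocalCheckpointTracker):

// Illustrative sketch of the readiness check, not the commit's code.
import java.util.List;

public class SeqNoRecoveryCheckSketch {
    public static void main(String[] args) {
        long startingSeqNo = 10;   // request.startingSeqNo(): target local checkpoint + 1
        long localCheckpoint = 14; // shard.getLocalCheckpoint() on the source

        // Seq#s observed while iterating a translog snapshot from seq# 10 upwards.
        List<Long> translogSeqNos = List.of(10L, 11L, 12L, 13L, 14L, 15L);

        // Equivalent of marking each op in a LocalCheckpointTracker and then checking
        // tracker.getCheckpoint() >= localCheckpoint.
        boolean ready = true;
        for (long seqNo = startingSeqNo; seqNo <= localCheckpoint; seqNo++) {
            if (translogSeqNos.contains(seqNo) == false) {
                ready = false; // a gap means sequence-number-based recovery is not possible
                break;
            }
        }
        System.out.println("sequence-number-based recovery possible: " + ready); // true
    }
}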
@@ -433,24 +441,27 @@ void prepareTargetForTranslog(final int totalTranslogOps) throws IOException {
* point-in-time view of the translog). It then sends each translog operation to the target node so it can be replayed into the new
* shard.
*
* @param startingSeqNo the sequence number to start recovery from, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if all
* ops should be sent
* @param snapshot a snapshot of the translog
*
* @param startingSeqNo the sequence number to start recovery from, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if all
* ops should be sent
* @param requiredSeqNoRangeStart the lower sequence number of the required range (ending with endingSeqNo)
* @param endingSeqNo the highest sequence number that should be sent
* @param snapshot a snapshot of the translog
* @return the local checkpoint on the target
*/
long phase2(final long startingSeqNo, final Translog.Snapshot snapshot) throws IOException {
long phase2(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, final Translog.Snapshot snapshot)
throws IOException {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
cancellableThreads.checkForCancel();

final StopWatch stopWatch = new StopWatch().start();

logger.trace("recovery [phase2]: sending transaction log operations");
logger.trace("recovery [phase2]: sending transaction log operations (seq# from [" + startingSeqNo + "], " +
"required [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "]");

// send all the snapshot's translog operations to the target
final SendSnapshotResult result = sendSnapshot(startingSeqNo, snapshot);
final SendSnapshotResult result = sendSnapshot(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot);

stopWatch.stop();
logger.trace("recovery [phase2]: took [{}]", stopWatch.totalTime());
@@ -511,18 +522,26 @@ static class SendSnapshotResult {
* <p>
* Operations are bulked into a single request depending on an operation count limit or size-in-bytes limit.
*
* @param startingSeqNo the sequence number for which only operations with a sequence number greater than this will be sent
* @param snapshot the translog snapshot to replay operations from
* @return the local checkpoint on the target and the total number of operations sent
* @param startingSeqNo the sequence number for which only operations with a sequence number greater than this will be sent
* @param requiredSeqNoRangeStart the lower sequence number of the required range
* @param endingSeqNo the upper bound of the sequence number range to be sent (inclusive)
* @param snapshot the translog snapshot to replay operations from
* @return the local checkpoint on the target and the total number of operations sent
* @throws IOException if an I/O exception occurred reading the translog snapshot
*/
protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Translog.Snapshot snapshot) throws IOException {
protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo,
final Translog.Snapshot snapshot) throws IOException {
assert requiredSeqNoRangeStart <= endingSeqNo + 1:
"requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo;
assert startingSeqNo <= requiredSeqNoRangeStart :
"startingSeqNo " + startingSeqNo + " is larger than requiredSeqNoRangeStart " + requiredSeqNoRangeStart;
int ops = 0;
long size = 0;
int skippedOps = 0;
int totalSentOps = 0;
final AtomicLong targetLocalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
final List<Translog.Operation> operations = new ArrayList<>();
final LocalCheckpointTracker requiredOpsTracker = new LocalCheckpointTracker(endingSeqNo, requiredSeqNoRangeStart - 1);

final int expectedTotalOps = snapshot.totalOperations();
if (expectedTotalOps == 0) {
@@ -539,19 +558,17 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Transl
throw new IndexShardClosedException(request.shardId());
}
cancellableThreads.checkForCancel();
/*
* If we are doing a sequence-number-based recovery, we have to skip older ops for which no sequence number was assigned, and
* any ops before the starting sequence number.
*/

final long seqNo = operation.seqNo();
if (startingSeqNo >= 0 && (seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || seqNo < startingSeqNo)) {
if (seqNo < startingSeqNo || seqNo > endingSeqNo) {
skippedOps++;
continue;
}
operations.add(operation);
ops++;
size += operation.estimateSize();
totalSentOps++;
requiredOpsTracker.markSeqNoAsCompleted(seqNo);

// check if this request is past bytes threshold, and if so, send it off
if (size >= chunkSizeInBytes) {
@@ -569,8 +586,14 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Transl
}

assert expectedTotalOps == snapshot.overriddenOperations() + skippedOps + totalSentOps
: String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]",
expectedTotalOps, snapshot.overriddenOperations(), skippedOps, totalSentOps);
: String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]",
expectedTotalOps, snapshot.overriddenOperations(), skippedOps, totalSentOps);

if (requiredOpsTracker.getCheckpoint() < endingSeqNo) {
throw new IllegalStateException("translog replay failed to cover required sequence numbers" +
" (required range [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "). first missing op is ["
+ (requiredOpsTracker.getCheckpoint() + 1) + "]");
}

logger.trace("sent final batch of [{}][{}] (total: [{}]) translog operations", ops, new ByteSizeValue(size), expectedTotalOps);

[diff for the remaining changed files in this commit is not shown]