Skip to content

Commit

Permalink
Skip TRANSLOG stage for searchable snapshots recovery state (#70620)
Browse files Browse the repository at this point in the history
This commit introduces a change where searchable snapshots
skip the RecoveryState TRANSLOG stage. Since #65531 was introduced, the
cleanFiles peer recovery phase is blocked until the prewarming
completes (this is done to avoid search latency spikes due to a cold
cache). In that phase, the RecoveryState stage is
TRANSLOG which can be confusing as we don't replay
any ops during searchable snapshots recoveries. In order
to avoid that confusion we transition directly to
FINALIZE stage.

Backport of #70311
  • Loading branch information
fcofdez authored Mar 22, 2021
1 parent a387895 commit 81959ce
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1509,7 +1509,7 @@ public long recoverLocallyUpToGlobalCheckpoint() {
}
try {
maybeCheckIndex(); // check index here and won't do it again if ops-based recovery occurs
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
recoveryState.setLocalTranslogStage();
if (safeCommit.isPresent() == false) {
logger.trace("skip local recovery as no safe commit found");
return UNASSIGNED_SEQ_NO;
Expand Down Expand Up @@ -1675,7 +1675,7 @@ private void loadGlobalCheckpointToReplicationTracker() throws IOException {
public void openEngineAndRecoverFromTranslog() throws IOException {
recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX);
maybeCheckIndex();
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
recoveryState.setLocalTranslogStage();
final RecoveryState.Translog translogRecoveryStats = recoveryState.getTranslog();
final Engine.TranslogRecoveryRunner translogRecoveryRunner = (engine, snapshot) -> {
translogRecoveryStats.totalOperations(snapshot.totalOperations());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,14 @@ public synchronized RecoveryState setStage(Stage stage) {
return this;
}

public synchronized RecoveryState setLocalTranslogStage() {
return setStage(Stage.TRANSLOG);
}

public synchronized RecoveryState setRemoteTranslogStage() {
return setStage(Stage.TRANSLOG);
}

public Index getIndex() {
return index;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.Metada
assert indexShard.assertRetentionLeasesPersisted();
}
indexShard.maybeCheckIndex();
state().setStage(RecoveryState.Stage.TRANSLOG);
state().setRemoteTranslogStage();
} catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
// this is a fatal exception at this stage.
// this means we transferred files from the remote that have not be checksummed and they are
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void testRelocationWaitsForPreWarm() throws Exception {
assertEquals(secondDataNode, shardRecoveryState.getTargetNode().getName());
});

assertBusy(() -> assertSame(RecoveryState.Stage.TRANSLOG, getActiveRelocations(restoredIndex).get(0).getStage()));
assertBusy(() -> assertSame(RecoveryState.Stage.FINALIZE, getActiveRelocations(restoredIndex).get(0).getStage()));
final Index restoredIdx = clusterAdmin().prepareState().get().getState().metadata().index(restoredIndex).getIndex();
final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, secondDataNode);
assertEquals(1, indicesService.indexService(restoredIdx).getShard(0).outstandingCleanFilesConditions());
Expand Down Expand Up @@ -126,7 +126,7 @@ private static List<RecoveryState> getActiveRelocations(String restoredIndex) {
.stream()
// filter for relocations that are not in stage FINALIZE (they could end up in this stage without progress for good if the
// target node does not have enough cache space available to hold the primary completely
.filter(recoveryState -> recoveryState.getSourceNode() != null && recoveryState.getStage() != RecoveryState.Stage.FINALIZE)
.filter(recoveryState -> recoveryState.getSourceNode() != null)
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

public final class SearchableSnapshotRecoveryState extends RecoveryState {
private boolean preWarmComplete;
private boolean remoteTranslogSet;

public SearchableSnapshotRecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nullable DiscoveryNode sourceNode) {
super(shardRouting, targetNode, sourceNode, new Index());
Expand All @@ -24,7 +25,7 @@ public SearchableSnapshotRecoveryState(ShardRouting shardRouting, DiscoveryNode
@Override
public synchronized RecoveryState setStage(Stage stage) {
// The transition to the final state was done by #prewarmCompleted, just ignore the transition
if (getStage() == Stage.DONE) {
if (getStage() == Stage.DONE || stage == Stage.FINALIZE && remoteTranslogSet) {
return this;
}

Expand All @@ -35,9 +36,42 @@ public synchronized RecoveryState setStage(Stage stage) {
return this;
}

if (stage == Stage.INIT) {
remoteTranslogSet = false;
}

return super.setStage(stage);
}

@Override
public synchronized RecoveryState setRemoteTranslogStage() {
remoteTranslogSet = true;
super.setStage(Stage.TRANSLOG);
return super.setStage(Stage.FINALIZE);
}

@Override
public synchronized void validateCurrentStage(Stage expected) {
if (remoteTranslogSet == false) {
super.validateCurrentStage(expected);
} else {
final Stage stage = getStage();
// For small indices it's possible that pre-warming finished shortly
// after transitioning to FINALIZE stage
if (stage != Stage.FINALIZE && stage != Stage.DONE) {
assert false : "expected stage [" + Stage.FINALIZE + " || " + Stage.DONE + "]; but current stage is [" + stage + "]";
throw new IllegalStateException(
"expected stage [" + Stage.FINALIZE + " || " + Stage.DONE + "]; " + "but current stage is [" + stage + "]"
);
}
}
}

// Visible for tests
boolean isRemoteTranslogSet() {
return remoteTranslogSet;
}

public synchronized void setPreWarmComplete() {
// For small shards it's possible that the
// cache is pre-warmed before the stage has transitioned
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,19 @@ public void testFilesAreIgnored() {
assertThat(recoveryState.getIndex().getFileDetails("non_pre_warmed_file"), is(nullValue()));
}

public void testResetAfterRemoteTranslogIsSetResetsFlag() {
SearchableSnapshotRecoveryState recoveryState = createRecoveryState();
recoveryState.getIndex().setFileDetailsComplete();

recoveryState.setStage(RecoveryState.Stage.INDEX).setStage(RecoveryState.Stage.VERIFY_INDEX).setRemoteTranslogStage();

assertThat(recoveryState.getStage(), equalTo(RecoveryState.Stage.FINALIZE));
assertThat(recoveryState.isRemoteTranslogSet(), equalTo(true));

recoveryState.setStage(RecoveryState.Stage.INIT);
assertThat(recoveryState.isRemoteTranslogSet(), equalTo(false));
}

private SearchableSnapshotRecoveryState createRecoveryState() {
ShardRouting shardRouting = TestShardRouting.newShardRouting(
randomAlphaOfLength(10),
Expand Down

0 comments on commit 81959ce

Please sign in to comment.