diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index f2a7938dcad8..2b8c038c20cc 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1509,7 +1509,7 @@ public long recoverLocallyUpToGlobalCheckpoint() { } try { maybeCheckIndex(); // check index here and won't do it again if ops-based recovery occurs - recoveryState.setStage(RecoveryState.Stage.TRANSLOG); + recoveryState.setLocalTranslogStage(); if (safeCommit.isPresent() == false) { logger.trace("skip local recovery as no safe commit found"); return UNASSIGNED_SEQ_NO; @@ -1675,7 +1675,7 @@ private void loadGlobalCheckpointToReplicationTracker() throws IOException { public void openEngineAndRecoverFromTranslog() throws IOException { recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX); maybeCheckIndex(); - recoveryState.setStage(RecoveryState.Stage.TRANSLOG); + recoveryState.setLocalTranslogStage(); final RecoveryState.Translog translogRecoveryStats = recoveryState.getTranslog(); final Engine.TranslogRecoveryRunner translogRecoveryRunner = (engine, snapshot) -> { translogRecoveryStats.totalOperations(snapshot.totalOperations()); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java index 09797dc7c8b1..c60aaafc226b 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java @@ -225,6 +225,14 @@ public synchronized RecoveryState setStage(Stage stage) { return this; } + public synchronized RecoveryState setLocalTranslogStage() { + return setStage(Stage.TRANSLOG); + } + + public synchronized RecoveryState setRemoteTranslogStage() { + return setStage(Stage.TRANSLOG); + } + public Index getIndex() { return index; } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index eebfe7e80779..1925c41accc4 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -455,7 +455,7 @@ public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.Metada assert indexShard.assertRetentionLeasesPersisted(); } indexShard.maybeCheckIndex(); - state().setStage(RecoveryState.Stage.TRANSLOG); + state().setRemoteTranslogStage(); } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { // this is a fatal exception at this stage. // this means we transferred files from the remote that have not be checksummed and they are diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsRelocationIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsRelocationIntegTests.java index 7a696855a7ff..d5f7eee79373 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsRelocationIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsRelocationIntegTests.java @@ -90,7 +90,7 @@ public void testRelocationWaitsForPreWarm() throws Exception { assertEquals(secondDataNode, shardRecoveryState.getTargetNode().getName()); }); - assertBusy(() -> assertSame(RecoveryState.Stage.TRANSLOG, getActiveRelocations(restoredIndex).get(0).getStage())); + assertBusy(() -> assertSame(RecoveryState.Stage.FINALIZE, getActiveRelocations(restoredIndex).get(0).getStage())); final Index restoredIdx = clusterAdmin().prepareState().get().getState().metadata().index(restoredIndex).getIndex(); final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, secondDataNode); assertEquals(1, indicesService.indexService(restoredIdx).getShard(0).outstandingCleanFilesConditions()); @@ -126,7 +126,7 @@ private static List getActiveRelocations(String restoredIndex) { .stream() // filter for relocations that are not in stage FINALIZE (they could end up in this stage without progress for good if the // target node does not have enough cache space available to hold the primary completely - .filter(recoveryState -> recoveryState.getSourceNode() != null && recoveryState.getStage() != RecoveryState.Stage.FINALIZE) + .filter(recoveryState -> recoveryState.getSourceNode() != null) .collect(Collectors.toList()); } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/indices/recovery/SearchableSnapshotRecoveryState.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/indices/recovery/SearchableSnapshotRecoveryState.java index 8d9c73d165e0..9e236d695277 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/indices/recovery/SearchableSnapshotRecoveryState.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/indices/recovery/SearchableSnapshotRecoveryState.java @@ -16,6 +16,7 @@ public final class SearchableSnapshotRecoveryState extends RecoveryState { private boolean preWarmComplete; + private boolean remoteTranslogSet; public SearchableSnapshotRecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nullable DiscoveryNode sourceNode) { super(shardRouting, targetNode, sourceNode, new Index()); @@ -24,7 +25,7 @@ public SearchableSnapshotRecoveryState(ShardRouting shardRouting, DiscoveryNode @Override public synchronized RecoveryState setStage(Stage stage) { // The transition to the final state was done by #prewarmCompleted, just ignore the transition - if (getStage() == Stage.DONE) { + if (getStage() == Stage.DONE || stage == Stage.FINALIZE && remoteTranslogSet) { return this; } @@ -35,9 +36,42 @@ public synchronized RecoveryState setStage(Stage stage) { return this; } + if (stage == Stage.INIT) { + remoteTranslogSet = false; + } + return super.setStage(stage); } + @Override + public synchronized RecoveryState setRemoteTranslogStage() { + remoteTranslogSet = true; + super.setStage(Stage.TRANSLOG); + return super.setStage(Stage.FINALIZE); + } + + @Override + public synchronized void validateCurrentStage(Stage expected) { + if (remoteTranslogSet == false) { + super.validateCurrentStage(expected); + } else { + final Stage stage = getStage(); + // For small indices it's possible that pre-warming finished shortly + // after transitioning to FINALIZE stage + if (stage != Stage.FINALIZE && stage != Stage.DONE) { + assert false : "expected stage [" + Stage.FINALIZE + " || " + Stage.DONE + "]; but current stage is [" + stage + "]"; + throw new IllegalStateException( + "expected stage [" + Stage.FINALIZE + " || " + Stage.DONE + "]; " + "but current stage is [" + stage + "]" + ); + } + } + } + + // Visible for tests + boolean isRemoteTranslogSet() { + return remoteTranslogSet; + } + public synchronized void setPreWarmComplete() { // For small shards it's possible that the // cache is pre-warmed before the stage has transitioned diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/indices/recovery/SearchableSnapshotsRecoveryStateTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/indices/recovery/SearchableSnapshotsRecoveryStateTests.java index 6add5e038ff5..54da35887f1b 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/indices/recovery/SearchableSnapshotsRecoveryStateTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/indices/recovery/SearchableSnapshotsRecoveryStateTests.java @@ -128,6 +128,19 @@ public void testFilesAreIgnored() { assertThat(recoveryState.getIndex().getFileDetails("non_pre_warmed_file"), is(nullValue())); } + public void testResetAfterRemoteTranslogIsSetResetsFlag() { + SearchableSnapshotRecoveryState recoveryState = createRecoveryState(); + recoveryState.getIndex().setFileDetailsComplete(); + + recoveryState.setStage(RecoveryState.Stage.INDEX).setStage(RecoveryState.Stage.VERIFY_INDEX).setRemoteTranslogStage(); + + assertThat(recoveryState.getStage(), equalTo(RecoveryState.Stage.FINALIZE)); + assertThat(recoveryState.isRemoteTranslogSet(), equalTo(true)); + + recoveryState.setStage(RecoveryState.Stage.INIT); + assertThat(recoveryState.isRemoteTranslogSet(), equalTo(false)); + } + private SearchableSnapshotRecoveryState createRecoveryState() { ShardRouting shardRouting = TestShardRouting.newShardRouting( randomAlphaOfLength(10),