diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java
index a182f1b340ae8..8e86ff1c8de0f 100644
--- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java
+++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java
@@ -278,6 +278,9 @@ public void testRelocationWithConcurrentIndexing() throws Exception {
default:
throw new IllegalStateException("unknown type " + CLUSTER_TYPE);
}
+ if (randomBoolean()) {
+ syncedFlush(index);
+ }
}
public void testRecovery() throws Exception {
diff --git a/server/src/internalClusterTest/java/org/elasticsearch/gateway/ReplicaShardAllocatorSyncIdIT.java b/server/src/internalClusterTest/java/org/elasticsearch/gateway/ReplicaShardAllocatorSyncIdIT.java
index a8a5a15c52ed2..e656a21d259d4 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/gateway/ReplicaShardAllocatorSyncIdIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/gateway/ReplicaShardAllocatorSyncIdIT.java
@@ -39,6 +39,7 @@
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
+import org.elasticsearch.indices.recovery.RecoveryCleanFilesRequest;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.plugins.EnginePlugin;
import org.elasticsearch.plugins.Plugin;
@@ -57,6 +58,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -243,6 +245,54 @@ public void testFullClusterRestartPerformNoopRecovery() throws Exception {
assertNoOpRecoveries(indexName);
}
+    /**
+     * If the recovery source is on an old node (before {@link org.elasticsearch.Version#V_7_2_0}) then the recovery target
+     * won't have the safe commit after phase1 because the recovery source does not send the global checkpoint in the
+     * clean_files step. If such a recovery then fails and is retried, the recovery stage might not transition properly.
+     * This test simulates that situation by rewriting the global checkpoint of the clean_files request to unassigned and
+     * by failing the first finalize request so that the recovery has to be retried.
+     */
+    public void testSimulateRecoverySourceOnOldNode() throws Exception {
+        final String indexName = "test";
+        internalCluster().startMasterOnlyNode();
+        final String sourceNode = internalCluster().startDataOnlyNode();
+        // a single primary without replicas, so that the only copy lives on the source node
+        final Settings.Builder singleCopySettings = Settings.builder()
+            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0);
+        assertAcked(client().admin().indices().prepareCreate(indexName).setSettings(singleCopySettings));
+        ensureGreen(indexName);
+        if (randomBoolean()) {
+            indexRandom(
+                randomBoolean(),
+                randomBoolean(),
+                randomBoolean(),
+                IntStream.range(0, between(200, 500))
+                    .mapToObj(n -> client().prepareIndex(indexName).setSource("f", "v"))
+                    .collect(Collectors.toList()));
+        }
+        if (randomBoolean()) {
+            client().admin().indices().prepareFlush(indexName).get();
+        }
+        if (randomBoolean()) {
+            syncFlush(indexName);
+        }
+        internalCluster().startDataOnlyNode();
+        // the single permit is never released, hence only the first finalize request is failed
+        final Semaphore failOnce = new Semaphore(1);
+        final MockTransportService transportService =
+            (MockTransportService) internalCluster().getInstance(TransportService.class, sourceNode);
+        transportService.addSendBehavior((connection, requestId, action, request, options) -> {
+            if (action.equals(PeerRecoveryTargetService.Actions.CLEAN_FILES)) {
+                // forward a copy of the clean_files request whose global checkpoint is replaced with "unassigned"
+                final RecoveryCleanFilesRequest original = (RecoveryCleanFilesRequest) request;
+                connection.sendRequest(
+                    requestId,
+                    action,
+                    new RecoveryCleanFilesRequest(original.recoveryId(), original.requestSeqNo(), original.shardId(),
+                        original.sourceMetaSnapshot(), original.totalTranslogOps(), SequenceNumbers.UNASSIGNED_SEQ_NO),
+                    options);
+                return;
+            }
+            if (action.equals(PeerRecoveryTargetService.Actions.FINALIZE) && failOnce.tryAcquire()) {
+                throw new IllegalStateException("simulated");
+            }
+            connection.sendRequest(requestId, action, request, options);
+        });
+        // adding a replica triggers the peer recovery onto the second data node
+        final Settings oneReplica = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build();
+        assertAcked(client().admin().indices().prepareUpdateSettings().setIndices(indexName).setSettings(oneReplica));
+        ensureGreen(indexName);
+        transportService.clearAllRules();
+    }
+
private void assertNoOpRecoveries(String indexName) {
for (RecoveryState recovery : client().admin().indices().prepareRecoveries(indexName).get().shardRecoveryStates().get(indexName)) {
if (recovery.getPrimary() == false) {
diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index fc936e8f9a5b3..39512983fba5c 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -1380,7 +1380,7 @@ public long recoverLocallyUpToGlobalCheckpoint() {
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
- assert recoveryState.getStage() == RecoveryState.Stage.INDEX : "unexpected recovery stage [" + recoveryState.getStage() + "]";
+ recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX);
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
final Optional safeCommit;
final long globalCheckpoint;
@@ -1395,14 +1395,14 @@ public long recoverLocallyUpToGlobalCheckpoint() {
logger.debug("skip local recovery as failed to find the safe commit", e);
return UNASSIGNED_SEQ_NO;
}
- if (safeCommit.isPresent() == false) {
- logger.trace("skip local recovery as no safe commit found");
- return UNASSIGNED_SEQ_NO;
- }
- assert safeCommit.get().localCheckpoint <= globalCheckpoint : safeCommit.get().localCheckpoint + " > " + globalCheckpoint;
try {
maybeCheckIndex(); // check index here and won't do it again if ops-based recovery occurs
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
+ if (safeCommit.isPresent() == false) {
+ logger.trace("skip local recovery as no safe commit found");
+ return UNASSIGNED_SEQ_NO;
+ }
+ assert safeCommit.get().localCheckpoint <= globalCheckpoint : safeCommit.get().localCheckpoint + " > " + globalCheckpoint;
if (safeCommit.get().localCheckpoint == globalCheckpoint) {
logger.trace("skip local recovery as the safe commit is up to date; safe commit {} global checkpoint {}",
safeCommit.get(), globalCheckpoint);
@@ -1561,7 +1561,7 @@ private void loadGlobalCheckpointToReplicationTracker() throws IOException {
* Operations from the translog will be replayed to bring lucene up to date.
**/
public void openEngineAndRecoverFromTranslog() throws IOException {
- assert recoveryState.getStage() == RecoveryState.Stage.INDEX : "unexpected recovery stage [" + recoveryState.getStage() + "]";
+ recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX);
maybeCheckIndex();
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
final RecoveryState.Translog translogRecoveryStats = recoveryState.getTranslog();
@@ -1582,7 +1582,7 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
*/
public void openEngineAndSkipTranslogRecovery() throws IOException {
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
- assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "unexpected recovery stage [" + recoveryState.getStage() + "]";
+ recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
loadGlobalCheckpointToReplicationTracker();
innerOpenEngineAndTranslog(replicationTracker);
getEngine().skipTranslogRecovery();
@@ -1617,7 +1617,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
// which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
onSettingsChanged();
assert assertSequenceNumbersInCommit();
- assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
+ recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
}
private boolean assertSequenceNumbersInCommit() throws IOException {
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryCleanFilesRequest.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryCleanFilesRequest.java
index 864796b383dd4..3b7690c8820aa 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryCleanFilesRequest.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryCleanFilesRequest.java
@@ -36,7 +36,7 @@ public class RecoveryCleanFilesRequest extends RecoveryTransportRequest {
private final int totalTranslogOps;
private final long globalCheckpoint;
- RecoveryCleanFilesRequest(long recoveryId, long requestSeqNo, ShardId shardId, Store.MetadataSnapshot snapshotFiles,
+ public RecoveryCleanFilesRequest(long recoveryId, long requestSeqNo, ShardId shardId, Store.MetadataSnapshot snapshotFiles,
int totalTranslogOps, long globalCheckpoint) {
super(requestSeqNo);
this.recoveryId = recoveryId;
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java
index fec3f1508ff99..2532f87f4cab5 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java
@@ -170,12 +170,20 @@ public synchronized Stage getStage() {
+ /**
+ * Advances the recovery from {@code expected} to {@code next}.
+ *
+ * If the current stage is not {@code expected}, the transition is rejected: an {@link AssertionError} is raised when
+ * assertions are enabled, otherwise an {@link IllegalStateException} is thrown. In both cases the stage is left unchanged.
+ *
+ * NOTE(review): this method is not synchronized itself; presumably it is only reached through the synchronized
+ * {@code setStage} - confirm against the callers.
+ *
+ * @param expected the stage the recovery must currently be in
+ * @param next the stage to move the recovery to
+ */
private void validateAndSetStage(Stage expected, Stage next) {
if (stage != expected) {
+ // With assertions enabled (e.g. in tests) this fails with an AssertionError before the exception below is reached.
+ assert false : "can't move recovery to stage [" + next + "]. current stage: [" + stage + "] (expected [" + expected + "])";
throw new IllegalStateException("can't move recovery to stage [" + next + "]. current stage: ["
+ stage + "] (expected [" + expected + "])");
}
stage = next;
}
+    /**
+     * Verifies that the recovery is currently in the {@code expected} stage; the stage itself is not modified.
+     *
+     * @param expected the stage the recovery must currently be in
+     * @throws IllegalStateException if the current stage differs from {@code expected}; when assertions are enabled an
+     *                               {@link AssertionError} is raised instead
+     */
+    public synchronized void validateCurrentStage(Stage expected) {
+        if (stage == expected) {
+            return;
+        }
+        // trips first when assertions are enabled; otherwise execution falls through to the exception below
+        assert false : "expected stage [" + expected + "]; but current stage is [" + stage + "]";
+        throw new IllegalStateException("expected stage [" + expected + "] but current stage is [" + stage + "]");
+    }
+
// synchronized is strictly speaking not needed (this is called by a single thread), but just to be safe
public synchronized RecoveryState setStage(Stage stage) {
switch (stage) {
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
index eebd0b7ebe3d4..8fb96d244418a 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
@@ -216,9 +216,10 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
// copy with truncated translog
shard = newStartedShard(false);
- globalCheckpoint = populateRandomData(shard).getGlobalCheckpoint();
+ SeqNoStats seqNoStats = populateRandomData(shard);
replica = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(),
RecoverySource.PeerRecoverySource.INSTANCE));
+ globalCheckpoint = randomFrom(UNASSIGNED_SEQ_NO, seqNoStats.getMaxSeqNo());
String translogUUID = Translog.createEmptyTranslog(replica.shardPath().resolveTranslog(), globalCheckpoint,
replica.shardId(), replica.getPendingPrimaryTerm());
replica.store().associateIndexWithNewTranslog(translogUUID);
@@ -232,6 +233,7 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(RecoveryState.Translog.UNKNOWN));
}
+ assertThat(replica.recoveryState().getStage(), equalTo(RecoveryState.Stage.TRANSLOG));
assertThat(replica.recoveryState().getTranslog().recoveredOperations(), equalTo(0));
assertThat(replica.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
closeShards(replica);
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java
index 1c2b5331fef30..fe1250f113cb5 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java
@@ -24,7 +24,6 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
-import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
@@ -55,6 +54,7 @@
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.startsWith;
public class RecoveryTargetTests extends ESTestCase {
abstract class Streamer extends Thread {
@@ -336,16 +336,13 @@ Index createObj(StreamInput in) throws IOException {
public void testStageSequenceEnforcement() {
final DiscoveryNode discoveryNode = new DiscoveryNode("1", buildNewFakeTransportAddress(), emptyMap(), emptySet(),
Version.CURRENT);
- Stage[] stages = Stage.values();
- int i = randomIntBetween(0, stages.length - 1);
- int j;
- do {
- j = randomIntBetween(0, stages.length - 1);
- } while (j == i);
- Stage t = stages[i];
- stages[i] = stages[j];
- stages[j] = t;
- try {
+ final AssertionError error = expectThrows(AssertionError.class, () -> {
+ Stage[] stages = Stage.values();
+ int i = randomIntBetween(0, stages.length - 1);
+ int j = randomValueOtherThan(i, () -> randomIntBetween(0, stages.length - 1));
+ Stage t = stages[i];
+ stages[i] = stages[j];
+ stages[j] = t;
ShardRouting shardRouting = TestShardRouting.newShardRouting(new ShardId("bla", "_na_", 0), discoveryNode.getId(),
randomBoolean(), ShardRoutingState.INITIALIZING);
RecoveryState state = new RecoveryState(shardRouting, discoveryNode,
@@ -353,14 +350,11 @@ public void testStageSequenceEnforcement() {
for (Stage stage : stages) {
state.setStage(stage);
}
- fail("succeeded in performing the illegal sequence [" + Strings.arrayToCommaDelimitedString(stages) + "]");
- } catch (IllegalStateException e) {
- // cool
- }
-
+ });
+ assertThat(error.getMessage(), startsWith("can't move recovery to stage"));
// but reset should be always possible.
- stages = Stage.values();
- i = randomIntBetween(1, stages.length - 1);
+ Stage[] stages = Stage.values();
+ int i = randomIntBetween(1, stages.length - 1);
ArrayList list = new ArrayList<>(Arrays.asList(Arrays.copyOfRange(stages, 0, i)));
list.addAll(Arrays.asList(stages));
ShardRouting shardRouting = TestShardRouting.newShardRouting(new ShardId("bla", "_na_", 0), discoveryNode.getId(),