Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix recovery stage transition with sync_id #57754

Merged
merged 5 commits into from
Jun 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,9 @@ public void testRelocationWithConcurrentIndexing() throws Exception {
default:
throw new IllegalStateException("unknown type " + CLUSTER_TYPE);
}
if (randomBoolean()) {
syncedFlush(index);
}
}

public void testRecovery() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoveryCleanFilesRequest;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.plugins.EnginePlugin;
import org.elasticsearch.plugins.Plugin;
Expand All @@ -57,6 +58,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -243,6 +245,54 @@ public void testFullClusterRestartPerformNoopRecovery() throws Exception {
assertNoOpRecoveries(indexName);
}

/**
* If the recovery source is on an old node (before <pre>{@link org.elasticsearch.Version#V_7_2_0}</pre>) then the recovery target
* won't have the safe commit after phase1 because the recovery source does not send the global checkpoint in the clean_files
* step. And if the recovery fails and retries, then the recovery stage might not transition properly. This test simulates
* this behavior by changing the global checkpoint in phase1 to unassigned.
*/
public void testSimulateRecoverySourceOnOldNode() throws Exception {
internalCluster().startMasterOnlyNode();
String source = internalCluster().startDataOnlyNode();
String indexName = "test";
assertAcked(client().admin().indices().prepareCreate(indexName).setSettings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)));
ensureGreen(indexName);
if (randomBoolean()) {
indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(200, 500))
.mapToObj(n -> client().prepareIndex(indexName).setSource("f", "v")).collect(Collectors.toList()));
}
if (randomBoolean()) {
client().admin().indices().prepareFlush(indexName).get();
}
if (randomBoolean()) {
syncFlush(indexName);
}
internalCluster().startDataOnlyNode();
MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, source);
Semaphore failRecovery = new Semaphore(1);
transportService.addSendBehavior((connection, requestId, action, request, options) -> {
if (action.equals(PeerRecoveryTargetService.Actions.CLEAN_FILES)) {
RecoveryCleanFilesRequest cleanFilesRequest = (RecoveryCleanFilesRequest) request;
request = new RecoveryCleanFilesRequest(cleanFilesRequest.recoveryId(),
cleanFilesRequest.requestSeqNo(), cleanFilesRequest.shardId(), cleanFilesRequest.sourceMetaSnapshot(),
cleanFilesRequest.totalTranslogOps(), SequenceNumbers.UNASSIGNED_SEQ_NO);
}
if (action.equals(PeerRecoveryTargetService.Actions.FINALIZE)) {
if (failRecovery.tryAcquire()) {
throw new IllegalStateException("simulated");
}
}
connection.sendRequest(requestId, action, request, options);
});
assertAcked(client().admin().indices().prepareUpdateSettings()
.setIndices(indexName).setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()));
ensureGreen(indexName);
transportService.clearAllRules();
}

private void assertNoOpRecoveries(String indexName) {
for (RecoveryState recovery : client().admin().indices().prepareRecoveries(indexName).get().shardRecoveryStates().get(indexName)) {
if (recovery.getPrimary() == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1380,7 +1380,7 @@ public long recoverLocallyUpToGlobalCheckpoint() {
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
assert recoveryState.getStage() == RecoveryState.Stage.INDEX : "unexpected recovery stage [" + recoveryState.getStage() + "]";
recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX);
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
final Optional<SequenceNumbers.CommitInfo> safeCommit;
final long globalCheckpoint;
Expand All @@ -1395,14 +1395,14 @@ public long recoverLocallyUpToGlobalCheckpoint() {
logger.debug("skip local recovery as failed to find the safe commit", e);
return UNASSIGNED_SEQ_NO;
}
if (safeCommit.isPresent() == false) {
logger.trace("skip local recovery as no safe commit found");
return UNASSIGNED_SEQ_NO;
}
assert safeCommit.get().localCheckpoint <= globalCheckpoint : safeCommit.get().localCheckpoint + " > " + globalCheckpoint;
try {
maybeCheckIndex(); // check index here and won't do it again if ops-based recovery occurs
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
if (safeCommit.isPresent() == false) {
logger.trace("skip local recovery as no safe commit found");
return UNASSIGNED_SEQ_NO;
}
assert safeCommit.get().localCheckpoint <= globalCheckpoint : safeCommit.get().localCheckpoint + " > " + globalCheckpoint;
if (safeCommit.get().localCheckpoint == globalCheckpoint) {
logger.trace("skip local recovery as the safe commit is up to date; safe commit {} global checkpoint {}",
safeCommit.get(), globalCheckpoint);
Expand Down Expand Up @@ -1561,7 +1561,7 @@ private void loadGlobalCheckpointToReplicationTracker() throws IOException {
* Operations from the translog will be replayed to bring lucene up to date.
**/
public void openEngineAndRecoverFromTranslog() throws IOException {
assert recoveryState.getStage() == RecoveryState.Stage.INDEX : "unexpected recovery stage [" + recoveryState.getStage() + "]";
recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX);
maybeCheckIndex();
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
final RecoveryState.Translog translogRecoveryStats = recoveryState.getTranslog();
Expand All @@ -1582,7 +1582,7 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
*/
public void openEngineAndSkipTranslogRecovery() throws IOException {
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "unexpected recovery stage [" + recoveryState.getStage() + "]";
recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
loadGlobalCheckpointToReplicationTracker();
innerOpenEngineAndTranslog(replicationTracker);
getEngine().skipTranslogRecovery();
Expand Down Expand Up @@ -1617,7 +1617,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
// which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
onSettingsChanged();
assert assertSequenceNumbersInCommit();
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
}

private boolean assertSequenceNumbersInCommit() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class RecoveryCleanFilesRequest extends RecoveryTransportRequest {
private final int totalTranslogOps;
private final long globalCheckpoint;

RecoveryCleanFilesRequest(long recoveryId, long requestSeqNo, ShardId shardId, Store.MetadataSnapshot snapshotFiles,
public RecoveryCleanFilesRequest(long recoveryId, long requestSeqNo, ShardId shardId, Store.MetadataSnapshot snapshotFiles,
int totalTranslogOps, long globalCheckpoint) {
super(requestSeqNo);
this.recoveryId = recoveryId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,20 @@ public synchronized Stage getStage() {

private void validateAndSetStage(Stage expected, Stage next) {
if (stage != expected) {
assert false : "can't move recovery to stage [" + next + "]. current stage: [" + stage + "] (expected [" + expected + "])";
throw new IllegalStateException("can't move recovery to stage [" + next + "]. current stage: ["
+ stage + "] (expected [" + expected + "])");
}
stage = next;
}

public synchronized void validateCurrentStage(Stage expected) {
if (stage != expected) {
assert false : "expected stage [" + expected + "]; but current stage is [" + stage + "]";
throw new IllegalStateException("expected stage [" + expected + "] but current stage is [" + stage + "]");
}
}

// synchronized is strictly speaking not needed (this is called by a single thread), but just to be safe
public synchronized RecoveryState setStage(Stage stage) {
switch (stage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,10 @@ public void testPrepareIndexForPeerRecovery() throws Exception {

// copy with truncated translog
shard = newStartedShard(false);
globalCheckpoint = populateRandomData(shard).getGlobalCheckpoint();
SeqNoStats seqNoStats = populateRandomData(shard);
replica = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(),
RecoverySource.PeerRecoverySource.INSTANCE));
globalCheckpoint = randomFrom(UNASSIGNED_SEQ_NO, seqNoStats.getMaxSeqNo());
String translogUUID = Translog.createEmptyTranslog(replica.shardPath().resolveTranslog(), globalCheckpoint,
replica.shardId(), replica.getPendingPrimaryTerm());
replica.store().associateIndexWithNewTranslog(translogUUID);
Expand All @@ -232,6 +233,7 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(RecoveryState.Translog.UNKNOWN));
}
assertThat(replica.recoveryState().getStage(), equalTo(RecoveryState.Stage.TRANSLOG));
assertThat(replica.recoveryState().getTranslog().recoveredOperations(), equalTo(0));
assertThat(replica.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
closeShards(replica);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
Expand Down Expand Up @@ -55,6 +54,7 @@
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.startsWith;

public class RecoveryTargetTests extends ESTestCase {
abstract class Streamer<T extends Writeable> extends Thread {
Expand Down Expand Up @@ -336,31 +336,25 @@ Index createObj(StreamInput in) throws IOException {
public void testStageSequenceEnforcement() {
final DiscoveryNode discoveryNode = new DiscoveryNode("1", buildNewFakeTransportAddress(), emptyMap(), emptySet(),
Version.CURRENT);
Stage[] stages = Stage.values();
int i = randomIntBetween(0, stages.length - 1);
int j;
do {
j = randomIntBetween(0, stages.length - 1);
} while (j == i);
Stage t = stages[i];
stages[i] = stages[j];
stages[j] = t;
try {
final AssertionError error = expectThrows(AssertionError.class, () -> {
Stage[] stages = Stage.values();
int i = randomIntBetween(0, stages.length - 1);
int j = randomValueOtherThan(i, () -> randomIntBetween(0, stages.length - 1));
Stage t = stages[i];
stages[i] = stages[j];
stages[j] = t;
ShardRouting shardRouting = TestShardRouting.newShardRouting(new ShardId("bla", "_na_", 0), discoveryNode.getId(),
randomBoolean(), ShardRoutingState.INITIALIZING);
RecoveryState state = new RecoveryState(shardRouting, discoveryNode,
shardRouting.recoverySource().getType() == RecoverySource.Type.PEER ? discoveryNode : null);
for (Stage stage : stages) {
state.setStage(stage);
}
fail("succeeded in performing the illegal sequence [" + Strings.arrayToCommaDelimitedString(stages) + "]");
} catch (IllegalStateException e) {
// cool
}

});
assertThat(error.getMessage(), startsWith("can't move recovery to stage"));
// but reset should be always possible.
stages = Stage.values();
i = randomIntBetween(1, stages.length - 1);
Stage[] stages = Stage.values();
int i = randomIntBetween(1, stages.length - 1);
ArrayList<Stage> list = new ArrayList<>(Arrays.asList(Arrays.copyOfRange(stages, 0, i)));
list.addAll(Arrays.asList(stages));
ShardRouting shardRouting = TestShardRouting.newShardRouting(new ShardId("bla", "_na_", 0), discoveryNode.getId(),
Expand Down