Skip to content

Commit

Permalink
Fix Snapshot State Machine Issues around Failed Clones (#76419) (#76604)
Browse files Browse the repository at this point in the history
With recent fixes it is never correct to simply remove a snapshot from the cluster state without
updating other snapshot entries if an entry contains any successful shards due to possible dependencies.
This change reproduces two issues resulting from simply removing snapshot without regard for other queued
operations and fixes them by having all removal of snapshot from the cluster state go through the same
code path.
Also, this change moves the tracking of a snapshot as "ending" up a few lines to fix an assertion about finishing
snapshots that forces them to be in this collection.
  • Loading branch information
original-brownbear authored Aug 17, 2021
1 parent b509b9f commit 9bb770e
Show file tree
Hide file tree
Showing 18 changed files with 323 additions and 244 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
Expand All @@ -32,12 +30,11 @@
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.repositories.FinalizeSnapshotContext;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.ShardGenerations;
import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -276,15 +273,18 @@ private static Map<String, String> buildLocation(RepositoryMetadata metadata) {
private final AtomicReference<Scheduler.Cancellable> finalizationFuture = new AtomicReference<>();

@Override
public void finalizeSnapshot(ShardGenerations shardGenerations, long repositoryStateId, Metadata clusterMetadata,
SnapshotInfo snapshotInfo, Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
ActionListener<RepositoryData> listener) {
if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) {
listener = delayedListener(listener);
public void finalizeSnapshot(FinalizeSnapshotContext finalizeSnapshotContext) {
if (SnapshotsService.useShardGenerations(finalizeSnapshotContext.repositoryMetaVersion()) == false) {
finalizeSnapshotContext = new FinalizeSnapshotContext(
finalizeSnapshotContext.updatedShardGenerations(),
finalizeSnapshotContext.repositoryStateId(),
finalizeSnapshotContext.clusterMetadata(),
finalizeSnapshotContext.snapshotInfo(),
finalizeSnapshotContext.repositoryMetaVersion(),
delayedListener(finalizeSnapshotContext)
);
}
super.finalizeSnapshot(shardGenerations, repositoryStateId, clusterMetadata, snapshotInfo, repositoryMetaVersion,
stateTransformer, listener);
super.finalizeSnapshot(finalizeSnapshotContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
Expand Down Expand Up @@ -736,6 +737,66 @@ public void testManyConcurrentClonesStartOutOfOrder() throws Exception {
assertAcked(clone2.get());
}

public void testRemoveFailedCloneFromCSWithoutIO() throws Exception {
final String masterNode = internalCluster().startMasterOnlyNode();
internalCluster().startDataOnlyNode();
final String repoName = "test-repo";
createRepository(repoName, "mock");
final String testIndex = "index-test";
createIndexWithContent(testIndex);

final String sourceSnapshot = "source-snapshot";
createFullSnapshot(repoName, sourceSnapshot);

final String targetSnapshot = "target-snapshot";
blockAndFailMasterOnShardClone(repoName);
final ActionFuture<AcknowledgedResponse> cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, testIndex);
awaitNumberOfSnapshotsInProgress(1);
waitForBlock(masterNode, repoName);
unblockNode(repoName, masterNode);
expectThrows(SnapshotException.class, cloneFuture::actionGet);
awaitNoMoreRunningOperations();
assertAllSnapshotsSuccessful(getRepositoryData(repoName), 1);
assertAcked(startDeleteSnapshot(repoName, sourceSnapshot).get());
}

public void testRemoveFailedCloneFromCSWithQueuedSnapshotInProgress() throws Exception {
// single threaded master snapshot pool so we can selectively fail part of a clone by letting it run shard by shard
final String masterNode = internalCluster().startMasterOnlyNode(
Settings.builder().put("thread_pool.snapshot.core", 1).put("thread_pool.snapshot.max", 1).build()
);
final String dataNode = internalCluster().startDataOnlyNode();
final String repoName = "test-repo";
createRepository(repoName, "mock");
final String testIndex = "index-test";
final String testIndex2 = "index-test-2";
createIndexWithContent(testIndex);
createIndexWithContent(testIndex2);

final String sourceSnapshot = "source-snapshot";
createFullSnapshot(repoName, sourceSnapshot);

final String targetSnapshot = "target-snapshot";
blockAndFailMasterOnShardClone(repoName);

createIndexWithContent("test-index-3");
blockDataNode(repoName, dataNode);
final ActionFuture<CreateSnapshotResponse> fullSnapshotFuture1 = startFullSnapshot(repoName, "full-snapshot-1");
waitForBlock(dataNode, repoName);
final ActionFuture<AcknowledgedResponse> cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, testIndex, testIndex2);
awaitNumberOfSnapshotsInProgress(2);
waitForBlock(masterNode, repoName);
unblockNode(repoName, masterNode);
final ActionFuture<CreateSnapshotResponse> fullSnapshotFuture2 = startFullSnapshot(repoName, "full-snapshot-2");
expectThrows(SnapshotException.class, cloneFuture::actionGet);
unblockNode(repoName, dataNode);
awaitNoMoreRunningOperations();
assertSuccessful(fullSnapshotFuture1);
assertSuccessful(fullSnapshotFuture2);
assertAllSnapshotsSuccessful(getRepositoryData(repoName), 3);
assertAcked(startDeleteSnapshot(repoName, sourceSnapshot).get());
}

private ActionFuture<AcknowledgedResponse> startCloneFromDataNode(
String repoName,
String sourceSnapshot,
Expand Down Expand Up @@ -772,6 +833,10 @@ private void blockMasterOnShardClone(String repoName) {
AbstractSnapshotIntegTestCase.<MockRepository>getRepositoryOnMaster(repoName).setBlockOnWriteShardLevelMeta();
}

private void blockAndFailMasterOnShardClone(String repoName) {
AbstractSnapshotIntegTestCase.<MockRepository>getRepositoryOnMaster(repoName).setBlockAndFailOnWriteShardLevelMeta();
}

/**
* Assert that given {@link RepositoryData} contains exactly the given number of snapshots and all of them are successful.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@
*/
package org.elasticsearch.snapshots;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
Expand All @@ -19,17 +15,15 @@
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.FinalizeSnapshotContext;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.ShardGenerations;
import org.elasticsearch.repositories.SnapshotShardContext;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.test.ESIntegTestCase;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -89,24 +83,8 @@ public Map<String, Repository.Factory> getRepositories(
private final String initialMetaValue = metadata.settings().get(MASTER_SETTING_VALUE);

@Override
public void finalizeSnapshot(
ShardGenerations shardGenerations,
long repositoryStateId,
Metadata clusterMetadata,
SnapshotInfo snapshotInfo,
Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
ActionListener<RepositoryData> listener
) {
super.finalizeSnapshot(
shardGenerations,
repositoryStateId,
clusterMetadata,
snapshotInfo,
repositoryMetaVersion,
stateTransformer,
listener
);
public void finalizeSnapshot(FinalizeSnapshotContext finalizeSnapshotContext) {
super.finalizeSnapshot(finalizeSnapshotContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;

import java.io.IOException;
import java.util.Collection;
Expand Down Expand Up @@ -75,24 +74,8 @@ public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, Met
}

@Override
public void finalizeSnapshot(
ShardGenerations shardGenerations,
long repositoryStateId,
Metadata clusterMetadata,
SnapshotInfo snapshotInfo,
Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
ActionListener<RepositoryData> listener
) {
in.finalizeSnapshot(
shardGenerations,
repositoryStateId,
clusterMetadata,
snapshotInfo,
repositoryMetaVersion,
stateTransformer,
listener
);
public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotContext) {
in.finalizeSnapshot(finalizeSnapshotContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.repositories;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotsService;

/**
* Context for finalizing a snapshot.
*/
public final class FinalizeSnapshotContext extends ActionListener.Delegating<
Tuple<RepositoryData, SnapshotInfo>,
Tuple<RepositoryData, SnapshotInfo>> {

private final ShardGenerations updatedShardGenerations;

private final long repositoryStateId;

private final Metadata clusterMetadata;

private final SnapshotInfo snapshotInfo;

private final Version repositoryMetaVersion;

/**
* @param updatedShardGenerations updated shard generations
* @param repositoryStateId the unique id identifying the state of the repository when the snapshot began
* @param clusterMetadata cluster metadata
* @param snapshotInfo SnapshotInfo instance to write for this snapshot
* @param repositoryMetaVersion version of the updated repository metadata to write
* @param listener listener to be invoked with the new {@link RepositoryData} and {@link SnapshotInfo} after completing
* the snapshot
*/
public FinalizeSnapshotContext(
ShardGenerations updatedShardGenerations,
long repositoryStateId,
Metadata clusterMetadata,
SnapshotInfo snapshotInfo,
Version repositoryMetaVersion,
ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener
) {
super(listener);
this.updatedShardGenerations = updatedShardGenerations;
this.repositoryStateId = repositoryStateId;
this.clusterMetadata = clusterMetadata;
this.snapshotInfo = snapshotInfo;
this.repositoryMetaVersion = repositoryMetaVersion;
}

public long repositoryStateId() {
return repositoryStateId;
}

public ShardGenerations updatedShardGenerations() {
return updatedShardGenerations;
}

public SnapshotInfo snapshotInfo() {
return snapshotInfo;
}

public Version repositoryMetaVersion() {
return repositoryMetaVersion;
}

public Metadata clusterMetadata() {
return clusterMetadata;
}

public ClusterState updatedClusterState(ClusterState state) {
return SnapshotsService.stateWithoutSnapshot(state, snapshotInfo.snapshot());
}

@Override
public void onResponse(Tuple<RepositoryData, SnapshotInfo> repositoryData) {
delegate.onResponse(repositoryData);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,24 +147,9 @@ public void onFailure(Exception e) {
* <p>
* This method is called on master after all shards are snapshotted.
*
* @param shardGenerations updated shard generations
* @param repositoryStateId the unique id identifying the state of the repository when the snapshot began
* @param clusterMetadata cluster metadata
* @param snapshotInfo SnapshotInfo instance to write for this snapshot
* @param repositoryMetaVersion version of the updated repository metadata to write
* @param stateTransformer a function that filters the last cluster state update that the snapshot finalization will execute and
* is used to remove any state tracked for the in-progress snapshot from the cluster state
* @param listener listener to be invoked with the new {@link RepositoryData} after completing the snapshot
* @param finalizeSnapshotContext finalization context
*/
void finalizeSnapshot(
ShardGenerations shardGenerations,
long repositoryStateId,
Metadata clusterMetadata,
SnapshotInfo snapshotInfo,
Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
ActionListener<RepositoryData> listener
);
void finalizeSnapshot(FinalizeSnapshotContext finalizeSnapshotContext);

/**
* Deletes snapshots
Expand Down
Loading

0 comments on commit 9bb770e

Please sign in to comment.