Skip to content

Commit

Permalink
Fix Snapshot State Machine Issues around Failed Clones (elastic#76419)
Browse files Browse the repository at this point in the history
With recent fixes, it is never correct to simply remove a snapshot from the cluster state without
updating the other snapshot entries if the removed entry contains any successful shards, because other entries may depend on them.
This change reproduces two issues that result from simply removing a snapshot without regard for other queued
operations, and fixes them by having every removal of a snapshot from the cluster state go through the same
code path.
Also, this change moves the tracking of a snapshot as "ending" up a few lines to fix an assertion that requires
finishing snapshots to be tracked in that collection.
  • Loading branch information
original-brownbear committed Aug 17, 2021
1 parent eadfeac commit 20b959a
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
Expand Down Expand Up @@ -736,6 +737,66 @@ public void testManyConcurrentClonesStartOutOfOrder() throws Exception {
assertAcked(clone2.get());
}

/**
* Starts a clone that is made to fail while the master writes shard-level metadata and checks that, once the clone has failed,
* no operations remain running, the repository holds exactly one successful snapshot, and the source snapshot can still be deleted.
*/
public void testRemoveFailedCloneFromCSWithoutIO() throws Exception {
final String masterNode = internalCluster().startMasterOnlyNode();
internalCluster().startDataOnlyNode();
final String repoName = "test-repo";
createRepository(repoName, "mock");
final String testIndex = "index-test";
createIndexWithContent(testIndex);

final String sourceSnapshot = "source-snapshot";
createFullSnapshot(repoName, sourceSnapshot);

final String targetSnapshot = "target-snapshot";
// arm the master's mock repository to block and then fail on shard-level metadata writes before the clone is started
blockAndFailMasterOnShardClone(repoName);
final ActionFuture<AcknowledgedResponse> cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, testIndex);
// wait for the clone to show up as the only in-progress operation and for the master to hit the block, then release it
awaitNumberOfSnapshotsInProgress(1);
waitForBlock(masterNode, repoName);
unblockNode(repoName, masterNode);
// the clone itself is expected to fail
expectThrows(SnapshotException.class, cloneFuture::actionGet);
awaitNoMoreRunningOperations();
// exactly one snapshot is expected in the repository -- presumably the source snapshot, since the clone failed
assertAllSnapshotsSuccessful(getRepositoryData(repoName), 1);
// the failed clone must not prevent the source snapshot from being deleted
assertAcked(startDeleteSnapshot(repoName, sourceSnapshot).get());
}

/**
* Fails a two-index clone on the master while one full snapshot is blocked on the data node and a second full snapshot is started
* afterwards, then checks that both full snapshots complete successfully and that the source snapshot can still be deleted.
*/
public void testRemoveFailedCloneFromCSWithQueuedSnapshotInProgress() throws Exception {
// single threaded master snapshot pool so we can selectively fail part of a clone by letting it run shard by shard
final String masterNode = internalCluster().startMasterOnlyNode(
Settings.builder().put("thread_pool.snapshot.core", 1).put("thread_pool.snapshot.max", 1).build()
);
final String dataNode = internalCluster().startDataOnlyNode();
final String repoName = "test-repo";
createRepository(repoName, "mock");
final String testIndex = "index-test";
final String testIndex2 = "index-test-2";
createIndexWithContent(testIndex);
createIndexWithContent(testIndex2);

final String sourceSnapshot = "source-snapshot";
createFullSnapshot(repoName, sourceSnapshot);

final String targetSnapshot = "target-snapshot";
// arm the master's mock repository to block and then fail on shard-level metadata writes
blockAndFailMasterOnShardClone(repoName);

// this index is created after the source snapshot was taken, so only the full snapshots started below can include it
createIndexWithContent("test-index-3");
// start the first full snapshot with the data node blocked and wait until the data node reports the block
blockDataNode(repoName, dataNode);
final ActionFuture<CreateSnapshotResponse> fullSnapshotFuture1 = startFullSnapshot(repoName, "full-snapshot-1");
waitForBlock(dataNode, repoName);
// start the clone of both indices while the first full snapshot is still in progress
final ActionFuture<AcknowledgedResponse> cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, testIndex, testIndex2);
awaitNumberOfSnapshotsInProgress(2);
// let the master run into the block on the clone, then release it
waitForBlock(masterNode, repoName);
unblockNode(repoName, masterNode);
// the second full snapshot is started before the clone's failure has been observed by the test
final ActionFuture<CreateSnapshotResponse> fullSnapshotFuture2 = startFullSnapshot(repoName, "full-snapshot-2");
expectThrows(SnapshotException.class, cloneFuture::actionGet);
// only now release the data node so the full snapshots can make progress
unblockNode(repoName, dataNode);
awaitNoMoreRunningOperations();
assertSuccessful(fullSnapshotFuture1);
assertSuccessful(fullSnapshotFuture2);
// three snapshots are expected -- presumably the source snapshot plus the two full snapshots, with no trace of the failed clone
assertAllSnapshotsSuccessful(getRepositoryData(repoName), 3);
assertAcked(startDeleteSnapshot(repoName, sourceSnapshot).get());
}

private ActionFuture<AcknowledgedResponse> startCloneFromDataNode(
String repoName,
String sourceSnapshot,
Expand Down Expand Up @@ -772,6 +833,10 @@ private void blockMasterOnShardClone(String repoName) {
AbstractSnapshotIntegTestCase.<MockRepository>getRepositoryOnMaster(repoName).setBlockOnWriteShardLevelMeta();
}

/**
 * Looks up the {@link MockRepository} instance for the given repository on the master node and switches on its
 * block-and-fail mode for shard-level metadata writes.
 *
 * @param repoName name of the repository whose master-side instance should be configured
 */
private void blockAndFailMasterOnShardClone(String repoName) {
    final MockRepository masterRepository = AbstractSnapshotIntegTestCase.<MockRepository>getRepositoryOnMaster(repoName);
    masterRepository.setBlockAndFailOnWriteShardLevelMeta();
}

/**
* Assert that given {@link RepositoryData} contains exactly the given number of snapshots and all of them are successful.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public Metadata clusterMetadata() {
}

public ClusterState updatedClusterState(ClusterState state) {
return SnapshotsService.stateWithoutSuccessfulSnapshot(state, snapshotInfo.snapshot());
return SnapshotsService.stateWithoutSnapshot(state, snapshotInfo.snapshot());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1857,7 +1857,9 @@ private void endSnapshot(SnapshotsInProgress.Entry entry, Metadata metadata, @Nu
}
if (entry.isClone() && entry.state() == State.FAILED) {
logger.debug("Removing failed snapshot clone [{}] from cluster state", entry);
removeFailedSnapshotFromClusterState(entry.snapshot(), new SnapshotException(entry.snapshot(), entry.failure()), null, null);
if (newFinalization) {
removeFailedSnapshotFromClusterState(snapshot, new SnapshotException(snapshot, entry.failure()), null, null);
}
return;
}
final String repoName = snapshot.getRepository();
Expand Down Expand Up @@ -2219,15 +2221,15 @@ private static Tuple<ClusterState, List<SnapshotDeletionsInProgress.Entry>> read
}

/**
* Computes the cluster state resulting from removing a given snapshot create operation that was finalized in the repository from the
* given state. This method will update the shard generations of snapshots that the given snapshot depended on so that finalizing them
* will not cause rolling back to an outdated shard generation.
* Computes the cluster state resulting from removing a given snapshot create operation from the given state. This method will update
* the shard generations of snapshots that the given snapshot depended on so that finalizing them will not cause rolling back to an
* outdated shard generation.
*
* @param state current cluster state
* @param snapshot snapshot for which to remove the snapshot operation
* @return updated cluster state
*/
public static ClusterState stateWithoutSuccessfulSnapshot(ClusterState state, Snapshot snapshot) {
public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot snapshot) {
// TODO: updating snapshots here leaks their outdated generation files, we should add logic to clean those up and enhance
// BlobStoreTestUtil to catch this leak
SnapshotsInProgress snapshots = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
Expand Down Expand Up @@ -2382,32 +2384,6 @@ private static <T> ImmutableOpenMap.Builder<T, ShardSnapshotStatus> maybeAddUpda
return updatedShardAssignments;
}

/**
 * Builds the cluster state that results from dropping the in-progress entry of the given snapshot, for a snapshot that failed
 * before it was finalized in the repository. Entries belonging to other snapshots are carried over unchanged. The cluster state
 * is only rebuilt when a matching entry was actually found; in either case the state is passed through
 * {@code readyDeletions} and the first component of its result is returned.
 *
 * @param state    current cluster state
 * @param snapshot snapshot for which to remove the snapshot operation
 * @return updated cluster state
 */
private static ClusterState stateWithoutFailedSnapshot(ClusterState state, Snapshot snapshot) {
    final SnapshotsInProgress inProgress = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
    final ArrayList<SnapshotsInProgress.Entry> remaining = new ArrayList<>();
    boolean removedAny = false;
    for (SnapshotsInProgress.Entry candidate : inProgress.entries()) {
        if (candidate.snapshot().equals(snapshot) == false) {
            remaining.add(candidate);
            continue;
        }
        removedAny = true;
    }
    if (removedAny == false) {
        // nothing matched, so the input state is handed on as-is
        return readyDeletions(state).v1();
    }
    final ClusterState withoutSnapshot = ClusterState.builder(state)
        .putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(remaining))
        .build();
    return readyDeletions(withoutSnapshot).v1();
}

/**
* Removes record of running snapshot from cluster state and notifies the listener when this action is complete. This method is only
* used when the snapshot fails for some reason. During normal operation the snapshot repository will remove the
Expand All @@ -2430,7 +2406,7 @@ private void removeFailedSnapshotFromClusterState(

@Override
public ClusterState execute(ClusterState currentState) {
final ClusterState updatedState = stateWithoutFailedSnapshot(currentState, snapshot);
final ClusterState updatedState = stateWithoutSnapshot(currentState, snapshot);
assert updatedState == currentState || endingSnapshots.contains(snapshot)
: "did not track [" + snapshot + "] in ending snapshots while removing it from the cluster state";
// now check if there are any delete operations that refer to the just failed snapshot and remove the snapshot from them
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ public long getFailureCount() {

private volatile boolean blockOnWriteShardLevelMeta;

private volatile boolean blockAndFailOnWriteShardLevelMeta;

private volatile boolean blockOnReadIndexMeta;

private final AtomicBoolean blockOnceOnReadSnapshotInfo = new AtomicBoolean(false);
Expand Down Expand Up @@ -220,6 +222,7 @@ public synchronized void unblock() {
blockedIndexId = null;
blockOnDeleteIndexN = false;
blockOnWriteShardLevelMeta = false;
blockAndFailOnWriteShardLevelMeta = false;
blockOnReadIndexMeta = false;
blockOnceOnReadSnapshotInfo.set(false);
this.notifyAll();
Expand Down Expand Up @@ -262,9 +265,15 @@ public void setBlockOnDeleteIndexFile() {
}

/**
 * Turns on blocking for shard-level metadata writes. Must not be combined with
 * {@link #setBlockAndFailOnWriteShardLevelMeta()}; the assertion below trips if the block-and-fail flag is already set.
 */
public void setBlockOnWriteShardLevelMeta() {
    assert this.blockAndFailOnWriteShardLevelMeta == false
        : "Either fail or wait after blocking on shard level metadata, not both";
    this.blockOnWriteShardLevelMeta = true;
}

/**
 * Turns on block-and-fail mode for shard-level metadata writes. Must not be combined with
 * {@link #setBlockOnWriteShardLevelMeta()}; the assertion below trips if the plain blocking flag is already set.
 */
public void setBlockAndFailOnWriteShardLevelMeta() {
    assert this.blockOnWriteShardLevelMeta == false
        : "Either fail or wait after blocking on shard level metadata, not both";
    this.blockAndFailOnWriteShardLevelMeta = true;
}

public void setBlockOnReadIndexMeta() {
blockOnReadIndexMeta = true;
}
Expand Down Expand Up @@ -295,9 +304,9 @@ private synchronized boolean blockExecution() {
logger.debug("[{}] Blocking execution", metadata.name());
boolean wasBlocked = false;
try {
while (blockOnDataFiles || blockOnAnyFiles || blockAndFailOnWriteIndexFile || blockOnWriteIndexFile ||
blockAndFailOnWriteSnapFile || blockOnDeleteIndexN || blockOnWriteShardLevelMeta || blockOnReadIndexMeta ||
blockAndFailOnDataFiles || blockedIndexId != null) {
while (blockAndFailOnDataFiles || blockOnDataFiles || blockOnAnyFiles || blockAndFailOnWriteIndexFile || blockOnWriteIndexFile
|| blockAndFailOnWriteSnapFile || blockOnDeleteIndexN || blockOnWriteShardLevelMeta || blockAndFailOnWriteShardLevelMeta
|| blockOnReadIndexMeta || blockedIndexId != null) {
blocked = true;
this.wait();
wasBlocked = true;
Expand Down Expand Up @@ -539,9 +548,12 @@ public void writeBlob(String blobName,

private void beforeWrite(String blobName) throws IOException {
maybeIOExceptionOrBlock(blobName);
if (blockOnWriteShardLevelMeta && blobName.startsWith(BlobStoreRepository.SNAPSHOT_PREFIX)
&& path().equals(basePath()) == false) {
blockExecutionAndMaybeWait(blobName);
if (blobName.startsWith(BlobStoreRepository.SNAPSHOT_PREFIX) && path().equals(basePath()) == false) {
if (blockOnWriteShardLevelMeta) {
blockExecutionAndMaybeWait(blobName);
} else if (blockAndFailOnWriteShardLevelMeta) {
blockExecutionAndFail(blobName);
}
}
}

Expand Down

0 comments on commit 20b959a

Please sign in to comment.