From e2ed0900b5474480a5add1e5cc1b69952ee5726b Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 12 Aug 2021 15:50:17 +0200 Subject: [PATCH] Fix Snapshot State Machine Issues around Failed Clones With recent fixes it is never correct to simply remove a snapshot from the cluster state without updating other snapshot entries if an entry contains any successful shards due to possible dependencies. This change reproduces two issues resulting from simply removing snapshot without regard for other queued operations and fixes them by having all removal of snapshot from the cluster state go through the same code path. Also, this change moves the tracking of a snapshot as "ending" up a few lines to fix an assertion about finishing snapshots that forces them to be in this collection. --- .../snapshots/CloneSnapshotIT.java | 65 +++++++++++++++++++ .../repositories/FinalizeSnapshotContext.java | 2 +- .../snapshots/SnapshotsService.java | 42 +++--------- .../snapshots/mockstore/MockRepository.java | 22 +++++-- 4 files changed, 92 insertions(+), 39 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java index 71a075a3cbb36..3ccb6d20d66f6 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java @@ -19,6 +19,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexNotFoundException; @@ -736,6 +737,66 @@ public void testManyConcurrentClonesStartOutOfOrder() throws Exception { assertAcked(clone2.get()); } + public void testRemoveFailedCloneFromCSWithoutIO() throws Exception { + final String masterNode = internalCluster().startMasterOnlyNode(); + internalCluster().startDataOnlyNode(); + final String repoName = "test-repo"; + createRepository(repoName, "mock"); + final String testIndex = "index-test"; + createIndexWithContent(testIndex); + + final String sourceSnapshot = "source-snapshot"; + createFullSnapshot(repoName, sourceSnapshot); + + final String targetSnapshot = "target-snapshot"; + blockAndFailMasterOnShardClone(repoName); + final ActionFuture cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, testIndex); + awaitNumberOfSnapshotsInProgress(1); + waitForBlock(masterNode, repoName); + unblockNode(repoName, masterNode); + expectThrows(SnapshotException.class, cloneFuture::actionGet); + awaitNoMoreRunningOperations(); + assertAllSnapshotsSuccessful(getRepositoryData(repoName), 1); + assertAcked(startDeleteSnapshot(repoName, sourceSnapshot).get()); + } + + public void testRemoveFailedCloneFromCSWithQueuedSnapshotInProgress() throws Exception { + // single threaded master snapshot pool so we can selectively fail part of a clone by letting it run shard by shard + final String masterNode = internalCluster().startMasterOnlyNode( + Settings.builder().put("thread_pool.snapshot.core", 1).put("thread_pool.snapshot.max", 1).build() + ); + final String dataNode = internalCluster().startDataOnlyNode(); + final String repoName = "test-repo"; + createRepository(repoName, "mock"); + final String testIndex = "index-test"; + final String testIndex2 = "index-test-2"; + createIndexWithContent(testIndex); + createIndexWithContent(testIndex2); + + final String sourceSnapshot = "source-snapshot"; + createFullSnapshot(repoName, sourceSnapshot); + + final String targetSnapshot = "target-snapshot"; + blockAndFailMasterOnShardClone(repoName); + + createIndexWithContent("test-index-3"); + blockDataNode(repoName, dataNode); + final ActionFuture fullSnapshotFuture1 = startFullSnapshot(repoName, "full-snapshot-1"); + waitForBlock(dataNode, repoName); + final ActionFuture cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, testIndex, testIndex2); + awaitNumberOfSnapshotsInProgress(2); + waitForBlock(masterNode, repoName); + unblockNode(repoName, masterNode); + final ActionFuture fullSnapshotFuture2 = startFullSnapshot(repoName, "full-snapshot-2"); + expectThrows(SnapshotException.class, cloneFuture::actionGet); + unblockNode(repoName, dataNode); + awaitNoMoreRunningOperations(); + assertSuccessful(fullSnapshotFuture1); + assertSuccessful(fullSnapshotFuture2); + assertAllSnapshotsSuccessful(getRepositoryData(repoName), 3); + assertAcked(startDeleteSnapshot(repoName, sourceSnapshot).get()); + } + private ActionFuture startCloneFromDataNode( String repoName, String sourceSnapshot, @@ -772,6 +833,10 @@ private void blockMasterOnShardClone(String repoName) { AbstractSnapshotIntegTestCase.getRepositoryOnMaster(repoName).setBlockOnWriteShardLevelMeta(); } + private void blockAndFailMasterOnShardClone(String repoName) { + AbstractSnapshotIntegTestCase.getRepositoryOnMaster(repoName).setBlockAndFailOnWriteShardLevelMeta(); + } + /** * Assert that given {@link RepositoryData} contains exactly the given number of snapshots and all of them are successful. */ diff --git a/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java b/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java index 7e1ad3f3f83c1..646a44b77a425 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java +++ b/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java @@ -79,7 +79,7 @@ public Metadata clusterMetadata() { } public ClusterState updatedClusterState(ClusterState state) { - return SnapshotsService.stateWithoutSuccessfulSnapshot(state, snapshotInfo.snapshot()); + return SnapshotsService.stateWithoutSnapshot(state, snapshotInfo.snapshot()); } @Override diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 70426a27e417c..b64f454f0258c 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -1376,12 +1376,14 @@ private static boolean removedNodesCleanupNeeded(SnapshotsInProgress snapshotsIn */ private void endSnapshot(SnapshotsInProgress.Entry entry, Metadata metadata, @Nullable RepositoryData repositoryData) { final Snapshot snapshot = entry.snapshot(); + final boolean newFinalization = endingSnapshots.add(snapshot); if (entry.isClone() && entry.state() == State.FAILED) { logger.debug("Removing failed snapshot clone [{}] from cluster state", entry); - removeFailedSnapshotFromClusterState(snapshot, new SnapshotException(snapshot, entry.failure()), null); + if (newFinalization) { + removeFailedSnapshotFromClusterState(snapshot, new SnapshotException(snapshot, entry.failure()), null); + } return; } - final boolean newFinalization = endingSnapshots.add(snapshot); final String repoName = snapshot.getRepository(); if (tryEnterRepoLoop(repoName)) { if (repositoryData == null) { @@ -1741,15 +1743,15 @@ private static Tuple> read } /** - * Computes the cluster state resulting from removing a given snapshot create operation that was finalized in the repository from the - * given state. This method will update the shard generations of snapshots that the given snapshot depended on so that finalizing them - * will not cause rolling back to an outdated shard generation. + * Computes the cluster state resulting from removing a given snapshot create operation from the given state. This method will update + * the shard generations of snapshots that the given snapshot depended on so that finalizing them will not cause rolling back to an + * outdated shard generation. * * @param state current cluster state * @param snapshot snapshot for which to remove the snapshot operation * @return updated cluster state */ - public static ClusterState stateWithoutSuccessfulSnapshot(ClusterState state, Snapshot snapshot) { + public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot snapshot) { // TODO: updating snapshots here leaks their outdated generation files, we should add logic to clean those up and enhance // BlobStoreTestUtil to catch this leak SnapshotsInProgress snapshots = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); @@ -1904,32 +1906,6 @@ private static ImmutableOpenMap.Builder maybeAddUpda return updatedShardAssignments; } - /** - * Computes the cluster state resulting from removing a given snapshot create operation from the given state after it has failed at - * any point before being finalized in the repository. - * - * @param state current cluster state - * @param snapshot snapshot for which to remove the snapshot operation - * @return updated cluster state - */ - private static ClusterState stateWithoutFailedSnapshot(ClusterState state, Snapshot snapshot) { - SnapshotsInProgress snapshots = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); - ClusterState result = state; - boolean changed = false; - ArrayList entries = new ArrayList<>(); - for (SnapshotsInProgress.Entry entry : snapshots.entries()) { - if (entry.snapshot().equals(snapshot)) { - changed = true; - } else { - entries.add(entry); - } - } - if (changed) { - result = ClusterState.builder(state).putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(entries)).build(); - } - return readyDeletions(result).v1(); - } - /** * Removes record of running snapshot from cluster state and notifies the listener when this action is complete. This method is only * used when the snapshot fails for some reason. During normal operation the snapshot repository will remove the @@ -1946,7 +1922,7 @@ private void removeFailedSnapshotFromClusterState(Snapshot snapshot, Exception f @Override public ClusterState execute(ClusterState currentState) { - final ClusterState updatedState = stateWithoutFailedSnapshot(currentState, snapshot); + final ClusterState updatedState = stateWithoutSnapshot(currentState, snapshot); assert updatedState == currentState || endingSnapshots.contains(snapshot) : "did not track [" + snapshot + "] in ending snapshots while removing it from the cluster state"; // now check if there are any delete operations that refer to the just failed snapshot and remove the snapshot from them diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index 7feca41e6e462..dfa54b3ca5c75 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -135,6 +135,8 @@ public long getFailureCount() { private volatile boolean blockOnWriteShardLevelMeta; + private volatile boolean blockAndFailOnWriteShardLevelMeta; + private volatile boolean blockOnReadIndexMeta; private final AtomicBoolean blockOnceOnReadSnapshotInfo = new AtomicBoolean(false); @@ -224,6 +226,7 @@ public synchronized void unblock() { blockedIndexId = null; blockOnDeleteIndexN = false; blockOnWriteShardLevelMeta = false; + blockAndFailOnWriteShardLevelMeta = false; blockOnReadIndexMeta = false; blockOnceOnReadSnapshotInfo.set(false); blockAndFailOnReadSnapFile = false; @@ -268,9 +271,15 @@ public void setBlockOnDeleteIndexFile() { } public void setBlockOnWriteShardLevelMeta() { + assert blockAndFailOnWriteShardLevelMeta == false : "Either fail or wait after blocking on shard level metadata, not both"; blockOnWriteShardLevelMeta = true; } + public void setBlockAndFailOnWriteShardLevelMeta() { + assert blockOnWriteShardLevelMeta == false : "Either fail or wait after blocking on shard level metadata, not both"; + blockAndFailOnWriteShardLevelMeta = true; + } + public void setBlockOnReadIndexMeta() { blockOnReadIndexMeta = true; } @@ -310,8 +319,8 @@ private synchronized boolean blockExecution() { boolean wasBlocked = false; try { while (blockAndFailOnDataFiles || blockOnDataFiles || blockOnAnyFiles || blockAndFailOnWriteIndexFile || blockOnWriteIndexFile - || blockAndFailOnWriteSnapFile || blockOnDeleteIndexN || blockOnWriteShardLevelMeta || blockOnReadIndexMeta - || blockAndFailOnReadSnapFile || blockAndFailOnReadIndexFile || blockedIndexId != null) { + || blockAndFailOnWriteSnapFile || blockOnDeleteIndexN || blockOnWriteShardLevelMeta || blockAndFailOnWriteShardLevelMeta + || blockOnReadIndexMeta || blockAndFailOnReadSnapFile || blockAndFailOnReadIndexFile || blockedIndexId != null) { blocked = true; this.wait(); wasBlocked = true; @@ -555,9 +564,12 @@ public void writeBlob(String blobName, private void beforeWrite(String blobName) throws IOException { maybeIOExceptionOrBlock(blobName); - if (blockOnWriteShardLevelMeta && blobName.startsWith(BlobStoreRepository.SNAPSHOT_PREFIX) - && path().equals(basePath()) == false) { - blockExecutionAndMaybeWait(blobName); + if (blobName.startsWith(BlobStoreRepository.SNAPSHOT_PREFIX) && path().equals(basePath()) == false) { + if (blockOnWriteShardLevelMeta) { + blockExecutionAndMaybeWait(blobName); + } else if (blockAndFailOnWriteShardLevelMeta) { + blockExecutionAndFail(blobName); + } } }