diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java
index f6cc824b70ec4..798bbc8336d1b 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java
@@ -19,6 +19,7 @@
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.SnapshotsInProgress;
 import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.IndexNotFoundException;
@@ -736,6 +737,66 @@ public void testManyConcurrentClonesStartOutOfOrder() throws Exception {
         assertAcked(clone2.get());
     }
 
+    public void testRemoveFailedCloneFromCSWithoutIO() throws Exception {
+        final String masterNode = internalCluster().startMasterOnlyNode();
+        internalCluster().startDataOnlyNode();
+        final String repoName = "test-repo";
+        createRepository(repoName, "mock");
+        final String testIndex = "index-test";
+        createIndexWithContent(testIndex);
+
+        final String sourceSnapshot = "source-snapshot";
+        createFullSnapshot(repoName, sourceSnapshot);
+
+        final String targetSnapshot = "target-snapshot";
+        blockAndFailMasterOnShardClone(repoName);
+        final ActionFuture<AcknowledgedResponse> cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, testIndex);
+        awaitNumberOfSnapshotsInProgress(1);
+        waitForBlock(masterNode, repoName);
+        unblockNode(repoName, masterNode);
+        expectThrows(SnapshotException.class, cloneFuture::actionGet);
+        awaitNoMoreRunningOperations();
+        assertAllSnapshotsSuccessful(getRepositoryData(repoName), 1);
+        assertAcked(startDeleteSnapshot(repoName, sourceSnapshot).get());
+    }
+
+    public void testRemoveFailedCloneFromCSWithQueuedSnapshotInProgress() throws Exception {
+        // single threaded master snapshot pool so we can selectively fail part of a clone by letting it run shard by shard
+        final String masterNode = internalCluster().startMasterOnlyNode(
+            Settings.builder().put("thread_pool.snapshot.core", 1).put("thread_pool.snapshot.max", 1).build()
+        );
+        final String dataNode = internalCluster().startDataOnlyNode();
+        final String repoName = "test-repo";
+        createRepository(repoName, "mock");
+        final String testIndex = "index-test";
+        final String testIndex2 = "index-test-2";
+        createIndexWithContent(testIndex);
+        createIndexWithContent(testIndex2);
+
+        final String sourceSnapshot = "source-snapshot";
+        createFullSnapshot(repoName, sourceSnapshot);
+
+        final String targetSnapshot = "target-snapshot";
+        blockAndFailMasterOnShardClone(repoName);
+
+        createIndexWithContent("test-index-3");
+        blockDataNode(repoName, dataNode);
+        final ActionFuture<CreateSnapshotResponse> fullSnapshotFuture1 = startFullSnapshot(repoName, "full-snapshot-1");
+        waitForBlock(dataNode, repoName);
+        final ActionFuture<AcknowledgedResponse> cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, testIndex, testIndex2);
+        awaitNumberOfSnapshotsInProgress(2);
+        waitForBlock(masterNode, repoName);
+        unblockNode(repoName, masterNode);
+        final ActionFuture<CreateSnapshotResponse> fullSnapshotFuture2 = startFullSnapshot(repoName, "full-snapshot-2");
+        expectThrows(SnapshotException.class, cloneFuture::actionGet);
+        unblockNode(repoName, dataNode);
+        awaitNoMoreRunningOperations();
+        assertSuccessful(fullSnapshotFuture1);
+        assertSuccessful(fullSnapshotFuture2);
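+        // the failed clone should have been removed from the cluster state, leaving only the source snapshot and the two full snapshots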
+        assertAllSnapshotsSuccessful(getRepositoryData(repoName), 3);
+        assertAcked(startDeleteSnapshot(repoName, sourceSnapshot).get());
+    }
+
     private ActionFuture<AcknowledgedResponse> startCloneFromDataNode(
         String repoName,
         String sourceSnapshot,
@@ -772,6 +833,10 @@ private void blockMasterOnShardClone(String repoName) {
         AbstractSnapshotIntegTestCase.getRepositoryOnMaster(repoName).setBlockOnWriteShardLevelMeta();
     }
 
+    private void blockAndFailMasterOnShardClone(String repoName) {
+        AbstractSnapshotIntegTestCase.getRepositoryOnMaster(repoName).setBlockAndFailOnWriteShardLevelMeta();
+    }
+
     /**
      * Assert that given {@link RepositoryData} contains exactly the given number of snapshots and all of them are successful.
      */
diff --git a/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java b/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java
index 7e1ad3f3f83c1..646a44b77a425 100644
--- a/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java
+++ b/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java
@@ -79,7 +79,7 @@ public Metadata clusterMetadata() {
     }
 
     public ClusterState updatedClusterState(ClusterState state) {
-        return SnapshotsService.stateWithoutSuccessfulSnapshot(state, snapshotInfo.snapshot());
+        return SnapshotsService.stateWithoutSnapshot(state, snapshotInfo.snapshot());
     }
 
     @Override
diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
index cfcd567145040..922e6d2dbd167 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
@@ -1857,7 +1857,9 @@ private void endSnapshot(SnapshotsInProgress.Entry entry, Metadata metadata, @Nu
         }
         if (entry.isClone() && entry.state() == State.FAILED) {
             logger.debug("Removing failed snapshot clone [{}] from cluster state", entry);
-            removeFailedSnapshotFromClusterState(entry.snapshot(), new SnapshotException(entry.snapshot(), entry.failure()), null, null);
+            if (newFinalization) {
+                removeFailedSnapshotFromClusterState(snapshot, new SnapshotException(snapshot, entry.failure()), null, null);
+            }
             return;
         }
         final String repoName = snapshot.getRepository();
@@ -2219,15 +2221,15 @@ private static Tuple> read
     }
 
     /**
-     * Computes the cluster state resulting from removing a given snapshot create operation that was finalized in the repository from the
-     * given state. This method will update the shard generations of snapshots that the given snapshot depended on so that finalizing them
-     * will not cause rolling back to an outdated shard generation.
+     * Computes the cluster state resulting from removing a given snapshot create operation from the given state. This method will update
+     * the shard generations of snapshots that the given snapshot depended on so that finalizing them will not cause rolling back to an
+     * outdated shard generation.
      *
      * @param state    current cluster state
      * @param snapshot snapshot for which to remove the snapshot operation
      * @return updated cluster state
      */
-    public static ClusterState stateWithoutSuccessfulSnapshot(ClusterState state, Snapshot snapshot) {
+    public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot snapshot) {
         // TODO: updating snapshots here leaks their outdated generation files, we should add logic to clean those up and enhance
         // BlobStoreTestUtil to catch this leak
         SnapshotsInProgress snapshots = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
@@ -2382,32 +2384,6 @@ private static ImmutableOpenMap.Builder maybeAddUpda
         return updatedShardAssignments;
     }
 
-    /**
-     * Computes the cluster state resulting from removing a given snapshot create operation from the given state after it has failed at
-     * any point before being finalized in the repository.
-     *
-     * @param state    current cluster state
-     * @param snapshot snapshot for which to remove the snapshot operation
-     * @return updated cluster state
-     */
-    private static ClusterState stateWithoutFailedSnapshot(ClusterState state, Snapshot snapshot) {
-        SnapshotsInProgress snapshots = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
-        ClusterState result = state;
-        boolean changed = false;
-        ArrayList<SnapshotsInProgress.Entry> entries = new ArrayList<>();
-        for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
-            if (entry.snapshot().equals(snapshot)) {
-                changed = true;
-            } else {
-                entries.add(entry);
-            }
-        }
-        if (changed) {
-            result = ClusterState.builder(state).putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(entries)).build();
-        }
-        return readyDeletions(result).v1();
-    }
-
     /**
      * Removes record of running snapshot from cluster state and notifies the listener when this action is complete. This method is only
      * used when the snapshot fails for some reason. During normal operation the snapshot repository will remove the
@@ -2430,7 +2406,7 @@ private void removeFailedSnapshotFromClusterState(
 
             @Override
             public ClusterState execute(ClusterState currentState) {
-                final ClusterState updatedState = stateWithoutFailedSnapshot(currentState, snapshot);
+                final ClusterState updatedState = stateWithoutSnapshot(currentState, snapshot);
                 assert updatedState == currentState || endingSnapshots.contains(snapshot)
                     : "did not track [" + snapshot + "] in ending snapshots while removing it from the cluster state";
                 // now check if there are any delete operations that refer to the just failed snapshot and remove the snapshot from them
diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
index ac6b92e69ee05..4f155297186cd 100644
--- a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
+++ b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
@@ -135,6 +135,8 @@ public long getFailureCount() {
 
     private volatile boolean blockOnWriteShardLevelMeta;
 
+    private volatile boolean blockAndFailOnWriteShardLevelMeta;
+
     private volatile boolean blockOnReadIndexMeta;
 
     private final AtomicBoolean blockOnceOnReadSnapshotInfo = new AtomicBoolean(false);
@@ -220,6 +222,7 @@ public synchronized void unblock() {
         blockedIndexId = null;
         blockOnDeleteIndexN = false;
         blockOnWriteShardLevelMeta = false;
+        blockAndFailOnWriteShardLevelMeta = false;
         blockOnReadIndexMeta = false;
         blockOnceOnReadSnapshotInfo.set(false);
         this.notifyAll();
@@ -262,9 +265,15 @@ public void setBlockOnDeleteIndexFile() {
     }
 
     public void setBlockOnWriteShardLevelMeta() {
+        assert blockAndFailOnWriteShardLevelMeta == false : "Either fail or wait after blocking on shard level metadata, not both";
         blockOnWriteShardLevelMeta = true;
     }
 
+    public void setBlockAndFailOnWriteShardLevelMeta() {
+        assert blockOnWriteShardLevelMeta == false : "Either fail or wait after blocking on shard level metadata, not both";
+        blockAndFailOnWriteShardLevelMeta = true;
+    }
+
     public void setBlockOnReadIndexMeta() {
         blockOnReadIndexMeta = true;
     }
@@ -295,9 +304,9 @@ private synchronized boolean blockExecution() {
         logger.debug("[{}] Blocking execution", metadata.name());
         boolean wasBlocked = false;
         try {
-            while (blockOnDataFiles || blockOnAnyFiles || blockAndFailOnWriteIndexFile || blockOnWriteIndexFile ||
-                blockAndFailOnWriteSnapFile || blockOnDeleteIndexN || blockOnWriteShardLevelMeta || blockOnReadIndexMeta ||
-                blockAndFailOnDataFiles || blockedIndexId != null) {
+            while (blockAndFailOnDataFiles || blockOnDataFiles || blockOnAnyFiles || blockAndFailOnWriteIndexFile || blockOnWriteIndexFile
+                || blockAndFailOnWriteSnapFile || blockOnDeleteIndexN || blockOnWriteShardLevelMeta || blockAndFailOnWriteShardLevelMeta
+                || blockOnReadIndexMeta || blockedIndexId != null) {
                 blocked = true;
                 this.wait();
                 wasBlocked = true;
@@ -539,9 +548,12 @@ public void writeBlob(String blobName,
 
     private void beforeWrite(String blobName) throws IOException {
         maybeIOExceptionOrBlock(blobName);
-        if (blockOnWriteShardLevelMeta && blobName.startsWith(BlobStoreRepository.SNAPSHOT_PREFIX)
-            && path().equals(basePath()) == false) {
-            blockExecutionAndMaybeWait(blobName);
+        if (blobName.startsWith(BlobStoreRepository.SNAPSHOT_PREFIX) && path().equals(basePath()) == false) {
+            if (blockOnWriteShardLevelMeta) {
+                blockExecutionAndMaybeWait(blobName);
+            } else if (blockAndFailOnWriteShardLevelMeta) {
+                blockExecutionAndFail(blobName);
+            }
         }
     }