From da22b1e568c015f9f2b3980c2278669c0adf99f9 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 16 Aug 2021 12:27:25 +0200 Subject: [PATCH] Fix Concurrent Snapshot Repository Corruption from Operations Queued after Failing Operations (#75733) (#76548) The node executing a shard level operation would in many cases communicate `null` for the shard state update, leading to follow-up operations incorrectly assuming an empty shard snapshot directory and starting from scratch. closes #75598 --- .../snapshots/ConcurrentSnapshotsIT.java | 46 +++++++++++++++++++ .../snapshots/SnapshotShardsService.java | 20 ++++++-- .../snapshots/SnapshotsService.java | 7 ++- .../AbstractSnapshotIntegTestCase.java | 4 ++ .../snapshots/mockstore/MockRepository.java | 17 +++++-- 5 files changed, 85 insertions(+), 9 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java index 9be9e774941e9..114b1894af3ef 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java @@ -1611,6 +1611,52 @@ public void testOutOfOrderCloneFinalization() throws Exception { ); } + public void testQueuedAfterFailedShardSnapshot() throws Exception { + internalCluster().startMasterOnlyNode(); + final String dataNode = internalCluster().startDataOnlyNode(); + + final String repository = "test-repo"; + createRepository(repository, "mock"); + + final String indexName = "test-idx"; + createIndexWithContent(indexName); + final String fullSnapshot = "full-snapshot"; + createFullSnapshot(repository, fullSnapshot); + + indexDoc(indexName, "some_id", "foo", "bar"); + blockAndFailDataNode(repository, dataNode); + final ActionFuture snapshotFutureFailure = startFullSnapshot(repository, "failing-snapshot"); + awaitNumberOfSnapshotsInProgress(1); + waitForBlock(dataNode, repository); + final ActionFuture snapshotFutureSuccess = startFullSnapshot(repository, "successful-snapshot"); + awaitNumberOfSnapshotsInProgress(2); + unblockNode(repository, dataNode); + + assertSuccessful(snapshotFutureSuccess); + final SnapshotInfo failedSnapshot = snapshotFutureFailure.get().getSnapshotInfo(); + assertEquals(SnapshotState.PARTIAL, failedSnapshot.state()); + + final SnapshotsStatusResponse snapshotsStatusResponse1 = clusterAdmin().prepareSnapshotStatus(repository) + .setSnapshots(fullSnapshot) + .get(); + + final String tmpSnapshot = "snapshot-tmp"; + createFullSnapshot(repository, tmpSnapshot); + assertAcked(startDeleteSnapshot(repository, tmpSnapshot).get()); + + final SnapshotsStatusResponse snapshotsStatusResponse2 = clusterAdmin().prepareSnapshotStatus(repository) + .setSnapshots(fullSnapshot) + .get(); + assertEquals(snapshotsStatusResponse1, snapshotsStatusResponse2); + + assertAcked(startDeleteSnapshot(repository, "successful-snapshot").get()); + + final SnapshotsStatusResponse snapshotsStatusResponse3 = clusterAdmin().prepareSnapshotStatus(repository) + .setSnapshots(fullSnapshot) + .get(); + assertEquals(snapshotsStatusResponse1, snapshotsStatusResponse3); + } + private static void assertSnapshotStatusCountOnRepo(String otherBlockedRepoName, int count) { final SnapshotsStatusResponse snapshotsStatusResponse = client().admin() .cluster() diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 2a9c3ae45f5c1..4016fd4490425 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -233,7 +233,7 @@ private void startNewSnapshots(SnapshotsInProgress snapshotsInProgress) { // due to CS batching we might have missed the INIT state and straight went into ABORTED // notify master that abort has completed by moving to FAILED if (shard.value.state() == ShardState.ABORTED && localNodeId.equals(shard.value.nodeId())) { - notifyFailedSnapshotShard(snapshot, sid, shard.value.reason()); + notifyFailedSnapshotShard(snapshot, sid, shard.value.reason(), shard.value.generation()); } } else { snapshotStatus.abortIfNotCompleted("snapshot has been aborted"); @@ -294,7 +294,7 @@ public void onFailure(Exception e) { logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e); } snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), failure); - notifyFailedSnapshotShard(snapshot, shardId, failure); + notifyFailedSnapshotShard(snapshot, shardId, failure, snapshotStatus.generation()); } } ); @@ -452,7 +452,12 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) { snapshot.snapshot(), shardId ); - notifyFailedSnapshotShard(snapshot.snapshot(), shardId, indexShardSnapshotStatus.getFailure()); + notifyFailedSnapshotShard( + snapshot.snapshot(), + shardId, + indexShardSnapshotStatus.getFailure(), + localShard.getValue().generation() + ); } } } @@ -469,11 +474,16 @@ private void notifySuccessfulSnapshotShard(final Snapshot snapshot, final ShardI } /** Notify the master node that the given shard failed to be snapshotted **/ - private void notifyFailedSnapshotShard(final Snapshot snapshot, final ShardId shardId, final String failure) { + private void notifyFailedSnapshotShard( + final Snapshot snapshot, + final ShardId shardId, + final String failure, + final ShardGeneration generation + ) { sendSnapshotShardUpdate( snapshot, shardId, - new ShardSnapshotStatus(clusterService.localNode().getId(), ShardState.FAILED, failure, null) + new ShardSnapshotStatus(clusterService.localNode().getId(), ShardState.FAILED, failure, generation) ); } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 730edb894d47b..291361cb0a632 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -940,7 +940,12 @@ private void runReadyClone( new ShardSnapshotUpdate( target, repoShardId, - new ShardSnapshotStatus(localNodeId, ShardState.FAILED, "failed to clone shard snapshot", null) + new ShardSnapshotStatus( + localNodeId, + ShardState.FAILED, + "failed to clone shard snapshot", + shardStatusBefore.generation() + ) ), ActionListener.runBefore( ActionListener.wrap( diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java index 0d519206db971..15e424a24e4c2 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -257,6 +257,10 @@ public static void blockDataNode(String repository, String nodeName) { AbstractSnapshotIntegTestCase.getRepositoryOnNode(repository, nodeName).blockOnDataFiles(); } + public static void blockAndFailDataNode(String repository, String nodeName) { + AbstractSnapshotIntegTestCase.getRepositoryOnNode(repository, nodeName).blockAndFailOnDataFiles(); + } + public static void blockAllDataNodes(String repository) { for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) { ((MockRepository) repositoriesService.repository(repository)).blockOnDataFiles(); diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index dd62ad3a9c3b0..16c43206615dd 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -114,6 +114,8 @@ public long getFailureCount() { private volatile boolean blockOnDataFiles; + private volatile boolean blockAndFailOnDataFiles; + private volatile boolean blockOnDeleteIndexN; /** @@ -214,6 +216,7 @@ public synchronized void unblock() { blocked = false; // Clean blocking flags, so we wouldn't try to block again blockOnDataFiles = false; + blockAndFailOnDataFiles = false; blockOnAnyFiles = false; blockAndFailOnWriteIndexFile = false; blockOnWriteIndexFile = false; @@ -229,9 +232,15 @@ public synchronized void unblock() { } public void blockOnDataFiles() { + assert blockAndFailOnDataFiles == false : "Either fail or wait after data file, not both"; blockOnDataFiles = true; } + public void blockAndFailOnDataFiles() { + assert blockOnDataFiles == false : "Either fail or wait after data file, not both"; + blockAndFailOnDataFiles = true; + } + public void setBlockOnAnyFiles() { blockOnAnyFiles = true; } @@ -300,9 +309,9 @@ private synchronized boolean blockExecution() { logger.debug("[{}] Blocking execution", metadata.name()); boolean wasBlocked = false; try { - while (blockOnDataFiles || blockOnAnyFiles || blockAndFailOnWriteIndexFile || blockOnWriteIndexFile || - blockAndFailOnWriteSnapFile || blockOnDeleteIndexN || blockOnWriteShardLevelMeta || blockOnReadIndexMeta || - blockAndFailOnReadSnapFile || blockAndFailOnReadIndexFile || blockedIndexId != null) { + while (blockAndFailOnDataFiles || blockOnDataFiles || blockOnAnyFiles || blockAndFailOnWriteIndexFile || blockOnWriteIndexFile + || blockAndFailOnWriteSnapFile || blockOnDeleteIndexN || blockOnWriteShardLevelMeta || blockOnReadIndexMeta + || blockAndFailOnReadSnapFile || blockAndFailOnReadIndexFile || blockedIndexId != null) { blocked = true; this.wait(); wasBlocked = true; @@ -382,6 +391,8 @@ private void maybeIOExceptionOrBlock(String blobName) throws IOException { } } else if (blockOnDataFiles) { blockExecutionAndMaybeWait(blobName); + } else if (blockAndFailOnDataFiles) { + blockExecutionAndFail(blobName); } } else { if (shouldFail(blobName, randomControlIOExceptionRate) && (incrementAndGetFailureCount() < maximumNumberOfFailures)) {