From 84522d2a67d75cb9484a959558771d1b209cb703 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 4 May 2020 14:57:46 +0200 Subject: [PATCH] Allow Bulk Snapshot Deletes to Abort (#56009) Making use of #55773 to simplify snapshot state machine. 1. Deletes with no in-progress snapshot now add the delete entry to the cluster state right away instead of doing a second CS update after the fist update was a NOOP. 2. If a bulk delete matches in-progress as well as completed snapshots, abort the in-progress snapshot and then move on to delete from the repository. --- .../snapshots/SnapshotsService.java | 104 +++++++++--------- .../snapshots/SnapshotResiliencyTests.java | 47 ++++++++ 2 files changed, 99 insertions(+), 52 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 648932a322fd4..8a2d9c9a9ca6c 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -984,11 +984,7 @@ private void failSnapshotCompletionListeners(Snapshot snapshot, Exception e) { } /** - * Deletes snapshots from the repository or aborts a running snapshot. - * If deleting a single snapshot, first checks if a snapshot is still running and if so cancels the snapshot and then deletes it from - * the repository. - * If the snapshot is not running or multiple snapshot names are given, moves to trying to find a matching {@link Snapshot}s for the - * given names in the repository and deletes them. + * Deletes snapshots from the repository. In-progress snapshots matched by the delete will be aborted before deleting them. * * @param request delete snapshot request * @param listener listener @@ -1000,39 +996,46 @@ public void deleteSnapshots(final DeleteSnapshotRequest request, final ActionLis logger.info(() -> new ParameterizedMessage("deleting snapshots [{}] from repository [{}]", Strings.arrayToCommaDelimitedString(snapshotNames), repositoryName)); - clusterService.submitStateUpdateTask("delete snapshot", new ClusterStateUpdateTask(Priority.NORMAL) { + final Repository repository = repositoriesService.repository(repositoryName); + repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask(Priority.NORMAL) { + + private Snapshot runningSnapshot; - Snapshot runningSnapshot; + private ClusterStateUpdateTask deleteFromRepoTask; - boolean abortedDuringInit = false; + private boolean abortedDuringInit = false; + + private List outstandingDeletes; @Override - public ClusterState execute(ClusterState currentState) { + public ClusterState execute(ClusterState currentState) throws Exception { if (snapshotNames.length > 1 && currentState.nodes().getMinNodeVersion().before(MULTI_DELETE_VERSION)) { throw new IllegalArgumentException("Deleting multiple snapshots in a single request is only supported in version [ " + MULTI_DELETE_VERSION + "] but cluster contained node of version [" + currentState.nodes().getMinNodeVersion() + "]"); } final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); - final SnapshotsInProgress.Entry snapshotEntry; - if (snapshotNames.length == 1) { - final String snapshotName = snapshotNames[0]; - if (Regex.isSimpleMatchPattern(snapshotName)) { - snapshotEntry = null; - } else { - snapshotEntry = findInProgressSnapshot(snapshots, snapshotName, repositoryName); - } - } else { - snapshotEntry = null; - } + final SnapshotsInProgress.Entry snapshotEntry = findInProgressSnapshot(snapshots, snapshotNames, repositoryName); + final List snapshotIds = matchingSnapshotIds( + snapshotEntry == null ? null : snapshotEntry.snapshot().getSnapshotId(), + repositoryData, snapshotNames, repositoryName); if (snapshotEntry == null) { - return currentState; + deleteFromRepoTask = + createDeleteStateUpdate(snapshotIds, repositoryName, repositoryData.getGenId(), Priority.NORMAL, listener); + return deleteFromRepoTask.execute(currentState); } + runningSnapshot = snapshotEntry.snapshot(); final ImmutableOpenMap shards; final State state = snapshotEntry.state(); final String failure; + + outstandingDeletes = new ArrayList<>(snapshotIds); + if (state != State.INIT) { + // INIT state snapshots won't ever be physically written to the repository but all other states will end up in the repo + outstandingDeletes.add(runningSnapshot.getSnapshotId()); + } if (state == State.INIT) { // snapshot is still initializing, mark it as aborted shards = snapshotEntry.shards(); @@ -1091,15 +1094,9 @@ public void onFailure(String source, Exception e) { @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - if (runningSnapshot == null) { - try { - repositoriesService.repository(repositoryName).executeConsistentStateUpdate(repositoryData -> - createDeleteStateUpdate(matchingSnapshotIds(repositoryData, snapshotNames, repositoryName), repositoryName, - repositoryData.getGenId(), request.masterNodeTimeout(), Priority.NORMAL, listener), - "delete completed snapshots", listener::onFailure); - } catch (RepositoryMissingException e) { - listener.onFailure(e); - } + if (deleteFromRepoTask != null) { + assert outstandingDeletes == null : "Shouldn't have outstanding deletes after already starting delete task"; + deleteFromRepoTask.clusterStateProcessed(source, oldState, newState); return; } logger.trace("adding snapshot completion listener to wait for deleted snapshot to finish"); @@ -1107,13 +1104,19 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS result -> { logger.debug("deleted snapshot completed - deleting files"); clusterService.submitStateUpdateTask("delete snapshot", - createDeleteStateUpdate(Collections.singletonList(result.v2().snapshotId()), repositoryName, - result.v1().getGenId(), null, Priority.IMMEDIATE, listener)); + createDeleteStateUpdate(outstandingDeletes, repositoryName, + result.v1().getGenId(), Priority.IMMEDIATE, listener)); }, e -> { if (abortedDuringInit) { logger.info("Successfully aborted snapshot [{}]", runningSnapshot); - listener.onResponse(null); + if (outstandingDeletes.isEmpty()) { + listener.onResponse(null); + } else { + clusterService.submitStateUpdateTask("delete snapshot", + createDeleteStateUpdate(outstandingDeletes, repositoryName, repositoryData.getGenId(), + Priority.IMMEDIATE, listener)); + } } else { if (ExceptionsHelper.unwrap(e, NotMasterException.class, FailedToCommitClusterStateException.class) != null) { @@ -1134,28 +1137,30 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS public TimeValue timeout() { return request.masterNodeTimeout(); } - }); + }, "delete snapshot", listener::onFailure); } - private static List matchingSnapshotIds(RepositoryData repositoryData, String[] snapshotsOrPatterns, - String repositoryName) { + private static List matchingSnapshotIds(@Nullable SnapshotId inProgress, RepositoryData repositoryData, + String[] snapshotsOrPatterns, String repositoryName) { final Map allSnapshotIds = repositoryData.getSnapshotIds().stream().collect( Collectors.toMap(SnapshotId::getName, Function.identity())); final Set foundSnapshots = new HashSet<>(); for (String snapshotOrPattern : snapshotsOrPatterns) { - if (Regex.isSimpleMatchPattern(snapshotOrPattern) == false) { - final SnapshotId foundId = allSnapshotIds.get(snapshotOrPattern); - if (foundId == null) { - throw new SnapshotMissingException(repositoryName, snapshotOrPattern); - } else { - foundSnapshots.add(allSnapshotIds.get(snapshotOrPattern)); - } - } else { + if (Regex.isSimpleMatchPattern(snapshotOrPattern)) { for (Map.Entry entry : allSnapshotIds.entrySet()) { if (Regex.simpleMatch(snapshotOrPattern, entry.getKey())) { foundSnapshots.add(entry.getValue()); } } + } else { + final SnapshotId foundId = allSnapshotIds.get(snapshotOrPattern); + if (foundId == null) { + if (inProgress == null || inProgress.getName().equals(snapshotOrPattern) == false) { + throw new SnapshotMissingException(repositoryName, snapshotOrPattern); + } + } else { + foundSnapshots.add(allSnapshotIds.get(snapshotOrPattern)); + } } } return List.copyOf(foundSnapshots); @@ -1163,7 +1168,7 @@ private static List matchingSnapshotIds(RepositoryData repositoryDat // Return in-progress snapshot entry by name and repository in the given cluster state or null if none is found @Nullable - private static SnapshotsInProgress.Entry findInProgressSnapshot(@Nullable SnapshotsInProgress snapshots, String snapshotName, + private static SnapshotsInProgress.Entry findInProgressSnapshot(@Nullable SnapshotsInProgress snapshots, String[] snapshotNames, String repositoryName) { if (snapshots == null) { return null; @@ -1171,7 +1176,7 @@ private static SnapshotsInProgress.Entry findInProgressSnapshot(@Nullable Snapsh SnapshotsInProgress.Entry snapshotEntry = null; for (SnapshotsInProgress.Entry entry : snapshots.entries()) { if (entry.repository().equals(repositoryName) - && entry.snapshot().getSnapshotId().getName().equals(snapshotName)) { + && Regex.simpleMatch(snapshotNames, entry.snapshot().getSnapshotId().getName())) { snapshotEntry = entry; break; } @@ -1180,7 +1185,7 @@ private static SnapshotsInProgress.Entry findInProgressSnapshot(@Nullable Snapsh } private ClusterStateUpdateTask createDeleteStateUpdate(List snapshotIds, String repoName, long repositoryStateId, - @Nullable TimeValue timeout, Priority priority, ActionListener listener) { + Priority priority, ActionListener listener) { // Short circuit to noop state update if there isn't anything to delete if (snapshotIds.isEmpty()) { return new ClusterStateUpdateTask() { @@ -1198,11 +1203,6 @@ public void onFailure(String source, Exception e) { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { listener.onResponse(null); } - - @Override - public TimeValue timeout() { - return timeout; - } }; } return new ClusterStateUpdateTask(priority) { diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index dde2ce8f3ef2f..532a0b72e56c3 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -576,6 +576,53 @@ public void testConcurrentSnapshotCreateAndDeleteOther() { } } + public void testBulkSnapshotDeleteWithAbort() { + setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10)); + + String repoName = "repo"; + String snapshotName = "snapshot"; + final String index = "test"; + final int shards = randomIntBetween(1, 10); + + TestClusterNodes.TestClusterNode masterNode = + testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state()); + + final StepListener createSnapshotResponseStepListener = new StepListener<>(); + + continueOrDie(createRepoAndIndex(repoName, index, shards), + createIndexResponse -> client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName) + .setWaitForCompletion(true).execute(createSnapshotResponseStepListener)); + + final StepListener createOtherSnapshotResponseStepListener = new StepListener<>(); + + continueOrDie(createSnapshotResponseStepListener, + createSnapshotResponse -> client().admin().cluster().prepareCreateSnapshot(repoName, "snapshot-2") + .execute(createOtherSnapshotResponseStepListener)); + + final StepListener deleteSnapshotStepListener = new StepListener<>(); + + continueOrDie(createOtherSnapshotResponseStepListener, + createSnapshotResponse -> client().admin().cluster().deleteSnapshot( + new DeleteSnapshotRequest(repoName, "*"), deleteSnapshotStepListener)); + + deterministicTaskQueue.runAllRunnableTasks(); + + SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE); + assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false)); + final Repository repository = masterNode.repositoriesService.repository(repoName); + Collection snapshotIds = getRepositoryData(repository).getSnapshotIds(); + // No snapshots should be left in the repository + assertThat(snapshotIds, empty()); + + for (SnapshotId snapshotId : snapshotIds) { + final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId); + assertEquals(SnapshotState.SUCCESS, snapshotInfo.state()); + assertThat(snapshotInfo.indices(), containsInAnyOrder(index)); + assertEquals(shards, snapshotInfo.successfulShards()); + assertEquals(0, snapshotInfo.failedShards()); + } + } + public void testConcurrentSnapshotRestoreAndDeleteOther() { setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));