Skip to content

Commit

Permalink
Allow Bulk Snapshot Deletes to Abort (#56009) (#56111)
Browse files Browse the repository at this point in the history
Making use of #55773 to simplify snapshot state machine.
1. Deletes with no in-progress snapshot now add the delete entry to the cluster state right away
instead of doing a second CS update after the fist update was a NOOP.
2. If a bulk delete matches in-progress as well as completed snapshots, abort the in-progress snapshot
and then move on to delete from the repository.
  • Loading branch information
original-brownbear authored May 4, 2020
1 parent 76fa5a2 commit e8ef44c
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 52 deletions.
104 changes: 52 additions & 52 deletions server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -997,11 +997,7 @@ private void failSnapshotCompletionListeners(Snapshot snapshot, Exception e) {
}

/**
* Deletes snapshots from the repository or aborts a running snapshot.
* If deleting a single snapshot, first checks if a snapshot is still running and if so cancels the snapshot and then deletes it from
* the repository.
* If the snapshot is not running or multiple snapshot names are given, moves to trying to find a matching {@link Snapshot}s for the
* given names in the repository and deletes them.
* Deletes snapshots from the repository. In-progress snapshots matched by the delete will be aborted before deleting them.
*
* @param request delete snapshot request
* @param listener listener
Expand All @@ -1013,39 +1009,46 @@ public void deleteSnapshots(final DeleteSnapshotRequest request, final ActionLis
logger.info(() -> new ParameterizedMessage("deleting snapshots [{}] from repository [{}]",
Strings.arrayToCommaDelimitedString(snapshotNames), repositoryName));

clusterService.submitStateUpdateTask("delete snapshot", new ClusterStateUpdateTask(Priority.NORMAL) {
final Repository repository = repositoriesService.repository(repositoryName);
repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask(Priority.NORMAL) {

private Snapshot runningSnapshot;

Snapshot runningSnapshot;
private ClusterStateUpdateTask deleteFromRepoTask;

boolean abortedDuringInit = false;
private boolean abortedDuringInit = false;

private List<SnapshotId> outstandingDeletes;

@Override
public ClusterState execute(ClusterState currentState) {
public ClusterState execute(ClusterState currentState) throws Exception {
if (snapshotNames.length > 1 && currentState.nodes().getMinNodeVersion().before(MULTI_DELETE_VERSION)) {
throw new IllegalArgumentException("Deleting multiple snapshots in a single request is only supported in version [ "
+ MULTI_DELETE_VERSION + "] but cluster contained node of version [" + currentState.nodes().getMinNodeVersion()
+ "]");
}
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
final SnapshotsInProgress.Entry snapshotEntry;
if (snapshotNames.length == 1) {
final String snapshotName = snapshotNames[0];
if (Regex.isSimpleMatchPattern(snapshotName)) {
snapshotEntry = null;
} else {
snapshotEntry = findInProgressSnapshot(snapshots, snapshotName, repositoryName);
}
} else {
snapshotEntry = null;
}
final SnapshotsInProgress.Entry snapshotEntry = findInProgressSnapshot(snapshots, snapshotNames, repositoryName);
final List<SnapshotId> snapshotIds = matchingSnapshotIds(
snapshotEntry == null ? null : snapshotEntry.snapshot().getSnapshotId(),
repositoryData, snapshotNames, repositoryName);
if (snapshotEntry == null) {
return currentState;
deleteFromRepoTask =
createDeleteStateUpdate(snapshotIds, repositoryName, repositoryData.getGenId(), Priority.NORMAL, listener);
return deleteFromRepoTask.execute(currentState);
}

runningSnapshot = snapshotEntry.snapshot();
final ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards;

final State state = snapshotEntry.state();
final String failure;

outstandingDeletes = new ArrayList<>(snapshotIds);
if (state != State.INIT) {
// INIT state snapshots won't ever be physically written to the repository but all other states will end up in the repo
outstandingDeletes.add(runningSnapshot.getSnapshotId());
}
if (state == State.INIT) {
// snapshot is still initializing, mark it as aborted
shards = snapshotEntry.shards();
Expand Down Expand Up @@ -1104,29 +1107,29 @@ public void onFailure(String source, Exception e) {

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (runningSnapshot == null) {
try {
repositoriesService.repository(repositoryName).executeConsistentStateUpdate(repositoryData ->
createDeleteStateUpdate(matchingSnapshotIds(repositoryData, snapshotNames, repositoryName), repositoryName,
repositoryData.getGenId(), request.masterNodeTimeout(), Priority.NORMAL, listener),
"delete completed snapshots", listener::onFailure);
} catch (RepositoryMissingException e) {
listener.onFailure(e);
}
if (deleteFromRepoTask != null) {
assert outstandingDeletes == null : "Shouldn't have outstanding deletes after already starting delete task";
deleteFromRepoTask.clusterStateProcessed(source, oldState, newState);
return;
}
logger.trace("adding snapshot completion listener to wait for deleted snapshot to finish");
addListener(runningSnapshot, ActionListener.wrap(
result -> {
logger.debug("deleted snapshot completed - deleting files");
clusterService.submitStateUpdateTask("delete snapshot",
createDeleteStateUpdate(Collections.singletonList(result.v2().snapshotId()), repositoryName,
result.v1().getGenId(), null, Priority.IMMEDIATE, listener));
createDeleteStateUpdate(outstandingDeletes, repositoryName,
result.v1().getGenId(), Priority.IMMEDIATE, listener));
},
e -> {
if (abortedDuringInit) {
logger.info("Successfully aborted snapshot [{}]", runningSnapshot);
listener.onResponse(null);
if (outstandingDeletes.isEmpty()) {
listener.onResponse(null);
} else {
clusterService.submitStateUpdateTask("delete snapshot",
createDeleteStateUpdate(outstandingDeletes, repositoryName, repositoryData.getGenId(),
Priority.IMMEDIATE, listener));
}
} else {
if (ExceptionsHelper.unwrap(e, NotMasterException.class, FailedToCommitClusterStateException.class)
!= null) {
Expand All @@ -1147,44 +1150,46 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
public TimeValue timeout() {
return request.masterNodeTimeout();
}
});
}, "delete snapshot", listener::onFailure);
}

private static List<SnapshotId> matchingSnapshotIds(RepositoryData repositoryData, String[] snapshotsOrPatterns,
String repositoryName) {
private static List<SnapshotId> matchingSnapshotIds(@Nullable SnapshotId inProgress, RepositoryData repositoryData,
String[] snapshotsOrPatterns, String repositoryName) {
final Map<String, SnapshotId> allSnapshotIds = repositoryData.getSnapshotIds().stream().collect(
Collectors.toMap(SnapshotId::getName, Function.identity()));
final Set<SnapshotId> foundSnapshots = new HashSet<>();
for (String snapshotOrPattern : snapshotsOrPatterns) {
if (Regex.isSimpleMatchPattern(snapshotOrPattern) == false) {
final SnapshotId foundId = allSnapshotIds.get(snapshotOrPattern);
if (foundId == null) {
throw new SnapshotMissingException(repositoryName, snapshotOrPattern);
} else {
foundSnapshots.add(allSnapshotIds.get(snapshotOrPattern));
}
} else {
if (Regex.isSimpleMatchPattern(snapshotOrPattern)) {
for (Map.Entry<String, SnapshotId> entry : allSnapshotIds.entrySet()) {
if (Regex.simpleMatch(snapshotOrPattern, entry.getKey())) {
foundSnapshots.add(entry.getValue());
}
}
} else {
final SnapshotId foundId = allSnapshotIds.get(snapshotOrPattern);
if (foundId == null) {
if (inProgress == null || inProgress.getName().equals(snapshotOrPattern) == false) {
throw new SnapshotMissingException(repositoryName, snapshotOrPattern);
}
} else {
foundSnapshots.add(allSnapshotIds.get(snapshotOrPattern));
}
}
}
return Collections.unmodifiableList(new ArrayList<>(foundSnapshots));
}

// Return in-progress snapshot entry by name and repository in the given cluster state or null if none is found
@Nullable
private static SnapshotsInProgress.Entry findInProgressSnapshot(@Nullable SnapshotsInProgress snapshots, String snapshotName,
private static SnapshotsInProgress.Entry findInProgressSnapshot(@Nullable SnapshotsInProgress snapshots, String[] snapshotNames,
String repositoryName) {
if (snapshots == null) {
return null;
}
SnapshotsInProgress.Entry snapshotEntry = null;
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
if (entry.repository().equals(repositoryName)
&& entry.snapshot().getSnapshotId().getName().equals(snapshotName)) {
&& Regex.simpleMatch(snapshotNames, entry.snapshot().getSnapshotId().getName())) {
snapshotEntry = entry;
break;
}
Expand All @@ -1193,7 +1198,7 @@ private static SnapshotsInProgress.Entry findInProgressSnapshot(@Nullable Snapsh
}

private ClusterStateUpdateTask createDeleteStateUpdate(List<SnapshotId> snapshotIds, String repoName, long repositoryStateId,
@Nullable TimeValue timeout, Priority priority, ActionListener<Void> listener) {
Priority priority, ActionListener<Void> listener) {
// Short circuit to noop state update if there isn't anything to delete
if (snapshotIds.isEmpty()) {
return new ClusterStateUpdateTask() {
Expand All @@ -1211,11 +1216,6 @@ public void onFailure(String source, Exception e) {
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(null);
}

@Override
public TimeValue timeout() {
return timeout;
}
};
}
return new ClusterStateUpdateTask(priority) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,53 @@ public void testConcurrentSnapshotCreateAndDeleteOther() {
}
}

public void testBulkSnapshotDeleteWithAbort() {
setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));

String repoName = "repo";
String snapshotName = "snapshot";
final String index = "test";
final int shards = randomIntBetween(1, 10);

TestClusterNodes.TestClusterNode masterNode =
testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());

final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();

continueOrDie(createRepoAndIndex(repoName, index, shards),
createIndexResponse -> client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
.setWaitForCompletion(true).execute(createSnapshotResponseStepListener));

final StepListener<CreateSnapshotResponse> createOtherSnapshotResponseStepListener = new StepListener<>();

continueOrDie(createSnapshotResponseStepListener,
createSnapshotResponse -> client().admin().cluster().prepareCreateSnapshot(repoName, "snapshot-2")
.execute(createOtherSnapshotResponseStepListener));

final StepListener<AcknowledgedResponse> deleteSnapshotStepListener = new StepListener<>();

continueOrDie(createOtherSnapshotResponseStepListener,
createSnapshotResponse -> client().admin().cluster().deleteSnapshot(
new DeleteSnapshotRequest(repoName, "*"), deleteSnapshotStepListener));

deterministicTaskQueue.runAllRunnableTasks();

SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE);
assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));
final Repository repository = masterNode.repositoriesService.repository(repoName);
Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
// No snapshots should be left in the repository
assertThat(snapshotIds, empty());

for (SnapshotId snapshotId : snapshotIds) {
final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId);
assertEquals(SnapshotState.SUCCESS, snapshotInfo.state());
assertThat(snapshotInfo.indices(), containsInAnyOrder(index));
assertEquals(shards, snapshotInfo.successfulShards());
assertEquals(0, snapshotInfo.failedShards());
}
}

public void testConcurrentSnapshotRestoreAndDeleteOther() {
setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));

Expand Down

0 comments on commit e8ef44c

Please sign in to comment.