Skip to content

Commit

Permalink
Remove and inline methods in SnapshotsService.deleteSnapshots() (elas…
Browse files Browse the repository at this point in the history
  • Loading branch information
tlrx authored Aug 5, 2021
1 parent 6f31abc commit ee66de9
Showing 1 changed file with 75 additions and 84 deletions.
159 changes: 75 additions & 84 deletions server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ public ClusterState execute(ClusterState currentState) {
SnapshotDeletionsInProgress.TYPE,
SnapshotDeletionsInProgress.EMPTY
);
ensureNoCleanupInProgress(currentState, repositoryName, snapshotName);
ensureNoCleanupInProgress(currentState, repositoryName, snapshotName, "create snapshot");
ensureBelowConcurrencyLimit(repositoryName, snapshotName, snapshots, deletionsInProgress);
// Store newSnapshot here to be processed in clusterStateProcessed
List<String> indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, request));
Expand Down Expand Up @@ -461,7 +461,7 @@ public void cloneSnapshot(CloneSnapshotRequest request, ActionListener<Void> lis
public ClusterState execute(ClusterState currentState) {
ensureRepositoryExists(repositoryName, currentState);
ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository);
ensureNoCleanupInProgress(currentState, repositoryName, snapshotName);
ensureNoCleanupInProgress(currentState, repositoryName, snapshotName, "clone snapshot");
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
final List<SnapshotsInProgress.Entry> runningSnapshots = snapshots.entries();
ensureSnapshotNameNotRunning(runningSnapshots, repositoryName, snapshotName);
Expand Down Expand Up @@ -534,7 +534,12 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl
}, "clone_snapshot [" + request.source() + "][" + snapshotName + ']', listener::onFailure);
}

private static void ensureNoCleanupInProgress(ClusterState currentState, String repositoryName, String snapshotName) {
private static void ensureNoCleanupInProgress(
final ClusterState currentState,
final String repositoryName,
final String snapshotName,
final String reason
) {
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(
RepositoryCleanupInProgress.TYPE,
RepositoryCleanupInProgress.EMPTY
Expand All @@ -543,7 +548,13 @@ private static void ensureNoCleanupInProgress(ClusterState currentState, String
throw new ConcurrentSnapshotExecutionException(
repositoryName,
snapshotName,
"cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]"
"cannot "
+ reason
+ " while a repository cleanup is in-progress in "
+ repositoryCleanupInProgress.entries()
.stream()
.map(RepositoryCleanupInProgress.Entry::repository)
.collect(Collectors.toSet())
);
}
}
Expand Down Expand Up @@ -2021,18 +2032,17 @@ private void failSnapshotCompletionListeners(Snapshot snapshot, Exception e) {
* @param listener listener
*/
public void deleteSnapshots(final DeleteSnapshotRequest request, final ActionListener<Void> listener) {

final String repositoryName = request.repository();
final String[] snapshotNames = request.snapshots();
final String repoName = request.repository();
logger.info(
() -> new ParameterizedMessage(
"deleting snapshots [{}] from repository [{}]",
Strings.arrayToCommaDelimitedString(snapshotNames),
repoName
repositoryName
)
);

final Repository repository = repositoriesService.repository(repoName);
final Repository repository = repositoriesService.repository(repositoryName);
repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask(request.masterNodeTimeout()) {

private SnapshotDeletionsInProgress.Entry newDelete = null;
Expand All @@ -2049,61 +2059,87 @@ public void deleteSnapshots(final DeleteSnapshotRequest request, final ActionLis

@Override
public ClusterState execute(ClusterState currentState) {
ensureRepositoryExists(repoName, currentState);
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
final List<SnapshotsInProgress.Entry> snapshotEntries = findInProgressSnapshots(snapshots, snapshotNames, repoName);
final List<SnapshotId> snapshotIds = matchingSnapshotIds(
snapshotEntries.stream().map(e -> e.snapshot().getSnapshotId()).collect(Collectors.toList()),
repositoryData,
snapshotNames,
repoName
);
ensureRepositoryExists(repositoryName, currentState);
final Set<SnapshotId> snapshotIds = new HashSet<>();

// find in-progress snapshots to delete in cluster state
final SnapshotsInProgress snapshotsInProgress = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) {
final SnapshotId snapshotId = entry.snapshot().getSnapshotId();
if (entry.repository().equals(repositoryName) && Regex.simpleMatch(snapshotNames, snapshotId.getName())) {
snapshotIds.add(snapshotId);
}
}

// find snapshots to delete in repository data
final Map<String, SnapshotId> snapshotsIdsInRepository = repositoryData.getSnapshotIds()
.stream()
.collect(Collectors.toMap(SnapshotId::getName, Function.identity()));
for (String snapshotOrPattern : snapshotNames) {
if (Regex.isSimpleMatchPattern(snapshotOrPattern)) {
for (Map.Entry<String, SnapshotId> entry : snapshotsIdsInRepository.entrySet()) {
if (Regex.simpleMatch(snapshotOrPattern, entry.getKey())) {
snapshotIds.add(entry.getValue());
}
}
} else {
final SnapshotId foundId = snapshotsIdsInRepository.get(snapshotOrPattern);
if (foundId == null) {
if (snapshotIds.stream().noneMatch(snapshotId -> snapshotId.getName().equals(snapshotOrPattern))) {
throw new SnapshotMissingException(repositoryName, snapshotOrPattern);
}
} else {
snapshotIds.add(foundId);
}
}
}

if (snapshotIds.isEmpty()) {
return currentState;
}
final Set<SnapshotId> activeCloneSources = snapshots.entries()

final Set<SnapshotId> activeCloneSources = snapshotsInProgress.entries()
.stream()
.filter(SnapshotsInProgress.Entry::isClone)
.map(SnapshotsInProgress.Entry::source)
.collect(Collectors.toSet());
for (SnapshotId snapshotId : snapshotIds) {
if (activeCloneSources.contains(snapshotId)) {
throw new ConcurrentSnapshotExecutionException(
new Snapshot(repoName, snapshotId),
new Snapshot(repositoryName, snapshotId),
"cannot delete snapshot while it is being cloned"
);
}
}

ensureNoCleanupInProgress(
currentState,
repositoryName,
snapshotIds.stream().findFirst().get().getName(),
"delete snapshot"
);

final SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(
SnapshotDeletionsInProgress.TYPE,
SnapshotDeletionsInProgress.EMPTY
);
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(
RepositoryCleanupInProgress.TYPE,
RepositoryCleanupInProgress.EMPTY
);
if (repositoryCleanupInProgress.hasCleanupInProgress()) {
throw new ConcurrentSnapshotExecutionException(
new Snapshot(repoName, snapshotIds.get(0)),
"cannot delete snapshots while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]"
);
}

final RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY);
// don't allow snapshot deletions while a restore is taking place,
// otherwise we could end up deleting a snapshot that is being restored
// and the files the restore depends on would all be gone

for (RestoreInProgress.Entry entry : restoreInProgress) {
if (repoName.equals(entry.snapshot().getRepository()) && snapshotIds.contains(entry.snapshot().getSnapshotId())) {
if (repositoryName.equals(entry.snapshot().getRepository()) && snapshotIds.contains(entry.snapshot().getSnapshotId())) {
throw new ConcurrentSnapshotExecutionException(
new Snapshot(repoName, snapshotIds.get(0)),
new Snapshot(repositoryName, snapshotIds.stream().findFirst().get()),
"cannot delete snapshot during a restore in progress in [" + restoreInProgress + "]"
);
}
}
// Snapshot ids that will have to be physically deleted from the repository
final Set<SnapshotId> snapshotIdsRequiringCleanup = new HashSet<>(snapshotIds);
final SnapshotsInProgress updatedSnapshots = SnapshotsInProgress.of(snapshots.entries().stream().map(existing -> {
final SnapshotsInProgress updatedSnapshots = SnapshotsInProgress.of(snapshotsInProgress.entries().stream().map(existing -> {
if (existing.state() == State.STARTED && snapshotIdsRequiringCleanup.contains(existing.snapshot().getSnapshotId())) {
// snapshot is started - mark every non completed shard as aborted
final SnapshotsInProgress.Entry abortedEntry = existing.abort();
Expand All @@ -2130,14 +2166,15 @@ public ClusterState execute(ClusterState currentState) {
// add the snapshot deletion to the cluster state
final SnapshotDeletionsInProgress.Entry replacedEntry = deletionsInProgress.getEntries()
.stream()
.filter(entry -> entry.repository().equals(repoName) && entry.state() == SnapshotDeletionsInProgress.State.WAITING)
.filter(entry -> entry.repository().equals(repositoryName))
.filter(entry -> entry.state() == SnapshotDeletionsInProgress.State.WAITING)
.findFirst()
.orElse(null);
if (replacedEntry == null) {
final Optional<SnapshotDeletionsInProgress.Entry> foundDuplicate = deletionsInProgress.getEntries()
.stream()
.filter(
entry -> entry.repository().equals(repoName)
entry -> entry.repository().equals(repositoryName)
&& entry.state() == SnapshotDeletionsInProgress.State.STARTED
&& entry.getSnapshots().containsAll(snapshotIds)
)
Expand All @@ -2149,14 +2186,14 @@ public ClusterState execute(ClusterState currentState) {
}
newDelete = new SnapshotDeletionsInProgress.Entry(
List.copyOf(snapshotIdsRequiringCleanup),
repoName,
repositoryName,
threadPool.absoluteTimeInMillis(),
repositoryData.getGenId(),
updatedSnapshots.entries()
.stream()
.filter(entry -> repoName.equals(entry.repository()))
.filter(entry -> repositoryName.equals(entry.repository()))
.noneMatch(SnapshotsService::isWritingToRepository)
&& deletionsInProgress.hasExecutingDeletion(repoName) == false
&& deletionsInProgress.hasExecutingDeletion(repositoryName) == false
? SnapshotDeletionsInProgress.State.STARTED
: SnapshotDeletionsInProgress.State.WAITING
);
Expand Down Expand Up @@ -2193,7 +2230,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
return;
}
if (newDelete.state() == SnapshotDeletionsInProgress.State.STARTED) {
if (tryEnterRepoLoop(repoName)) {
if (tryEnterRepoLoop(repositoryName)) {
deleteSnapshotsFromRepository(newDelete, repositoryData, newState.nodes().getMinNodeVersion());
} else {
logger.trace("Delete [{}] could not execute directly and was queued", newDelete);
Expand All @@ -2208,52 +2245,6 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}, "delete snapshot [" + repository + "]" + Arrays.toString(snapshotNames), listener::onFailure);
}

private static List<SnapshotId> matchingSnapshotIds(
List<SnapshotId> inProgress,
RepositoryData repositoryData,
String[] snapshotsOrPatterns,
String repositoryName
) {
final Map<String, SnapshotId> allSnapshotIds = repositoryData.getSnapshotIds()
.stream()
.collect(Collectors.toMap(SnapshotId::getName, Function.identity()));
final Set<SnapshotId> foundSnapshots = new HashSet<>(inProgress);
for (String snapshotOrPattern : snapshotsOrPatterns) {
if (Regex.isSimpleMatchPattern(snapshotOrPattern)) {
for (Map.Entry<String, SnapshotId> entry : allSnapshotIds.entrySet()) {
if (Regex.simpleMatch(snapshotOrPattern, entry.getKey())) {
foundSnapshots.add(entry.getValue());
}
}
} else {
final SnapshotId foundId = allSnapshotIds.get(snapshotOrPattern);
if (foundId == null) {
if (inProgress.stream().noneMatch(snapshotId -> snapshotId.getName().equals(snapshotOrPattern))) {
throw new SnapshotMissingException(repositoryName, snapshotOrPattern);
}
} else {
foundSnapshots.add(allSnapshotIds.get(snapshotOrPattern));
}
}
}
return List.copyOf(foundSnapshots);
}

// Return in-progress snapshot entries by name and repository in the given cluster state or null if none is found
private static List<SnapshotsInProgress.Entry> findInProgressSnapshots(
SnapshotsInProgress snapshots,
String[] snapshotNames,
String repositoryName
) {
List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
if (entry.repository().equals(repositoryName) && Regex.simpleMatch(snapshotNames, entry.snapshot().getSnapshotId().getName())) {
entries.add(entry);
}
}
return entries;
}

/**
* Checks if the given {@link SnapshotsInProgress.Entry} is currently writing to the repository.
*
Expand Down

0 comments on commit ee66de9

Please sign in to comment.