diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 475c85824de18..bd01c66184908 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -1334,84 +1334,74 @@ private void triggerSnapshotsPendingDeletions(final ClusterState state) { for (SnapshotDeletionsPending.Entry snapshot : snapshotDeletionsPending.entries()) { final SnapshotId snapshotId = snapshot.getSnapshotId(); + boolean triggered = false; + + if (currentRestores.contains(snapshotId)) { + logger.trace("snapshot to delete [{}] is being restored, waiting for restore to complete", snapshotId); + continue; + } else if (currentClones.contains(snapshotId)) { + logger.trace("snapshot to delete [{}] is being cloned, waiting for cloning to complete", snapshotId); + continue; + } else if (currentDeletions.contains(snapshotId)) { + logger.trace("snapshot to delete [{}] is already queued", snapshotId); + continue; + } + + Optional optionalRepository; + if (RepositoryData.MISSING_UUID.equals(snapshot.getRepositoryUuid()) == false) { + // the snapshot waiting to be deleted references a repository with a known uuid, + // let's try to find this repository among the existing ones first + optionalRepository = repositories.repositories() + .stream() + .filter(repo -> Objects.equals(repo.uuid(), snapshot.getRepositoryUuid())) + .findFirst(); + if (optionalRepository.isEmpty()) { + // there is no existing repository matching the uuid, + // let's try to find the repository by name among the existing ones that have no uuid + optionalRepository = repositories.repositories() + .stream() + .filter(repo -> Objects.equals(repo.uuid(), RepositoryData.MISSING_UUID)) + .filter(repo -> Objects.equals(repo.name(), snapshot.getRepositoryName())) + .findFirst(); + } + } else { + // the snapshot waiting to be deleted does not references a repository with a known uuid, + // let's try to find the repository by name among the existing ones, in the hope that + // the snapshot will be found there. + optionalRepository = repositories.repositories() + .stream() + .filter(repo -> Objects.equals(repo.name(), snapshot.getRepositoryName())) + .findFirst(); + } - // early add to avoid doing too much work on successive cluster state updates - if (ongoingSnapshotsDeletions.add(snapshotId)) { - boolean triggered = false; - try { - if (currentRestores.contains(snapshotId)) { - logger.trace("snapshot to delete [{}] is being restored, waiting for restore to complete", snapshotId); - continue; - } else if (currentClones.contains(snapshotId)) { - logger.trace("snapshot to delete [{}] is being cloned, waiting for cloning to complete", snapshotId); - continue; - } else if (currentDeletions.contains(snapshotId)) { - logger.trace("snapshot to delete [{}] is already queued", snapshotId); - continue; - } - - Optional optionalRepository; - if (RepositoryData.MISSING_UUID.equals(snapshot.getRepositoryUuid()) == false) { - // the snapshot waiting to be deleted references a repository with a known uuid, - // let's try to find this repository among the existing ones first - optionalRepository = repositories.repositories() - .stream() - .filter(repo -> Objects.equals(repo.uuid(), snapshot.getRepositoryUuid())) - .findFirst(); - if (optionalRepository.isEmpty()) { - // there is no existing repository matching the uuid, - // let's try to find the repository by name among the existing ones that have no uuid - optionalRepository = repositories.repositories() - .stream() - .filter(repo -> Objects.equals(repo.uuid(), RepositoryData.MISSING_UUID)) - .filter(repo -> Objects.equals(repo.name(), snapshot.getRepositoryName())) - .findFirst(); - } - } else { - // the snapshot waiting to be deleted does not references a repository with a known uuid, - // let's try to find the repository by name among the existing ones, in the hope that - // the snapshot will be found there. - optionalRepository = repositories.repositories() - .stream() - .filter(repo -> Objects.equals(repo.name(), snapshot.getRepositoryName())) - .findFirst(); - } - - if (optionalRepository.isEmpty()) { - logger.debug( - "repository [{}/{}] not found, cannot delete pending snapshot [{}] created at {}", - snapshot.getRepositoryName(), - snapshot.getRepositoryUuid(), - snapshotId, - Instant.ofEpochMilli(snapshot.getCreationTime()).atZone(ZoneOffset.UTC) - ); - continue; - } - - final RepositoryMetadata repository = optionalRepository.get(); - if (repository.settings().getAsBoolean(READONLY_SETTING_KEY, false)) { - logger.debug( - "repository [{}/{}] is read-only, cannot delete pending snapshot [{}] created at {}", - repository.name(), - repository.uuid(), - snapshotId, - Instant.ofEpochMilli(snapshot.getCreationTime()).atZone(ZoneOffset.UTC) - ); - continue; - } + if (optionalRepository.isEmpty()) { + logger.debug( + "repository [{}/{}] not found, cannot delete pending snapshot [{}] created at {}", + snapshot.getRepositoryName(), + snapshot.getRepositoryUuid(), + snapshotId, + Instant.ofEpochMilli(snapshot.getCreationTime()).atZone(ZoneOffset.UTC) + ); + continue; + } - // should we add some throttling to not always retry - final boolean added = snapshotsToDelete.computeIfAbsent(repository, r -> new HashSet<>()).add(snapshotId); - assert ongoingSnapshotsDeletions.contains(snapshotId) : snapshotId; - assert added : snapshotId; + final RepositoryMetadata repository = optionalRepository.get(); + if (repository.settings().getAsBoolean(READONLY_SETTING_KEY, false)) { + logger.debug( + "repository [{}/{}] is read-only, cannot delete pending snapshot [{}] created at {}", + repository.name(), + repository.uuid(), + snapshotId, + Instant.ofEpochMilli(snapshot.getCreationTime()).atZone(ZoneOffset.UTC) + ); + continue; + } - logger.trace("triggering snapshot deletion for [{}]", snapshotId); - triggered = true; - } finally { - if (triggered == false) { - ongoingSnapshotsDeletions.remove(snapshotId); - } - } + // should we add some throttling to not always retry? + if (ongoingSnapshotsDeletions.add(snapshotId)) { + logger.trace("triggering snapshot deletion for [{}]", snapshotId); + final boolean added = snapshotsToDelete.computeIfAbsent(repository, r -> new HashSet<>()).add(snapshotId); + assert added : snapshotId; } } snapshotsToDelete.forEach( @@ -1519,12 +1509,12 @@ public ClusterState execute(ClusterState currentState) { @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - ongoingSnapshotsDeletions.removeAll(missingSnapshots); + missingSnapshots.forEach(ongoingSnapshotsDeletions::remove); } @Override public void onFailure(String source, Exception e) { - ongoingSnapshotsDeletions.removeAll(missingSnapshots); + missingSnapshots.forEach(ongoingSnapshotsDeletions::remove); } } ); @@ -2780,9 +2770,7 @@ protected SnapshotDeletionsInProgress filterDeletions(SnapshotDeletionsInProgres @Nullable @Override protected SnapshotDeletionsPending filterPendingDeletions(@Nullable SnapshotDeletionsPending pendingDeletions) { - return pendingDeletions != null - ? pendingDeletions.withRemovedSnapshots(deleteEntry.getSnapshots()) - : null; + return pendingDeletions != null ? pendingDeletions.withRemovedSnapshots(deleteEntry.getSnapshots()) : null; } @Override