diff --git a/x-pack/plugin/slm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java b/x-pack/plugin/slm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java index ac1b2f30ec06d..708621ee4a6c8 100644 --- a/x-pack/plugin/slm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java +++ b/x-pack/plugin/slm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java @@ -91,15 +91,6 @@ public SnapshotRetentionTask( this.historyStore = historyStore; } - private static String formatSnapshots(Map> snapshotMap) { - return snapshotMap.entrySet() - .stream() - .map( - e -> e.getKey() + ": [" + e.getValue().stream().map(si -> si.snapshotId().getName()).collect(Collectors.joining(",")) + "]" - ) - .collect(Collectors.joining(",")); - } - @Override public void triggered(SchedulerEngine.Event event) { assert event.getJobName().equals(SnapshotRetentionService.SLM_RETENTION_JOB_ID) @@ -156,28 +147,9 @@ public void triggered(SchedulerEngine.Event event) { // Finally, asynchronously retrieve all the snapshots, deleting them serially, // before updating the cluster state with the new metrics and setting 'running' // back to false - getAllRetainableSnapshots(repositioriesToFetch, policiesWithRetention.keySet(), new ActionListener<>() { + getSnapshotsEligibleForDeletion(repositioriesToFetch, policiesWithRetention, new ActionListener<>() { @Override - public void onResponse(Map> allSnapshots) { - if (logger.isTraceEnabled()) { - logger.trace("retrieved snapshots: [{}]", formatSnapshots(allSnapshots)); - } - // Find all the snapshots that are past their retention date - final Map>> snapshotsToBeDeleted = allSnapshots.entrySet() - .stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, - e -> e.getValue() - .stream() - .filter(snapshot -> snapshotEligibleForDeletion(snapshot, allSnapshots, policiesWithRetention)) - // SnapshotInfo instances can be quite large in case they contain e.g. a large collection of - // exceptions so we extract the only two things (id + policy id) here so they can be GCed - .map(snapshotInfo -> Tuple.tuple(snapshotInfo.snapshotId(), getPolicyId(snapshotInfo))) - .toList() - ) - ); - + public void onResponse(Map>> snapshotsToBeDeleted) { if (logger.isTraceEnabled()) { logger.trace("snapshots eligible for deletion: [{}]", snapshotsToBeDeleted); } @@ -256,10 +228,10 @@ static boolean snapshotEligibleForDeletion( return eligible; } - void getAllRetainableSnapshots( + void getSnapshotsEligibleForDeletion( Collection repositories, - Set policies, - ActionListener>> listener + Map policies, + ActionListener>>> listener ) { if (repositories.isEmpty()) { // Skip retrieving anything if there are no repositories to fetch @@ -273,7 +245,7 @@ void getAllRetainableSnapshots( // don't time out on this request to not produce failed SLM runs in case of a temporarily slow master node .setMasterNodeTimeout(TimeValue.MAX_VALUE) .setIgnoreUnavailable(true) - .setPolicies(policies.toArray(Strings.EMPTY_ARRAY)) + .setPolicies(policies.keySet().toArray(Strings.EMPTY_ARRAY)) .setIncludeIndexNames(false) .execute(ActionListener.wrap(resp -> { if (logger.isTraceEnabled()) { @@ -300,7 +272,39 @@ void getAllRetainableSnapshots( logger.debug(() -> "unable to retrieve snapshots for [" + repo + "] repositories: ", resp.getFailures().get(repo)); } } - listener.onResponse(snapshots); + + if (logger.isTraceEnabled()) { + logger.trace( + "retrieved snapshots: [{}]", + snapshots.entrySet() + .stream() + .map( + e -> e.getKey() + + ": [" + + e.getValue().stream().map(si -> si.snapshotId().getName()).collect(Collectors.joining(",")) + + "]" + ) + .collect(Collectors.joining(",")) + ); + } + + // Find all the snapshots that are past their retention date + final Map>> snapshotsToBeDeleted = snapshots.entrySet() + .stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + e -> e.getValue() + .stream() + .filter(snapshot -> snapshotEligibleForDeletion(snapshot, snapshots, policies)) + // SnapshotInfo instances can be quite large in case they contain e.g. a large collection of + // exceptions so we extract the only two things (id + policy id) here so they can be GCed + .map(snapshotInfo -> Tuple.tuple(snapshotInfo.snapshotId(), getPolicyId(snapshotInfo))) + .toList() + ) + ); + + listener.onResponse(snapshotsToBeDeleted); }, e -> { logger.debug(() -> "unable to retrieve snapshots for [" + repositories + "] repositories: ", e); listener.onFailure(e); diff --git a/x-pack/plugin/slm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java b/x-pack/plugin/slm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java index 15badabf3689a..b120b49c63654 100644 --- a/x-pack/plugin/slm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java +++ b/x-pack/plugin/slm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.Tuple; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; @@ -42,7 +43,6 @@ import org.elasticsearch.xpack.core.slm.SnapshotRetentionConfiguration; import org.elasticsearch.xpack.slm.history.SnapshotHistoryStore; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -238,20 +238,6 @@ private void retentionTaskTest(final boolean deletionSuccess) throws Exception { 0L, Collections.emptyMap() ); - final SnapshotInfo ineligibleSnapshot = new SnapshotInfo( - new Snapshot(repoId, new SnapshotId("name2", "uuid2")), - Collections.singletonList("index"), - Collections.emptyList(), - Collections.emptyList(), - null, - System.currentTimeMillis() + 1, - 1, - Collections.emptyList(), - true, - Collections.singletonMap("policy", policyId), - System.currentTimeMillis(), - Collections.emptyMap() - ); Set deleted = ConcurrentHashMap.newKeySet(); Set deletedSnapshotsInHistory = ConcurrentHashMap.newKeySet(); @@ -273,11 +259,9 @@ private void retentionTaskTest(final boolean deletionSuccess) throws Exception { historyLatch.countDown(); }), () -> { - List snaps = new ArrayList<>(2); - snaps.add(eligibleSnapshot); - snaps.add(ineligibleSnapshot); - logger.info("--> retrieving snapshots [{}]", snaps); - return Collections.singletonMap(repoId, snaps); + final var result = Collections.singletonMap(repoId, List.of(Tuple.tuple(eligibleSnapshot.snapshotId(), policyId))); + logger.info("--> retrieving snapshots [{}]", result); + return result; }, (deletionPolicyId, repo, snapId, slmStats, listener) -> { logger.info("--> deleting {} from repo {}", snapId, repo); @@ -295,7 +279,7 @@ private void retentionTaskTest(final boolean deletionSuccess) throws Exception { long time = System.currentTimeMillis(); retentionTask.triggered(new SchedulerEngine.Event(SnapshotRetentionService.SLM_RETENTION_JOB_ID, time, time)); - deletionLatch.await(10, TimeUnit.SECONDS); + safeAwait(deletionLatch); assertThat("something should have been deleted", deleted, not(empty())); assertThat("one snapshot should have been deleted", deleted, hasSize(1)); @@ -364,18 +348,22 @@ protected void ); AtomicReference errHandlerCalled = new AtomicReference<>(null); - task.getAllRetainableSnapshots(Collections.singleton(repoId), Collections.singleton(policyId), new ActionListener<>() { - @Override - public void onResponse(Map> stringListMap) { - logger.info("--> forcing failure"); - throw new ElasticsearchException("forced failure"); - } + task.getSnapshotsEligibleForDeletion( + Collections.singleton(repoId), + Map.of(policyId, new SnapshotLifecyclePolicy(policyId, "test", "* * * * *", repoId, null, null)), + new ActionListener<>() { + @Override + public void onResponse(Map>> snapshotsToBeDeleted) { + logger.info("--> forcing failure"); + throw new ElasticsearchException("forced failure"); + } - @Override - public void onFailure(Exception e) { - errHandlerCalled.set(e); + @Override + public void onFailure(Exception e) { + errHandlerCalled.set(e); + } } - }); + ); assertNotNull(errHandlerCalled.get()); assertThat(errHandlerCalled.get().getMessage(), equalTo("forced failure")); @@ -597,14 +585,14 @@ public ClusterState createState(OperationMode mode, SnapshotLifecyclePolicy... p } private static class MockSnapshotRetentionTask extends SnapshotRetentionTask { - private final Supplier>> snapshotRetriever; + private final Supplier>>> snapshotRetriever; private final DeleteSnapshotMock deleteRunner; MockSnapshotRetentionTask( Client client, ClusterService clusterService, SnapshotHistoryStore historyStore, - Supplier>> snapshotRetriever, + Supplier>>> snapshotRetriever, DeleteSnapshotMock deleteRunner, LongSupplier nanoSupplier ) { @@ -614,10 +602,10 @@ private static class MockSnapshotRetentionTask extends SnapshotRetentionTask { } @Override - void getAllRetainableSnapshots( + void getSnapshotsEligibleForDeletion( Collection repositories, - Set policies, - ActionListener>> listener + Map policies, + ActionListener>>> listener ) { listener.onResponse(this.snapshotRetriever.get()); }