Move SLM eligibility check (elastic#100044)
A small refactoring to make elastic#99953 a little simpler: combine the logic
for retrieving the snapshot info and filtering out the ineligible ones
into a single function so we can replace it with a call to a dedicated
client action in a followup.
DaveCTurner authored Sep 29, 2023
1 parent c2072fc commit e23fd32
Showing 2 changed files with 63 additions and 71 deletions.
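
Before the diff, a side-by-side of the method shape that changes — an illustrative interface, not code from the repository, with the signatures copied from the hunks below. The old hook handed every SnapshotInfo back to the caller, which then filtered eligibility itself; the new hook does the retrieval and the filtering together and only lets lightweight (snapshot id, policy id) pairs escape:

    import java.util.Collection;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    import org.elasticsearch.action.ActionListener;
    import org.elasticsearch.core.Tuple;
    import org.elasticsearch.snapshots.SnapshotId;
    import org.elasticsearch.snapshots.SnapshotInfo;
    import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicy;

    // Illustrative interface (not part of the codebase): the two signatures side by side.
    interface SnapshotFetchShapes {
        // Before this commit: hand back every SnapshotInfo; the caller filters eligibility itself.
        void getAllRetainableSnapshots(
            Collection<String> repositories,
            Set<String> policies,
            ActionListener<Map<String, List<SnapshotInfo>>> listener
        );

        // After this commit: fetch and filter in one step, returning only (snapshot id, policy id) pairs,
        // so the whole step can later be swapped for a dedicated client action.
        void getSnapshotsEligibleForDeletion(
            Collection<String> repositories,
            Map<String, SnapshotLifecyclePolicy> policies,
            ActionListener<Map<String, List<Tuple<SnapshotId, String>>>> listener
        );
    }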
@@ -91,15 +91,6 @@ public SnapshotRetentionTask(
         this.historyStore = historyStore;
     }
 
-    private static String formatSnapshots(Map<String, List<SnapshotInfo>> snapshotMap) {
-        return snapshotMap.entrySet()
-            .stream()
-            .map(
-                e -> e.getKey() + ": [" + e.getValue().stream().map(si -> si.snapshotId().getName()).collect(Collectors.joining(",")) + "]"
-            )
-            .collect(Collectors.joining(","));
-    }
-
     @Override
     public void triggered(SchedulerEngine.Event event) {
         assert event.getJobName().equals(SnapshotRetentionService.SLM_RETENTION_JOB_ID)
@@ -156,28 +147,9 @@ public void triggered(SchedulerEngine.Event event) {
             // Finally, asynchronously retrieve all the snapshots, deleting them serially,
             // before updating the cluster state with the new metrics and setting 'running'
             // back to false
-            getAllRetainableSnapshots(repositioriesToFetch, policiesWithRetention.keySet(), new ActionListener<>() {
+            getSnapshotsEligibleForDeletion(repositioriesToFetch, policiesWithRetention, new ActionListener<>() {
                 @Override
-                public void onResponse(Map<String, List<SnapshotInfo>> allSnapshots) {
-                    if (logger.isTraceEnabled()) {
-                        logger.trace("retrieved snapshots: [{}]", formatSnapshots(allSnapshots));
-                    }
-                    // Find all the snapshots that are past their retention date
-                    final Map<String, List<Tuple<SnapshotId, String>>> snapshotsToBeDeleted = allSnapshots.entrySet()
-                        .stream()
-                        .collect(
-                            Collectors.toMap(
-                                Map.Entry::getKey,
-                                e -> e.getValue()
-                                    .stream()
-                                    .filter(snapshot -> snapshotEligibleForDeletion(snapshot, allSnapshots, policiesWithRetention))
-                                    // SnapshotInfo instances can be quite large in case they contain e.g. a large collection of
-                                    // exceptions so we extract the only two things (id + policy id) here so they can be GCed
-                                    .map(snapshotInfo -> Tuple.tuple(snapshotInfo.snapshotId(), getPolicyId(snapshotInfo)))
-                                    .toList()
-                            )
-                        );
-
+                public void onResponse(Map<String, List<Tuple<SnapshotId, String>>> snapshotsToBeDeleted) {
                     if (logger.isTraceEnabled()) {
                         logger.trace("snapshots eligible for deletion: [{}]", snapshotsToBeDeleted);
                     }
@@ -256,10 +228,10 @@ static boolean snapshotEligibleForDeletion(
         return eligible;
     }
 
-    void getAllRetainableSnapshots(
+    void getSnapshotsEligibleForDeletion(
         Collection<String> repositories,
-        Set<String> policies,
-        ActionListener<Map<String, List<SnapshotInfo>>> listener
+        Map<String, SnapshotLifecyclePolicy> policies,
+        ActionListener<Map<String, List<Tuple<SnapshotId, String>>>> listener
     ) {
         if (repositories.isEmpty()) {
             // Skip retrieving anything if there are no repositories to fetch
@@ -273,7 +245,7 @@ void getAllRetainableSnapshots(
             // don't time out on this request to not produce failed SLM runs in case of a temporarily slow master node
             .setMasterNodeTimeout(TimeValue.MAX_VALUE)
             .setIgnoreUnavailable(true)
-            .setPolicies(policies.toArray(Strings.EMPTY_ARRAY))
+            .setPolicies(policies.keySet().toArray(Strings.EMPTY_ARRAY))
             .setIncludeIndexNames(false)
             .execute(ActionListener.wrap(resp -> {
                 if (logger.isTraceEnabled()) {
@@ -300,7 +272,39 @@
                         logger.debug(() -> "unable to retrieve snapshots for [" + repo + "] repositories: ", resp.getFailures().get(repo));
                     }
                 }
-                listener.onResponse(snapshots);
+
+                if (logger.isTraceEnabled()) {
+                    logger.trace(
+                        "retrieved snapshots: [{}]",
+                        snapshots.entrySet()
+                            .stream()
+                            .map(
+                                e -> e.getKey()
+                                    + ": ["
+                                    + e.getValue().stream().map(si -> si.snapshotId().getName()).collect(Collectors.joining(","))
+                                    + "]"
+                            )
+                            .collect(Collectors.joining(","))
+                    );
+                }
+
+                // Find all the snapshots that are past their retention date
+                final Map<String, List<Tuple<SnapshotId, String>>> snapshotsToBeDeleted = snapshots.entrySet()
+                    .stream()
+                    .collect(
+                        Collectors.toMap(
+                            Map.Entry::getKey,
+                            e -> e.getValue()
+                                .stream()
+                                .filter(snapshot -> snapshotEligibleForDeletion(snapshot, snapshots, policies))
+                                // SnapshotInfo instances can be quite large in case they contain e.g. a large collection of
+                                // exceptions so we extract the only two things (id + policy id) here so they can be GCed
+                                .map(snapshotInfo -> Tuple.tuple(snapshotInfo.snapshotId(), getPolicyId(snapshotInfo)))
+                                .toList()
+                        )
+                    );
+
+                listener.onResponse(snapshotsToBeDeleted);
             }, e -> {
                 logger.debug(() -> "unable to retrieve snapshots for [" + repositories + "] repositories: ", e);
                 listener.onFailure(e);
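
The heart of the moved logic is the stream transform above: a map of repository name to SnapshotInfo list becomes a map of repository name to (SnapshotId, policy id) tuples. A minimal, self-contained sketch of that pattern, using hypothetical stand-in types (Snap, IdAndPolicy) rather than the real Elasticsearch classes:

    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    // Hypothetical stand-ins for SnapshotInfo, SnapshotId and Tuple, just to show the shape of the transform.
    record Snap(String id, String policy, byte[] bulkyDetails) {}
    record IdAndPolicy(String id, String policy) {}

    class EligibilitySketch {
        // Mirrors the per-repository collect in getSnapshotsEligibleForDeletion: filter each repository's
        // snapshots, then keep only the two small fields so the large objects can be garbage-collected.
        static Map<String, List<IdAndPolicy>> eligible(Map<String, List<Snap>> byRepo) {
            return byRepo.entrySet()
                .stream()
                .collect(
                    Collectors.toMap(
                        Map.Entry::getKey,
                        e -> e.getValue()
                            .stream()
                            .filter(s -> s.policy() != null) // placeholder for the real snapshotEligibleForDeletion(...) check
                            .map(s -> new IdAndPolicy(s.id(), s.policy()))
                            .toList()
                    )
                );
        }
    }

The point of the extra map step is memory: once only the id and policy name are retained, the potentially large SnapshotInfo instances become unreachable and can be collected before the serial deletions start.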
@@ -26,6 +26,7 @@
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.snapshots.Snapshot;
 import org.elasticsearch.snapshots.SnapshotId;
 import org.elasticsearch.snapshots.SnapshotInfo;
@@ -42,7 +43,6 @@
 import org.elasticsearch.xpack.core.slm.SnapshotRetentionConfiguration;
 import org.elasticsearch.xpack.slm.history.SnapshotHistoryStore;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -238,20 +238,6 @@ private void retentionTaskTest(final boolean deletionSuccess) throws Exception {
             0L,
             Collections.emptyMap()
         );
-        final SnapshotInfo ineligibleSnapshot = new SnapshotInfo(
-            new Snapshot(repoId, new SnapshotId("name2", "uuid2")),
-            Collections.singletonList("index"),
-            Collections.emptyList(),
-            Collections.emptyList(),
-            null,
-            System.currentTimeMillis() + 1,
-            1,
-            Collections.emptyList(),
-            true,
-            Collections.singletonMap("policy", policyId),
-            System.currentTimeMillis(),
-            Collections.emptyMap()
-        );
 
         Set<SnapshotId> deleted = ConcurrentHashMap.newKeySet();
         Set<String> deletedSnapshotsInHistory = ConcurrentHashMap.newKeySet();
@@ -273,11 +259,9 @@ private void retentionTaskTest(final boolean deletionSuccess) throws Exception {
                 historyLatch.countDown();
             }),
             () -> {
-                List<SnapshotInfo> snaps = new ArrayList<>(2);
-                snaps.add(eligibleSnapshot);
-                snaps.add(ineligibleSnapshot);
-                logger.info("--> retrieving snapshots [{}]", snaps);
-                return Collections.singletonMap(repoId, snaps);
+                final var result = Collections.singletonMap(repoId, List.of(Tuple.tuple(eligibleSnapshot.snapshotId(), policyId)));
+                logger.info("--> retrieving snapshots [{}]", result);
+                return result;
             },
             (deletionPolicyId, repo, snapId, slmStats, listener) -> {
                 logger.info("--> deleting {} from repo {}", snapId, repo);
@@ -295,7 +279,7 @@ private void retentionTaskTest(final boolean deletionSuccess) throws Exception {
         long time = System.currentTimeMillis();
         retentionTask.triggered(new SchedulerEngine.Event(SnapshotRetentionService.SLM_RETENTION_JOB_ID, time, time));
 
-        deletionLatch.await(10, TimeUnit.SECONDS);
+        safeAwait(deletionLatch);
 
         assertThat("something should have been deleted", deleted, not(empty()));
         assertThat("one snapshot should have been deleted", deleted, hasSize(1));
@@ -364,18 +348,22 @@ protected <Request extends ActionRequest, Response extends ActionResponse> void
         );
 
         AtomicReference<Exception> errHandlerCalled = new AtomicReference<>(null);
-        task.getAllRetainableSnapshots(Collections.singleton(repoId), Collections.singleton(policyId), new ActionListener<>() {
-            @Override
-            public void onResponse(Map<String, List<SnapshotInfo>> stringListMap) {
-                logger.info("--> forcing failure");
-                throw new ElasticsearchException("forced failure");
-            }
+        task.getSnapshotsEligibleForDeletion(
+            Collections.singleton(repoId),
+            Map.of(policyId, new SnapshotLifecyclePolicy(policyId, "test", "* * * * *", repoId, null, null)),
+            new ActionListener<>() {
+                @Override
+                public void onResponse(Map<String, List<Tuple<SnapshotId, String>>> snapshotsToBeDeleted) {
+                    logger.info("--> forcing failure");
+                    throw new ElasticsearchException("forced failure");
+                }
 
-            @Override
-            public void onFailure(Exception e) {
-                errHandlerCalled.set(e);
+                @Override
+                public void onFailure(Exception e) {
+                    errHandlerCalled.set(e);
+                }
             }
-        });
+        );
 
         assertNotNull(errHandlerCalled.get());
         assertThat(errHandlerCalled.get().getMessage(), equalTo("forced failure"));
@@ -597,14 +585,14 @@ public ClusterState createState(OperationMode mode, SnapshotLifecyclePolicy... p
     }
 
     private static class MockSnapshotRetentionTask extends SnapshotRetentionTask {
-        private final Supplier<Map<String, List<SnapshotInfo>>> snapshotRetriever;
+        private final Supplier<Map<String, List<Tuple<SnapshotId, String>>>> snapshotRetriever;
         private final DeleteSnapshotMock deleteRunner;
 
         MockSnapshotRetentionTask(
             Client client,
             ClusterService clusterService,
             SnapshotHistoryStore historyStore,
-            Supplier<Map<String, List<SnapshotInfo>>> snapshotRetriever,
+            Supplier<Map<String, List<Tuple<SnapshotId, String>>>> snapshotRetriever,
             DeleteSnapshotMock deleteRunner,
             LongSupplier nanoSupplier
         ) {
@@ -614,10 +602,10 @@ private static class MockSnapshotRetentionTask extends SnapshotRetentionTask {
         }
 
         @Override
-        void getAllRetainableSnapshots(
+        void getSnapshotsEligibleForDeletion(
             Collection<String> repositories,
-            Set<String> policies,
-            ActionListener<Map<String, List<SnapshotInfo>>> listener
+            Map<String, SnapshotLifecyclePolicy> policies,
+            ActionListener<Map<String, List<Tuple<SnapshotId, String>>>> listener
        ) {
            listener.onResponse(this.snapshotRetriever.get());
        }
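
A side effect visible in the test diff: because the production hook now returns pre-filtered pairs, a test double no longer needs to build an ineligible SnapshotInfo at all — it can answer the listener with canned tuples directly. A rough sketch of such an override (it would live in a subclass like the MockSnapshotRetentionTask above; the repository, snapshot and policy names are made up for illustration):

    @Override
    void getSnapshotsEligibleForDeletion(
        Collection<String> repositories,
        Map<String, SnapshotLifecyclePolicy> policies,
        ActionListener<Map<String, List<Tuple<SnapshotId, String>>>> listener
    ) {
        // Answer immediately with one already-eligible snapshot; no bulky SnapshotInfo involved.
        listener.onResponse(Map.of("test-repo", List.of(Tuple.tuple(new SnapshotId("snap-1", "uuid-1"), "test-policy"))));
    }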
