Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move SLM eligibility check #100044

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,6 @@ public SnapshotRetentionTask(
this.historyStore = historyStore;
}

/**
 * Renders a map of snapshot lists as a single-line string, as passed to the
 * "retrieved snapshots" trace log message.
 *
 * Each map entry is rendered as {@code <key>: [<name>,<name>,...]} using the snapshot
 * names, and the rendered entries are concatenated with {@code ","} in the map's
 * iteration order.
 *
 * @param snapshotMap the snapshots to render, grouped by map key
 * @return the rendered string; empty when the map has no entries
 */
private static String formatSnapshots(Map<String, List<SnapshotInfo>> snapshotMap) {
    final StringBuilder rendered = new StringBuilder();
    String separator = "";
    for (Map.Entry<String, List<SnapshotInfo>> entry : snapshotMap.entrySet()) {
        // Only the snapshot names are printed; the full SnapshotInfo is not.
        final String snapshotNames = entry.getValue()
            .stream()
            .map(info -> info.snapshotId().getName())
            .collect(Collectors.joining(","));
        rendered.append(separator).append(entry.getKey()).append(": [").append(snapshotNames).append("]");
        separator = ",";
    }
    return rendered.toString();
}

@Override
public void triggered(SchedulerEngine.Event event) {
assert event.getJobName().equals(SnapshotRetentionService.SLM_RETENTION_JOB_ID)
Expand Down Expand Up @@ -156,28 +147,9 @@ public void triggered(SchedulerEngine.Event event) {
// Finally, asynchronously retrieve all the snapshots, deleting them serially,
// before updating the cluster state with the new metrics and setting 'running'
// back to false
getAllRetainableSnapshots(repositioriesToFetch, policiesWithRetention.keySet(), new ActionListener<>() {
getSnapshotsEligibleForDeletion(repositioriesToFetch, policiesWithRetention, new ActionListener<>() {
@Override
public void onResponse(Map<String, List<SnapshotInfo>> allSnapshots) {
if (logger.isTraceEnabled()) {
logger.trace("retrieved snapshots: [{}]", formatSnapshots(allSnapshots));
}
// Find all the snapshots that are past their retention date
final Map<String, List<Tuple<SnapshotId, String>>> snapshotsToBeDeleted = allSnapshots.entrySet()
.stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
e -> e.getValue()
.stream()
.filter(snapshot -> snapshotEligibleForDeletion(snapshot, allSnapshots, policiesWithRetention))
// SnapshotInfo instances can be quite large in case they contain e.g. a large collection of
// exceptions so we extract the only two things (id + policy id) here so they can be GCed
.map(snapshotInfo -> Tuple.tuple(snapshotInfo.snapshotId(), getPolicyId(snapshotInfo)))
.toList()
)
);

public void onResponse(Map<String, List<Tuple<SnapshotId, String>>> snapshotsToBeDeleted) {
if (logger.isTraceEnabled()) {
logger.trace("snapshots eligible for deletion: [{}]", snapshotsToBeDeleted);
}
Expand Down Expand Up @@ -256,10 +228,10 @@ static boolean snapshotEligibleForDeletion(
return eligible;
}

void getAllRetainableSnapshots(
void getSnapshotsEligibleForDeletion(
Collection<String> repositories,
Set<String> policies,
ActionListener<Map<String, List<SnapshotInfo>>> listener
Map<String, SnapshotLifecyclePolicy> policies,
ActionListener<Map<String, List<Tuple<SnapshotId, String>>>> listener
) {
if (repositories.isEmpty()) {
// Skip retrieving anything if there are no repositories to fetch
Expand All @@ -273,7 +245,7 @@ void getAllRetainableSnapshots(
// don't time out on this request to not produce failed SLM runs in case of a temporarily slow master node
.setMasterNodeTimeout(TimeValue.MAX_VALUE)
.setIgnoreUnavailable(true)
.setPolicies(policies.toArray(Strings.EMPTY_ARRAY))
.setPolicies(policies.keySet().toArray(Strings.EMPTY_ARRAY))
.setIncludeIndexNames(false)
.execute(ActionListener.wrap(resp -> {
if (logger.isTraceEnabled()) {
Expand All @@ -300,7 +272,39 @@ void getAllRetainableSnapshots(
logger.debug(() -> "unable to retrieve snapshots for [" + repo + "] repositories: ", resp.getFailures().get(repo));
}
}
listener.onResponse(snapshots);

if (logger.isTraceEnabled()) {
logger.trace(
"retrieved snapshots: [{}]",
snapshots.entrySet()
.stream()
.map(
e -> e.getKey()
+ ": ["
+ e.getValue().stream().map(si -> si.snapshotId().getName()).collect(Collectors.joining(","))
+ "]"
)
.collect(Collectors.joining(","))
);
}

// Find all the snapshots that are past their retention date
final Map<String, List<Tuple<SnapshotId, String>>> snapshotsToBeDeleted = snapshots.entrySet()
.stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
e -> e.getValue()
.stream()
.filter(snapshot -> snapshotEligibleForDeletion(snapshot, snapshots, policies))
// SnapshotInfo instances can be quite large in case they contain e.g. a large collection of
// exceptions so we extract the only two things (id + policy id) here so they can be GCed
Comment on lines +300 to +301
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++ very nice, thanks for explaining the why behind it

Should we consider adding support for this kind of filtering at the API level? (perhaps a fields query parameter that specifies only the fields we want in the response?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment was moved as-is from the previous code :)

We're going to fix this properly in follow-ups, but there's basically no reasonable way to use the get-snapshots API here so we'll do something else.

.map(snapshotInfo -> Tuple.tuple(snapshotInfo.snapshotId(), getPolicyId(snapshotInfo)))
.toList()
)
);

listener.onResponse(snapshotsToBeDeleted);
}, e -> {
logger.debug(() -> "unable to retrieve snapshots for [" + repositories + "] repositories: ", e);
listener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
Expand All @@ -42,7 +43,6 @@
import org.elasticsearch.xpack.core.slm.SnapshotRetentionConfiguration;
import org.elasticsearch.xpack.slm.history.SnapshotHistoryStore;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -238,20 +238,6 @@ private void retentionTaskTest(final boolean deletionSuccess) throws Exception {
0L,
Collections.emptyMap()
);
final SnapshotInfo ineligibleSnapshot = new SnapshotInfo(
new Snapshot(repoId, new SnapshotId("name2", "uuid2")),
Collections.singletonList("index"),
Collections.emptyList(),
Collections.emptyList(),
null,
System.currentTimeMillis() + 1,
1,
Collections.emptyList(),
true,
Collections.singletonMap("policy", policyId),
System.currentTimeMillis(),
Collections.emptyMap()
);

Set<SnapshotId> deleted = ConcurrentHashMap.newKeySet();
Set<String> deletedSnapshotsInHistory = ConcurrentHashMap.newKeySet();
Expand All @@ -273,11 +259,9 @@ private void retentionTaskTest(final boolean deletionSuccess) throws Exception {
historyLatch.countDown();
}),
() -> {
List<SnapshotInfo> snaps = new ArrayList<>(2);
snaps.add(eligibleSnapshot);
snaps.add(ineligibleSnapshot);
logger.info("--> retrieving snapshots [{}]", snaps);
return Collections.singletonMap(repoId, snaps);
final var result = Collections.singletonMap(repoId, List.of(Tuple.tuple(eligibleSnapshot.snapshotId(), policyId)));
logger.info("--> retrieving snapshots [{}]", result);
return result;
},
(deletionPolicyId, repo, snapId, slmStats, listener) -> {
logger.info("--> deleting {} from repo {}", snapId, repo);
Expand All @@ -295,7 +279,7 @@ private void retentionTaskTest(final boolean deletionSuccess) throws Exception {
long time = System.currentTimeMillis();
retentionTask.triggered(new SchedulerEngine.Event(SnapshotRetentionService.SLM_RETENTION_JOB_ID, time, time));

deletionLatch.await(10, TimeUnit.SECONDS);
safeAwait(deletionLatch);

assertThat("something should have been deleted", deleted, not(empty()));
assertThat("one snapshot should have been deleted", deleted, hasSize(1));
Expand Down Expand Up @@ -364,18 +348,22 @@ protected <Request extends ActionRequest, Response extends ActionResponse> void
);

AtomicReference<Exception> errHandlerCalled = new AtomicReference<>(null);
task.getAllRetainableSnapshots(Collections.singleton(repoId), Collections.singleton(policyId), new ActionListener<>() {
@Override
public void onResponse(Map<String, List<SnapshotInfo>> stringListMap) {
logger.info("--> forcing failure");
throw new ElasticsearchException("forced failure");
}
task.getSnapshotsEligibleForDeletion(
Collections.singleton(repoId),
Map.of(policyId, new SnapshotLifecyclePolicy(policyId, "test", "* * * * *", repoId, null, null)),
new ActionListener<>() {
@Override
public void onResponse(Map<String, List<Tuple<SnapshotId, String>>> snapshotsToBeDeleted) {
logger.info("--> forcing failure");
throw new ElasticsearchException("forced failure");
}

@Override
public void onFailure(Exception e) {
errHandlerCalled.set(e);
@Override
public void onFailure(Exception e) {
errHandlerCalled.set(e);
}
}
});
);

assertNotNull(errHandlerCalled.get());
assertThat(errHandlerCalled.get().getMessage(), equalTo("forced failure"));
Expand Down Expand Up @@ -597,14 +585,14 @@ public ClusterState createState(OperationMode mode, SnapshotLifecyclePolicy... p
}

private static class MockSnapshotRetentionTask extends SnapshotRetentionTask {
private final Supplier<Map<String, List<SnapshotInfo>>> snapshotRetriever;
private final Supplier<Map<String, List<Tuple<SnapshotId, String>>>> snapshotRetriever;
private final DeleteSnapshotMock deleteRunner;

MockSnapshotRetentionTask(
Client client,
ClusterService clusterService,
SnapshotHistoryStore historyStore,
Supplier<Map<String, List<SnapshotInfo>>> snapshotRetriever,
Supplier<Map<String, List<Tuple<SnapshotId, String>>>> snapshotRetriever,
DeleteSnapshotMock deleteRunner,
LongSupplier nanoSupplier
) {
Expand All @@ -614,10 +602,10 @@ private static class MockSnapshotRetentionTask extends SnapshotRetentionTask {
}

@Override
void getAllRetainableSnapshots(
void getSnapshotsEligibleForDeletion(
Collection<String> repositories,
Set<String> policies,
ActionListener<Map<String, List<SnapshotInfo>>> listener
Map<String, SnapshotLifecyclePolicy> policies,
ActionListener<Map<String, List<Tuple<SnapshotId, String>>>> listener
) {
listener.onResponse(this.snapshotRetriever.get());
}
Expand Down