Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] More advanced model snapshot retention options #56125

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.xpack.core.ml.job.config.Job;
Expand All @@ -19,6 +18,7 @@
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -71,9 +71,7 @@ private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener<B
if (response == null) {
removeData(jobIterator, listener, isTimedOutSupplier);
} else {
long latestTimeMs = response.v1();
long cutoffEpochMs = response.v2();
removeDataBefore(job, latestTimeMs, cutoffEpochMs, ActionListener.wrap(
removeDataBefore(job, response.latestTimeMs, response.cutoffEpochMs, ActionListener.wrap(
r -> removeData(jobIterator, listener, isTimedOutSupplier),
listener::onFailure));
}
Expand All @@ -87,7 +85,7 @@ private WrappedBatchedJobsIterator newJobIterator() {
return new WrappedBatchedJobsIterator(jobsIterator);
}

abstract void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<Tuple<Long, Long>> listener);
abstract void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<CutoffDetails> listener);

abstract Long getRetentionDays(Job job);

Expand All @@ -109,7 +107,7 @@ static BoolQueryBuilder createQuery(String jobId, long cutoffEpochMs) {
* This class abstracts away the logic of pulling one job at a time from
* multiple batches.
*/
private class WrappedBatchedJobsIterator implements Iterator<Job> {
private static class WrappedBatchedJobsIterator implements Iterator<Job> {
private final BatchedJobsIterator batchedIterator;
private VolatileCursorIterator<Job> currentBatch;

Expand Down Expand Up @@ -147,4 +145,39 @@ private VolatileCursorIterator<Job> createBatchIteratorFromBatch(Deque<Job.Build
return new VolatileCursorIterator<>(jobs);
}
}

/**
* The latest time that cutoffs are measured from is not wall clock time,
* but some other reference point that makes sense for the type of data
* being removed. This class groups the cutoff time with it's "latest"
* reference point.
*/
protected static final class CutoffDetails {

public long latestTimeMs;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make those two members final?

public long cutoffEpochMs;

public CutoffDetails(long latestTimeMs, long cutoffEpochMs) {
this.latestTimeMs = latestTimeMs;
this.cutoffEpochMs = cutoffEpochMs;
}

@Override
public int hashCode() {
return Objects.hash(latestTimeMs, cutoffEpochMs);
}

@Override
public boolean equals(Object other) {
if (other == this) {
return true;
}
if (other instanceof CutoffDetails == false) {
return false;
}
CutoffDetails that = (CutoffDetails) other;
return this.latestTimeMs == that.latestTimeMs &&
this.cutoffEpochMs == that.cutoffEpochMs;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
Expand Down Expand Up @@ -85,8 +84,8 @@ Long getRetentionDays(Job job) {
}

@Override
void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<Tuple<Long, Long>> listener) {
ThreadedActionListener<Tuple<Long, Long>> threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool,
void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<CutoffDetails> listener) {
ThreadedActionListener<CutoffDetails> threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool,
MachineLearning.UTILITY_THREAD_POOL_NAME, listener, false);

latestSnapshotTimeStamp(jobId, ActionListener.wrap(
Expand All @@ -95,7 +94,7 @@ void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<Tuple<Lo
threadedActionListener.onResponse(null);
} else {
long cutoff = latestTime - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis();
threadedActionListener.onResponse(new Tuple<>(latestTime, cutoff));
threadedActionListener.onResponse(new CutoffDetails(latestTime, cutoff));
}
},
listener::onFailure
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
Expand Down Expand Up @@ -132,16 +131,16 @@ private DeleteByQueryRequest createDBQRequest(Job job, long cutoffEpochMs) {
}

@Override
void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<Tuple<Long, Long>> listener) {
ThreadedActionListener<Tuple<Long, Long>> threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool,
void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<CutoffDetails> listener) {
ThreadedActionListener<CutoffDetails> threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool,
MachineLearning.UTILITY_THREAD_POOL_NAME, listener, false);
latestBucketTime(jobId, ActionListener.wrap(
latestTime -> {
if (latestTime == null) {
threadedActionListener.onResponse(null);
} else {
long cutoff = latestTime - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis();
threadedActionListener.onResponse(new Tuple<>(latestTime, cutoff));
threadedActionListener.onResponse(new CutoffDetails(latestTime, cutoff));
}
},
listener::onFailure
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand Down Expand Up @@ -64,9 +63,9 @@ protected Long getRetentionDays(Job job) {
}

@Override
void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<Tuple<Long, Long>> listener) {
void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<CutoffDetails> listener) {
long nowEpochMs = Instant.now(Clock.systemDefaultZone()).toEpochMilli();
listener.onResponse(new Tuple<>(nowEpochMs, nowEpochMs - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis()));
listener.onResponse(new CutoffDetails(nowEpochMs, nowEpochMs - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -219,12 +218,12 @@ public void testCalcCutoffEpochMs() throws IOException {
givenClientRequests(searchResponses, true, true);

long retentionDays = 3L;
ActionListener<Tuple<Long, Long>> cutoffListener = mock(ActionListener.class);
ActionListener<AbstractExpiredJobDataRemover.CutoffDetails> cutoffListener = mock(ActionListener.class);
createExpiredModelSnapshotsRemover().calcCutoffEpochMs("job-1", retentionDays, cutoffListener);

long dayInMills = 60 * 60 * 24 * 1000;
long expectedCutoffTime = oneDayAgo.getTime() - (dayInMills * retentionDays);
verify(cutoffListener).onResponse(eq(new Tuple<>(oneDayAgo.getTime(), expectedCutoffTime)));
verify(cutoffListener).onResponse(eq(new AbstractExpiredJobDataRemover.CutoffDetails(oneDayAgo.getTime(), expectedCutoffTime)));
}

private ExpiredModelSnapshotsRemover createExpiredModelSnapshotsRemover() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
Expand Down Expand Up @@ -146,12 +145,12 @@ public void testCalcCutoffEpochMs() {
givenSearchResponses(Collections.singletonList(JobTests.buildJobBuilder(jobId).setResultsRetentionDays(1L).build()),
new Bucket(jobId, latest, 60));

ActionListener<Tuple<Long, Long>> cutoffListener = mock(ActionListener.class);
ActionListener<AbstractExpiredJobDataRemover.CutoffDetails> cutoffListener = mock(ActionListener.class);
createExpiredResultsRemover().calcCutoffEpochMs(jobId, 1L, cutoffListener);

long dayInMills = 60 * 60 * 24 * 1000;
long expectedCutoffTime = latest.getTime() - dayInMills;
verify(cutoffListener).onResponse(eq(new Tuple<>(latest.getTime(), expectedCutoffTime)));
verify(cutoffListener).onResponse(eq(new AbstractExpiredJobDataRemover.CutoffDetails(latest.getTime(), expectedCutoffTime)));
}

private void givenDBQRequestsSucceed() {
Expand Down