Calculate results and model snapshot retention using latest bucket timestamps #51061

Merged (14 commits) on Jan 22, 2020
16 changes: 9 additions & 7 deletions docs/reference/ml/ml-shared.asciidoc
@@ -863,9 +863,10 @@ example, `1575402236000`.
end::model-snapshot-id[]

tag::model-snapshot-retention-days[]
The time in days that model snapshots are retained for the job. Older snapshots
are deleted. The default value is `1`, which means snapshots are retained for
one day (twenty-four hours).
Advanced configuration option. The period of time (in days) that model snapshots
will be retained. Age is calculated from the newest model snapshot's timestamp;
older snapshots are automatically deleted. The default value is `1`, which means
snapshots are retained for one day (twenty-four hours).
end::model-snapshot-retention-days[]
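
To make the new semantics concrete, here is a minimal, self-contained sketch (not code from this PR; the class name and timestamp are invented) of how a cutoff anchored to the newest snapshot behaves:

import java.util.concurrent.TimeUnit;

// Illustration only: retention is anchored to the newest snapshot's timestamp,
// not to wall-clock time, so an idle job does not lose all its snapshots.
public class SnapshotCutoffExample {
    public static void main(String[] args) {
        long newestSnapshotEpochMs = 1_579_687_200_000L; // hypothetical newest snapshot
        long retentionDays = 1;
        long cutoffEpochMs = newestSnapshotEpochMs - TimeUnit.DAYS.toMillis(retentionDays);
        // Snapshots with a timestamp before cutoffEpochMs are eligible for deletion.
        System.out.println("cutoff = " + cutoffEpochMs);
    }
}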

tag::multivariate-by-fields[]
@@ -963,10 +964,11 @@ is `shared`, which generates an index named `.ml-anomalies-shared`.
end::results-index-name[]

tag::results-retention-days[]
Advanced configuration option. The number of days for which job results are
retained. Once per day at 00:30 (server time), results older than this period
are deleted from {es}. The default value is null, which means results are
retained.
Advanced configuration option. When set, this option prunes results older than
this age (in days). Age is calculated as the difference between the latest
bucket result's timestamp and each result's timestamp. Once per day at 00:30
(server time), results older than this cutoff are deleted from {es}. The
default value is null, which means all results are retained.
end::results-retention-days[]
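
Since age is measured against the latest bucket result, pruning requires a lookup of that bucket first. By analogy with the latestSnapshotTimeStamp method shown later in this diff, the results-side lookup plausibly looks like the following sketch; the method name latestBucketTime and the parsing details are assumptions, as the actual ExpiredResultsRemover change is not included in this excerpt:

// Sketch only, modelled on latestSnapshotTimeStamp below; not the literal
// ExpiredResultsRemover code.
private void latestBucketTime(String jobId, ActionListener<Long> listener) {
    SearchSourceBuilder source = new SearchSourceBuilder()
        .sort(new FieldSortBuilder(Result.TIMESTAMP.getPreferredName()).order(SortOrder.DESC))
        .query(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), Bucket.RESULT_TYPE_VALUE))
        .size(1)                 // only the newest bucket is needed
        .trackTotalHits(false);  // no need to count all results

    SearchRequest searchRequest = new SearchRequest(AnomalyDetectorsIndex.jobResultsAliasedName(jobId));
    searchRequest.source(source);

    client.search(searchRequest, ActionListener.wrap(
        response -> {
            SearchHit[] hits = response.getHits().getHits();
            if (hits.length == 0) {
                listener.onResponse(null); // no buckets yet, nothing to prune
            } else {
                // Assumes the bucket document stores its timestamp as epoch millis.
                Object ts = hits[0].getSourceAsMap().get(Result.TIMESTAMP.getPreferredName());
                listener.onResponse(((Number) ts).longValue());
            }
        },
        listener::onFailure));
}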

tag::retain[]
@@ -33,10 +33,8 @@
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -52,20 +50,20 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
private static final String DATA_INDEX = "delete-expired-data-test-data";

@Before
public void setUpData() throws IOException {
public void setUpData() {
client().admin().indices().prepareCreate(DATA_INDEX)
.setMapping("time", "type=date,format=epoch_millis")
.get();

// We are going to create data for last 2 days
long nowMillis = System.currentTimeMillis();
// We are going to create 3 days of data ending 1 hr ago
long latestBucketTime = System.currentTimeMillis() - TimeValue.timeValueHours(1).millis();
int totalBuckets = 3 * 24;
int normalRate = 10;
int anomalousRate = 100;
int anomalousBucket = 30;
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
for (int bucket = 0; bucket < totalBuckets; bucket++) {
long timestamp = nowMillis - TimeValue.timeValueHours(totalBuckets - bucket).getMillis();
long timestamp = latestBucketTime - TimeValue.timeValueHours(totalBuckets - bucket).getMillis();
int bucketRate = bucket == anomalousBucket ? anomalousRate : normalRate;
for (int point = 0; point < bucketRate; point++) {
IndexRequest indexRequest = new IndexRequest(DATA_INDEX);
@@ -120,7 +118,7 @@ public void testDeleteExpiredData() throws Exception {

String datafeedId = job.getId() + "-feed";
DatafeedConfig.Builder datafeedConfig = new DatafeedConfig.Builder(datafeedId, job.getId());
datafeedConfig.setIndices(Arrays.asList(DATA_INDEX));
datafeedConfig.setIndices(Collections.singletonList(DATA_INDEX));
DatafeedConfig datafeed = datafeedConfig.build();
registerDatafeed(datafeed);
putDatafeed(datafeed);
@@ -208,7 +206,7 @@ public void testDeleteExpiredData() throws Exception {
assertThat(getModelSnapshots("no-retention").size(), equalTo(2));

List<Bucket> buckets = getBuckets("results-retention");
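// The upper bound below moves from 24 to 25 because the cutoff is derived from
// the latest bucket's timestamp: an inclusive one-day window over hourly buckets
// can span the latest bucket plus the 24 before it.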
assertThat(buckets.size(), is(lessThanOrEqualTo(24)));
assertThat(buckets.size(), is(lessThanOrEqualTo(25)));
assertThat(buckets.size(), is(greaterThanOrEqualTo(22)));
assertThat(buckets.get(0).getTimestamp().getTime(), greaterThanOrEqualTo(oneDayAgo));
assertThat(getRecords("results-retention").size(), equalTo(0));
@@ -223,7 +221,7 @@ public void testDeleteExpiredData() throws Exception {
assertThat(getModelSnapshots("snapshots-retention-with-retain").size(), equalTo(2));

buckets = getBuckets("results-and-snapshots-retention");
assertThat(buckets.size(), is(lessThanOrEqualTo(24)));
assertThat(buckets.size(), is(lessThanOrEqualTo(25)));
assertThat(buckets.size(), is(greaterThanOrEqualTo(22)));
assertThat(buckets.get(0).getTimestamp().getTime(), greaterThanOrEqualTo(oneDayAgo));
assertThat(getRecords("results-and-snapshots-retention").size(), equalTo(0));
@@ -276,7 +274,7 @@ public void testDeleteExpiredData() throws Exception {
private static Job.Builder newJobBuilder(String id) {
Detector.Builder detector = new Detector.Builder();
detector.setFunction("count");
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detector.build()));
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build()));
analysisConfig.setBucketSpan(TimeValue.timeValueHours(1));
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time");
@@ -81,7 +81,7 @@ private void deleteExpiredData(ActionListener<DeleteExpiredDataAction.Response>
Supplier<Boolean> isTimedOutSupplier) {
AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName());
List<MlDataRemover> dataRemovers = Arrays.asList(
new ExpiredResultsRemover(client, auditor),
new ExpiredResultsRemover(client, auditor, threadPool),
new ExpiredForecastsRemover(client, threadPool),
new ExpiredModelSnapshotsRemover(client, threadPool),
new UnusedStateRemover(client, clusterService)
@@ -7,7 +7,6 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.xpack.core.ml.job.config.Job;
@@ -16,12 +15,9 @@
import org.elasticsearch.xpack.ml.job.persistence.BatchedJobsIterator;
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;

import java.time.Clock;
import java.time.Instant;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;

@@ -68,30 +64,37 @@ private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener<B
removeData(jobIterator, listener, isTimedOutSupplier);
return;
}
long cutoffEpochMs = calcCutoffEpochMs(retentionDays);
removeDataBefore(job, cutoffEpochMs,
ActionListener.wrap(response -> removeData(jobIterator, listener, isTimedOutSupplier), listener::onFailure));

calcCutoffEpochMs(job.getId(), retentionDays, ActionListener.wrap(
cutoffEpochMs -> {
if (cutoffEpochMs == null) {
removeData(jobIterator, listener, isTimedOutSupplier);
} else {
removeDataBefore(job, cutoffEpochMs, ActionListener.wrap(
response -> removeData(jobIterator, listener, isTimedOutSupplier),
listener::onFailure));
}
},
listener::onFailure
));
}
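// Note: a null cutoff means there was nothing to measure age against (for
// example, no snapshots or bucket results exist yet), so the remover moves on
// to the next job instead of falling back to a wall-clock cutoff.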

private WrappedBatchedJobsIterator newJobIterator() {
BatchedJobsIterator jobsIterator = new BatchedJobsIterator(client, AnomalyDetectorsIndex.configIndexName());
return new WrappedBatchedJobsIterator(jobsIterator);
}

private long calcCutoffEpochMs(long retentionDays) {
long nowEpochMs = Instant.now(Clock.systemDefaultZone()).toEpochMilli();
return nowEpochMs - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis();
}
abstract void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<Long> listener);

protected abstract Long getRetentionDays(Job job);
abstract Long getRetentionDays(Job job);

Contributor review comment on the two methods below: "The next two methods (removeDataBefore and createQuery) might as well be package private too for consistency with the other abstract methods."

/**
 * Template method to allow implementation details of various types of data (e.g. results, model snapshots).
 * Implementors need to call {@code listener.onResponse} when they are done in order to continue to the next job.
 */
protected abstract void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Boolean> listener);
abstract void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Boolean> listener);

protected static BoolQueryBuilder createQuery(String jobId, long cutoffEpochMs) {
static BoolQueryBuilder createQuery(String jobId, long cutoffEpochMs) {
return QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))
.filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).lt(cutoffEpochMs).format("epoch_millis"));
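
As a self-contained illustration of the contract in the javadoc above, here is a toy analog; every name in it is invented, and it is not code from this PR:

import java.util.function.Consumer;

// Toy version of the template-method contract: the implementor must complete
// the callback on every path, or the per-job iteration would never advance.
abstract class ToyRemover {
    abstract void removeDataBefore(String jobId, long cutoffEpochMs, Consumer<Boolean> done);
}

class PrintingRemover extends ToyRemover {
    @Override
    void removeDataBefore(String jobId, long cutoffEpochMs, Consumer<Boolean> done) {
        System.out.println("would delete data for " + jobId + " before " + cutoffEpochMs);
        done.accept(true); // signalling completion lets the driver continue to the next job
    }
}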
@@ -15,10 +15,14 @@
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.job.config.Job;
@@ -27,12 +31,14 @@
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
* Deletes all model snapshots that have expired the configured retention time
@@ -65,10 +71,59 @@ public ExpiredModelSnapshotsRemover(OriginSettingClient client, ThreadPool threa
}

@Override
protected Long getRetentionDays(Job job) {
Long getRetentionDays(Job job) {
return job.getModelSnapshotRetentionDays();
}

@Override
void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<Long> listener) {
ThreadedActionListener<Long> threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool,
MachineLearning.UTILITY_THREAD_POOL_NAME, listener, false);

latestSnapshotTimeStamp(jobId, ActionListener.wrap(
latestTime -> {
if (latestTime == null) {
threadedActionListener.onResponse(null);
} else {
long cutoff = latestTime - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis();
threadedActionListener.onResponse(cutoff);
}
},
listener::onFailure
));
}

private void latestSnapshotTimeStamp(String jobId, ActionListener<Long> listener) {
SortBuilder<?> sortBuilder = new FieldSortBuilder(ModelSnapshot.TIMESTAMP.getPreferredName()).order(SortOrder.DESC);
QueryBuilder snapshotQuery = QueryBuilders.boolQuery()
.filter(QueryBuilders.existsQuery(ModelSnapshot.SNAPSHOT_DOC_COUNT.getPreferredName()));

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.sort(sortBuilder);
searchSourceBuilder.query(snapshotQuery);
searchSourceBuilder.size(1);
searchSourceBuilder.trackTotalHits(false);

String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.source(searchSourceBuilder);
searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS));

client.search(searchRequest, ActionListener.wrap(
response -> {
SearchHit[] hits = response.getHits().getHits();
if (hits.length == 0) {
// no snapshots found
listener.onResponse(null);
} else {
ModelSnapshot snapshot = ModelSnapshot.fromJson(hits[0].getSourceRef());
listener.onResponse(snapshot.getTimestamp().getTime());
}
},
listener::onFailure)
);
}
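
The search above deliberately touches as little data as possible: size(1) with a descending sort fetches only the newest snapshot document, and trackTotalHits(false) skips hit counting. The existsQuery on snapshot_doc_count restricts the search to model snapshot documents, since the job's results alias also covers other document types.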

@Override
protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Boolean> listener) {
if (job.getModelSnapshotId() == null) {
@@ -96,7 +151,7 @@ protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Bool
}

private ActionListener<SearchResponse> expiredSnapshotsListener(String jobId, ActionListener<Boolean> listener) {
return new ActionListener<SearchResponse>() {
return new ActionListener<>() {
@Override
public void onResponse(SearchResponse searchResponse) {
try {