Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Calculate results and model snapshot retention using latest bucket timestamps #51061

Merged
merged 14 commits into from
Jan 22, 2020
16 changes: 9 additions & 7 deletions docs/reference/ml/ml-shared.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -863,9 +863,10 @@ example, `1575402236000 `.
end::model-snapshot-id[]

tag::model-snapshot-retention-days[]
The time in days that model snapshots are retained for the job. Older snapshots
are deleted. The default value is `1`, which means snapshots are retained for
one day (twenty-four hours).
Advanced configuration option. Denotes the period for which model snapshots
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@szabosteve can you look over these docs changes please. Do they make sense?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@davidkyle Sorry, I haven't noticed this one earlier.

davidkyle marked this conversation as resolved.
Show resolved Hide resolved
will be retained. Age is calculated as the time from the newest model snapshot's
davidkyle marked this conversation as resolved.
Show resolved Hide resolved
timestamp, older snapshots are automatically deleted. The default value is `1`,
which means snapshots are retained for one day (twenty-four hours).
davidkyle marked this conversation as resolved.
Show resolved Hide resolved
end::model-snapshot-retention-days[]

tag::multivariate-by-fields[]
Expand Down Expand Up @@ -963,10 +964,11 @@ is `shared`, which generates an index named `.ml-anomalies-shared`.
end::results-index-name[]

tag::results-retention-days[]
Advanced configuration option. The number of days for which job results are
retained. Once per day at 00:30 (server time), results older than this period
are deleted from {es}. The default value is null, which means results are
retained.
Advanced configuration option. When set this option will prune results older
davidkyle marked this conversation as resolved.
Show resolved Hide resolved
than this age. Age is calculated as the difference between the latest
bucket result's timestamp and the result's timestamp. Once per day at 00:30
davidkyle marked this conversation as resolved.
Show resolved Hide resolved
(server time), results older than this are deleted from {es}. The default
davidkyle marked this conversation as resolved.
Show resolved Hide resolved
value is null, which means all results are retained.
end::results-retention-days[]

tag::retain[]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,15 @@ public void setUpData() throws IOException {
.setMapping("time", "type=date,format=epoch_millis")
.get();

// We are going to create data for last 2 days
long nowMillis = System.currentTimeMillis();
// We are going to create 3 days of data starting 1 hr ago
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// We are going to create 3 days of data starting 1 hr ago
// We are going to create 3 days of data ending 1 hr ago

long latestBucketTime = System.currentTimeMillis() - TimeValue.timeValueHours(1).millis();
int totalBuckets = 3 * 24;
int normalRate = 10;
int anomalousRate = 100;
int anomalousBucket = 30;
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
for (int bucket = 0; bucket < totalBuckets; bucket++) {
long timestamp = nowMillis - TimeValue.timeValueHours(totalBuckets - bucket).getMillis();
long timestamp = latestBucketTime - TimeValue.timeValueHours(totalBuckets - bucket).getMillis();
int bucketRate = bucket == anomalousBucket ? anomalousRate : normalRate;
for (int point = 0; point < bucketRate; point++) {
IndexRequest indexRequest = new IndexRequest(DATA_INDEX);
Expand Down Expand Up @@ -208,7 +208,7 @@ public void testDeleteExpiredData() throws Exception {
assertThat(getModelSnapshots("no-retention").size(), equalTo(2));

List<Bucket> buckets = getBuckets("results-retention");
assertThat(buckets.size(), is(lessThanOrEqualTo(24)));
assertThat(buckets.size(), is(lessThanOrEqualTo(25)));
assertThat(buckets.size(), is(greaterThanOrEqualTo(22)));
assertThat(buckets.get(0).getTimestamp().getTime(), greaterThanOrEqualTo(oneDayAgo));
assertThat(getRecords("results-retention").size(), equalTo(0));
Expand All @@ -223,7 +223,7 @@ public void testDeleteExpiredData() throws Exception {
assertThat(getModelSnapshots("snapshots-retention-with-retain").size(), equalTo(2));

buckets = getBuckets("results-and-snapshots-retention");
assertThat(buckets.size(), is(lessThanOrEqualTo(24)));
assertThat(buckets.size(), is(lessThanOrEqualTo(25)));
assertThat(buckets.size(), is(greaterThanOrEqualTo(22)));
assertThat(buckets.get(0).getTimestamp().getTime(), greaterThanOrEqualTo(oneDayAgo));
assertThat(getRecords("results-and-snapshots-retention").size(), equalTo(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ private void deleteExpiredData(ActionListener<DeleteExpiredDataAction.Response>
Supplier<Boolean> isTimedOutSupplier) {
AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName());
List<MlDataRemover> dataRemovers = Arrays.asList(
new ExpiredResultsRemover(client, auditor),
new ExpiredResultsRemover(client, auditor, threadPool),
new ExpiredForecastsRemover(client, threadPool),
new ExpiredModelSnapshotsRemover(client, threadPool),
new UnusedStateRemover(client, clusterService)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.xpack.core.ml.job.config.Job;
Expand All @@ -16,12 +15,9 @@
import org.elasticsearch.xpack.ml.job.persistence.BatchedJobsIterator;
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;

import java.time.Clock;
import java.time.Instant;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -68,22 +64,29 @@ private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener<B
removeData(jobIterator, listener, isTimedOutSupplier);
return;
}
long cutoffEpochMs = calcCutoffEpochMs(retentionDays);
removeDataBefore(job, cutoffEpochMs,
ActionListener.wrap(response -> removeData(jobIterator, listener, isTimedOutSupplier), listener::onFailure));

calcCutoffEpochMs(job.getId(), retentionDays, ActionListener.wrap(
cutoffEpochMs -> {
if (cutoffEpochMs == null) {
removeData(jobIterator, listener, isTimedOutSupplier);
} else {
removeDataBefore(job, cutoffEpochMs, ActionListener.wrap(
response -> removeData(jobIterator, listener, isTimedOutSupplier),
listener::onFailure));
}
},
listener::onFailure
));
}

private WrappedBatchedJobsIterator newJobIterator() {
BatchedJobsIterator jobsIterator = new BatchedJobsIterator(client, AnomalyDetectorsIndex.configIndexName());
return new WrappedBatchedJobsIterator(jobsIterator);
}

private long calcCutoffEpochMs(long retentionDays) {
long nowEpochMs = Instant.now(Clock.systemDefaultZone()).toEpochMilli();
return nowEpochMs - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis();
}
abstract void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<Long> listener);

protected abstract Long getRetentionDays(Job job);
abstract Long getRetentionDays(Job job);

/**
* Template method to allow implementation details of various types of data (e.g. results, model snapshots).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The next two methods (removeDataBefore and createQuery) might as well be package private too for consistency with the other abstract methods.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.job.config.Job;
Expand All @@ -27,12 +31,14 @@
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
* Deletes all model snapshots that have expired the configured retention time
Expand Down Expand Up @@ -65,10 +71,59 @@ public ExpiredModelSnapshotsRemover(OriginSettingClient client, ThreadPool threa
}

@Override
protected Long getRetentionDays(Job job) {
Long getRetentionDays(Job job) {
return job.getModelSnapshotRetentionDays();
}

@Override
void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<Long> listener) {
ThreadedActionListener<Long> threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool,
MachineLearning.UTILITY_THREAD_POOL_NAME, listener, false);

latestSnapshotTimeStamp(jobId, ActionListener.wrap(
latestTime -> {
if (latestTime == null) {
threadedActionListener.onResponse(null);
} else {
long cutoff = latestTime - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis();
threadedActionListener.onResponse(cutoff);
}
},
listener::onFailure
));
}

private void latestSnapshotTimeStamp(String jobId, ActionListener<Long> listener) {
SortBuilder<?> sortBuilder = new FieldSortBuilder(ModelSnapshot.TIMESTAMP.getPreferredName()).order(SortOrder.DESC);
QueryBuilder snapshotQuery = QueryBuilders.boolQuery()
.filter(QueryBuilders.existsQuery(ModelSnapshot.SNAPSHOT_DOC_COUNT.getPreferredName()));

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.sort(sortBuilder);
searchSourceBuilder.query(snapshotQuery);
searchSourceBuilder.size(1);
searchSourceBuilder.trackTotalHits(false);

String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.source(searchSourceBuilder);
searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS));

client.search(searchRequest, ActionListener.wrap(
response -> {
SearchHit[] hits = response.getHits().getHits();
if (hits.length == 0) {
// no snapshots found
listener.onResponse(null);
} else {
ModelSnapshot snapshot = ModelSnapshot.fromJson(hits[0].getSourceRef());
listener.onResponse(snapshot.getTimestamp().getTime());
}
},
listener::onFailure)
);
}

@Override
protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Boolean> listener) {
if (job.getModelSnapshotId() == null) {
Expand Down Expand Up @@ -96,7 +151,7 @@ protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Bool
}

private ActionListener<SearchResponse> expiredSnapshotsListener(String jobId, ActionListener<Boolean> listener) {
return new ActionListener<SearchResponse>() {
return new ActionListener<>() {
@Override
public void onResponse(SearchResponse searchResponse) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,50 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.job.results.Forecast;
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;

import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
* Removes all results that have expired the configured retention time
Expand All @@ -48,15 +69,17 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {

private final OriginSettingClient client;
private final AnomalyDetectionAuditor auditor;
private final ThreadPool threadPool;

public ExpiredResultsRemover(OriginSettingClient client, AnomalyDetectionAuditor auditor) {
public ExpiredResultsRemover(OriginSettingClient client, AnomalyDetectionAuditor auditor, ThreadPool threadPool) {
super(client);
this.client = Objects.requireNonNull(client);
this.auditor = Objects.requireNonNull(auditor);
this.threadPool = Objects.requireNonNull(threadPool);
}

@Override
protected Long getRetentionDays(Job job) {
Long getRetentionDays(Job job) {
return job.getResultsRetentionDays();
}

Expand All @@ -65,7 +88,7 @@ protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Bool
LOGGER.debug("Removing results of job [{}] that have a timestamp before [{}]", job.getId(), cutoffEpochMs);
DeleteByQueryRequest request = createDBQRequest(job, cutoffEpochMs);

client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<BulkByScrollResponse>() {
client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<>() {
@Override
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
try {
Expand Down Expand Up @@ -107,6 +130,59 @@ private DeleteByQueryRequest createDBQRequest(Job job, long cutoffEpochMs) {
return request;
}

@Override
void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<Long> listener) {
ThreadedActionListener<Long> threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool,
MachineLearning.UTILITY_THREAD_POOL_NAME, listener, false);
latestBucketTime(jobId, ActionListener.wrap(
latestTime -> {
if (latestTime == null) {
threadedActionListener.onResponse(null);
} else {
long cutoff = latestTime - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis();
threadedActionListener.onResponse(cutoff);
}
},
listener::onFailure
));
}

private void latestBucketTime(String jobId, ActionListener<Long> listener) {
SortBuilder<?> sortBuilder = new FieldSortBuilder(Result.TIMESTAMP.getPreferredName()).order(SortOrder.DESC);
QueryBuilder bucketType = QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), Bucket.RESULT_TYPE_VALUE);

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.sort(sortBuilder);
searchSourceBuilder.query(bucketType);
searchSourceBuilder.size(1);
searchSourceBuilder.trackTotalHits(false);

String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.source(searchSourceBuilder);
searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS));

client.search(searchRequest, ActionListener.wrap(
response -> {
SearchHit[] hits = response.getHits().getHits();
if (hits.length == 0) {
// no buckets found
listener.onResponse(null);
} else {

try (InputStream stream = hits[0].getSourceRef().streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
Bucket bucket = Bucket.LENIENT_PARSER.apply(parser, null);
listener.onResponse(bucket.getTimestamp().getTime());
} catch (IOException e) {
listener.onFailure(new ElasticsearchParseException("failed to parse bucket", e));
}
}
}, listener::onFailure
));
}

private void auditResultsWereDeleted(String jobId, long cutoffEpochMs) {
Instant instant = Instant.ofEpochMilli(cutoffEpochMs);
ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(instant, ZoneOffset.systemDefault());
Expand Down
Loading