
Calculate results and model snapshot retention using latest bucket timestamps #51061

Merged · 14 commits · Jan 22, 2020
docs/reference/ml/ml-shared.asciidoc (16 changes: 9 additions & 7 deletions)
@@ -863,9 +863,10 @@ example, `1575402236000`.
end::model-snapshot-id[]

tag::model-snapshot-retention-days[]
The time in days that model snapshots are retained for the job. Older snapshots
are deleted. The default value is `1`, which means snapshots are retained for
one day (twenty-four hours).
Advanced configuration option. Denotes the period for which model snapshots
will be retained. Age is calculated as the time from the newest model snapshot's
timestamp; older snapshots are automatically deleted. The default value is `1`,
which means snapshots are retained for one day (twenty-four hours).
end::model-snapshot-retention-days[]

Member Author: @szabosteve can you look over these docs changes, please? Do they make sense?

Contributor: @davidkyle Sorry, I didn't notice this one earlier.
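As a worked illustration of the rule above, the cutoff arithmetic amounts to the following sketch (the class name and timestamp value are invented for the example):

import java.util.concurrent.TimeUnit;

public class SnapshotRetentionExample {
    public static void main(String[] args) {
        long newestSnapshotMs = 1_575_402_236_000L; // timestamp of the job's newest model snapshot
        long retentionDays = 1;                     // model_snapshot_retention_days
        // Snapshots with a timestamp before the cutoff are eligible for deletion.
        long cutoffMs = newestSnapshotMs - TimeUnit.DAYS.toMillis(retentionDays);
        System.out.println("Delete model snapshots with timestamp < " + cutoffMs);
    }
}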

tag::multivariate-by-fields[]
@@ -963,10 +964,11 @@ is `shared`, which generates an index named `.ml-anomalies-shared`.
end::results-index-name[]

tag::results-retention-days[]
Advanced configuration option. The number of days for which job results are
retained. Once per day at 00:30 (server time), results older than this period
are deleted from {es}. The default value is null, which means results are
retained.
Advanced configuration option. When set, this option prunes results older
than this age. Age is calculated as the difference between the latest
bucket result's timestamp and the result's timestamp. Once per day at 00:30
(server time), results older than this are deleted from {es}. The default
value is null, which means all results are retained.
end::results-retention-days[]
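The practical consequence, sketched below with invented values, is that a job whose data has stopped arriving keeps its trailing day of results indefinitely, because age is measured from the latest bucket rather than from wall-clock time:

import java.util.concurrent.TimeUnit;

public class ResultsRetentionExample {
    public static void main(String[] args) {
        // Hypothetical job whose last bucket was written a week ago.
        long latestBucketMs = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7);
        long retentionDays = 1; // results_retention_days
        long cutoffMs = latestBucketMs - TimeUnit.DAYS.toMillis(retentionDays);
        // Results newer than cutoffMs survive, even though by wall-clock time
        // they are already at least eight days old.
        System.out.println("Prune results with timestamp < " + cutoffMs);
    }
}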

tag::retain[]
@@ -57,15 +57,15 @@ public void setUpData() throws IOException {
.setMapping("time", "type=date,format=epoch_millis")
.get();

// We are going to create data for last 2 days
long nowMillis = System.currentTimeMillis();
// We are going to create 2 days of data starting 24 hrs ago
Contributor (suggested change):
-    // We are going to create 2 days of data starting 24 hrs ago
+    // We are going to create 3 days of data ending 1 hour ago
long lastestBucketTime = System.currentTimeMillis() - TimeValue.timeValueHours(1).millis();
Contributor: typo: lastest -> latest

int totalBuckets = 3 * 24;
int normalRate = 10;
int anomalousRate = 100;
int anomalousBucket = 30;
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
for (int bucket = 0; bucket < totalBuckets; bucket++) {
long timestamp = nowMillis - TimeValue.timeValueHours(totalBuckets - bucket).getMillis();
long timestamp = lastestBucketTime - TimeValue.timeValueHours(totalBuckets - bucket).getMillis();
int bucketRate = bucket == anomalousBucket ? anomalousRate : normalRate;
for (int point = 0; point < bucketRate; point++) {
IndexRequest indexRequest = new IndexRequest(DATA_INDEX);
@@ -208,7 +208,7 @@ public void testDeleteExpiredData() throws Exception {
assertThat(getModelSnapshots("no-retention").size(), equalTo(2));

List<Bucket> buckets = getBuckets("results-retention");
assertThat(buckets.size(), is(lessThanOrEqualTo(24)));
assertThat(buckets.size(), is(lessThanOrEqualTo(25)));
assertThat(buckets.size(), is(greaterThanOrEqualTo(22)));
assertThat(buckets.get(0).getTimestamp().getTime(), greaterThanOrEqualTo(oneDayAgo));
assertThat(getRecords("results-retention").size(), equalTo(0));
@@ -223,7 +223,7 @@
assertThat(getModelSnapshots("snapshots-retention-with-retain").size(), equalTo(2));

buckets = getBuckets("results-and-snapshots-retention");
assertThat(buckets.size(), is(lessThanOrEqualTo(24)));
assertThat(buckets.size(), is(lessThanOrEqualTo(25)));
assertThat(buckets.size(), is(greaterThanOrEqualTo(22)));
assertThat(buckets.get(0).getTimestamp().getTime(), greaterThanOrEqualTo(oneDayAgo));
assertThat(getRecords("results-and-snapshots-retention").size(), equalTo(0));
@@ -81,7 +81,7 @@ private void deleteExpiredData(ActionListener<DeleteExpiredDataAction.Response>
Supplier<Boolean> isTimedOutSupplier) {
AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName());
List<MlDataRemover> dataRemovers = Arrays.asList(
new ExpiredResultsRemover(client, auditor),
new ExpiredResultsRemover(client, auditor, threadPool),
new ExpiredForecastsRemover(client, threadPool),
new ExpiredModelSnapshotsRemover(client, threadPool),
new UnusedStateRemover(client, clusterService)
@@ -68,19 +68,29 @@ private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener<B
removeData(jobIterator, listener, isTimedOutSupplier);
return;
}
long cutoffEpochMs = calcCutoffEpochMs(retentionDays);
removeDataBefore(job, cutoffEpochMs,
ActionListener.wrap(response -> removeData(jobIterator, listener, isTimedOutSupplier), listener::onFailure));

calcCutoffEpochMs(job.getId(), retentionDays, ActionListener.wrap(
cutoffEpochMs -> {
if (cutoffEpochMs == null) {
removeData(jobIterator, listener, isTimedOutSupplier);
} else {
removeDataBefore(job, cutoffEpochMs, ActionListener.wrap(
response -> removeData(jobIterator, listener, isTimedOutSupplier),
listener::onFailure));
}
},
listener::onFailure
));
}
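A minimal sketch of how the new asynchronous flow can be exercised directly, for instance from a test in the same package (the remover instance, job id, and retention value are assumptions for illustration):

// Hypothetical usage of the package-private method; "remover" is any
// concrete remover from this PR, e.g. an ExpiredResultsRemover built in a test.
remover.calcCutoffEpochMs("my-job", 1L, ActionListener.wrap(
    cutoffEpochMs -> {
        if (cutoffEpochMs == null) {
            // The job has no results yet, so there is nothing to prune.
        } else {
            // Delete documents with timestamps before cutoffEpochMs.
        }
    },
    e -> fail("cutoff calculation failed: " + e)
));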

private WrappedBatchedJobsIterator newJobIterator() {
BatchedJobsIterator jobsIterator = new BatchedJobsIterator(client, AnomalyDetectorsIndex.configIndexName());
return new WrappedBatchedJobsIterator(jobsIterator);
}

private long calcCutoffEpochMs(long retentionDays) {
void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<Long> listener) {
long nowEpochMs = Instant.now(Clock.systemDefaultZone()).toEpochMilli();
return nowEpochMs - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis();
listener.onResponse(nowEpochMs - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis());
}

Contributor: The other methods that extending classes are expected to override are protected, even though the classes that actually extend this one are all in the same package. But this class's constructor is package-private, so it would be impossible to have a derived class in another package despite the abstract methods being set up for that. I think for consistency they should all be the same, either protected or package-private. Certainly, with a default implementation here, protected makes it clearer that we expect derived classes to modify it, rather than that it has just been made accessible for testing. But if you agree with my other suggestion and make this abstract, that also makes it clear.

Member Author: The method is package-private for testing; otherwise it is very difficult to test and can only be done indirectly. abstract makes sense.

Contributor: But protected would also allow it to be tested directly. Basically, I think all the abstract methods and the constructor should have the same accessibility, whether that be protected or package-private. So either change the ones that are currently protected to be package-private, or change this one plus the constructor to be protected.

Member Author: Given that the base class is package-private, only classes in the same package can implement the abstract method, whether the method is protected or package-private. The only difference is that I could derive a new class from one of the package's public non-abstract classes and reimplement calcCutoffEpochMs in a different package if it were protected, but not if it were package-private. In practice this isn't a concern, so I've gone for the principle of least visibility and made the abstract methods package-private.
Contributor: Could this method be abstract instead of providing a default based on wall-clock time? Now that we've made deletion of model snapshots and results relative to the latest bucket time rather than wall-clock time, it seems we should do that for all job-related documents that have a timestamp. A default implementation that uses wall-clock time just seems like a way we'll introduce a bug by accidentally deleting some other type of document based on wall-clock time.


protected abstract Long getRetentionDays(Job job);
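If the reviewer's suggestion were adopted, the base class would declare the cutoff calculation abstract rather than defaulting to wall-clock time. A sketch of that shape, using the names from the diff above (the abstract form is the suggestion under discussion, not necessarily the merged code):

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.core.ml.job.config.Job;

abstract class AbstractExpiredJobDataRemover {
    // Package-private, matching the author's least-visibility choice; every
    // concrete remover must now pick its own cutoff source explicitly.
    abstract void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<Long> listener);

    protected abstract Long getRetentionDays(Job job);
}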
@@ -15,10 +15,14 @@
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.job.config.Job;
@@ -27,12 +31,14 @@
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
* Deletes all model snapshots that have expired the configured retention time
@@ -69,6 +75,55 @@ protected Long getRetentionDays(Job job) {
return job.getModelSnapshotRetentionDays();
}

@Override
void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<Long> listener) {
ThreadedActionListener<Long> threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool,
MachineLearning.UTILITY_THREAD_POOL_NAME, listener, false);

latestSnapshotTimeStamp(jobId, ActionListener.wrap(
latestTime -> {
if (latestTime == null) {
threadedActionListener.onResponse(null);
} else {
long cutoff = latestTime - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis();
threadedActionListener.onResponse(cutoff);
}
},
listener::onFailure
));
}

private void latestSnapshotTimeStamp(String jobId, ActionListener<Long> listener) {
SortBuilder<?> sortBuilder = new FieldSortBuilder(ModelSnapshot.TIMESTAMP.getPreferredName()).order(SortOrder.DESC);
QueryBuilder snapshotQuery = QueryBuilders.boolQuery()
.filter(QueryBuilders.existsQuery(ModelSnapshot.SNAPSHOT_DOC_COUNT.getPreferredName()));

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.sort(sortBuilder);
searchSourceBuilder.query(snapshotQuery);
searchSourceBuilder.size(1);
searchSourceBuilder.trackTotalHits(false);

String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.source(searchSourceBuilder);
searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS));

client.search(searchRequest, ActionListener.wrap(
response -> {
SearchHit[] hits = response.getHits().getHits();
if (hits.length == 0) {
// no snapshots found
listener.onResponse(null);
} else {
ModelSnapshot snapshot = ModelSnapshot.fromJson(hits[0].getSourceRef());
listener.onResponse(snapshot.getTimestamp().getTime());
}
},
listener::onFailure)
);
}

@Override
protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Boolean> listener) {
if (job.getModelSnapshotId() == null) {
@@ -96,7 +151,7 @@ protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Bool
}

private ActionListener<SearchResponse> expiredSnapshotsListener(String jobId, ActionListener<Boolean> listener) {
return new ActionListener<SearchResponse>() {
return new ActionListener<>() {
@Override
public void onResponse(SearchResponse searchResponse) {
try {
@@ -8,29 +8,50 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.job.results.Forecast;
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;

import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
* Removes all results that have expired the configured retention time
@@ -48,11 +69,13 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {

private final OriginSettingClient client;
private final AnomalyDetectionAuditor auditor;
private final ThreadPool threadPool;

public ExpiredResultsRemover(OriginSettingClient client, AnomalyDetectionAuditor auditor) {
public ExpiredResultsRemover(OriginSettingClient client, AnomalyDetectionAuditor auditor, ThreadPool threadPool) {
super(client);
this.client = Objects.requireNonNull(client);
this.auditor = Objects.requireNonNull(auditor);
this.threadPool = Objects.requireNonNull(threadPool);
}

@Override
@@ -65,7 +88,7 @@ protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Bool
LOGGER.debug("Removing results of job [{}] that have a timestamp before [{}]", job.getId(), cutoffEpochMs);
DeleteByQueryRequest request = createDBQRequest(job, cutoffEpochMs);

client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<BulkByScrollResponse>() {
client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<>() {
@Override
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
try {
@@ -107,6 +130,59 @@ private DeleteByQueryRequest createDBQRequest(Job job, long cutoffEpochMs) {
return request;
}

@Override
void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<Long> listener) {
ThreadedActionListener<Long> threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool,
MachineLearning.UTILITY_THREAD_POOL_NAME, listener, false);
latestBucketTime(jobId, ActionListener.wrap(
latestTime -> {
if (latestTime == null) {
threadedActionListener.onResponse(null);
} else {
long cutoff = latestTime - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis();
threadedActionListener.onResponse(cutoff);
}
},
listener::onFailure
));
}

private void latestBucketTime(String jobId, ActionListener<Long> listener) {
SortBuilder<?> sortBuilder = new FieldSortBuilder(Result.TIMESTAMP.getPreferredName()).order(SortOrder.DESC);
QueryBuilder bucketType = QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), Bucket.RESULT_TYPE_VALUE);

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.sort(sortBuilder);
searchSourceBuilder.query(bucketType);
searchSourceBuilder.size(1);
searchSourceBuilder.trackTotalHits(false);

String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.source(searchSourceBuilder);
searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS));

client.search(searchRequest, ActionListener.wrap(
response -> {
SearchHit[] hits = response.getHits().getHits();
if (hits.length == 0) {
// no buckets found
listener.onResponse(null);
} else {

try (InputStream stream = hits[0].getSourceRef().streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
Bucket bucket = Bucket.LENIENT_PARSER.apply(parser, null);
listener.onResponse(bucket.getTimestamp().getTime());
} catch (IOException e) {
listener.onFailure(new ElasticsearchParseException("failed to parse bucket", e));
}
}
}, listener::onFailure
));
}

private void auditResultsWereDeleted(String jobId, long cutoffEpochMs) {
Instant instant = Instant.ofEpochMilli(cutoffEpochMs);
ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(instant, ZoneOffset.systemDefault());
@@ -82,10 +82,10 @@ static SearchResponse createSearchResponse(List<? extends ToXContent> toXContent
static void givenJobs(Client client, List<Job> jobs) throws IOException {
SearchResponse response = AbstractExpiredJobDataRemoverTests.createSearchResponse(jobs);

doAnswer(invocationOnMock -> {
ActionListener<SearchResponse> listener = (ActionListener<SearchResponse>) invocationOnMock.getArguments()[2];
listener.onResponse(response);
return null;
doAnswer(invocationOnMock -> {
Contributor: Looks like this is indented more than the line above.
ActionListener<SearchResponse> listener = (ActionListener<SearchResponse>) invocationOnMock.getArguments()[2];
listener.onResponse(response);
return null;
}).when(client).execute(eq(SearchAction.INSTANCE), any(), any());
}
