Skip to content

Commit

Permalink
Fix delete_expired_data/nightly maintenance when many model snapshots…
Browse files Browse the repository at this point in the history
… need deleting (#57041) (#57145)

The queries performed by the expired data removers pull back entire documents
when only a few fields are required. For ModelSnapshots in particular this is
a problem as they contain quantiles which may be 100s of KB and the search size
is set to 10,000.

This change makes the search more efficient by only requesting the fields
needed to work out which expired data should be deleted.
  • Loading branch information
davidkyle authored May 27, 2020
1 parent 47e6ee9 commit 78fafab
Show file tree
Hide file tree
Showing 9 changed files with 281 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,36 @@ public static Instant parseTimeFieldToInstant(XContentParser parser, String fiel
"unexpected token [" + parser.currentToken() + "] for [" + fieldName + "]");
}

/**
* Safely parses a string epoch representation to a Long
*
* Commonly this function is used for parsing Date fields from doc values
* requested with the format "epoch_millis".
*
* Since nanosecond support was added epoch_millis timestamps may have a fractional component.
* We discard this, taking just whole milliseconds. Arguably it would be better to retain the
* precision here and let the downstream component decide whether it wants the accuracy, but
* that makes it hard to pass around the value as a number. The double type doesn't have
* enough digits of accuracy, and obviously long cannot store the fraction. BigDecimal would
* work, but that isn't supported by the JSON parser if the number gets round-tripped through
* JSON. So String is really the only format that could be used, but the consumers of time
* are expecting a number.
*
* @param epoch The epoch value as a string. This may contain a fractional component.
* @return The epoch value.
*/
public static long parseToEpochMs(String epoch) {
int dotPos = epoch.indexOf('.');
if (dotPos == -1) {
return Long.parseLong(epoch);
} else if (dotPos > 0) {
return Long.parseLong(epoch.substring(0, dotPos));
} else {
// The first character is '.' so round down to 0
return 0L;
}
}

/**
* First tries to parse the date first as a Long and convert that to an
* epoch time. If the long number has more than 10 digits it is considered a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ public void testDateStringToEpoch() {
assertEquals(1477058573500L, TimeUtils.dateStringToEpoch("1477058573500"));
}

public void testParseToEpochMs() {
assertEquals(1462096800000L, TimeUtils.parseToEpochMs("1462096800000"));
assertEquals(1462096800000L, TimeUtils.parseToEpochMs("1462096800000.005"));
assertEquals(0L, TimeUtils.parseToEpochMs(".005"));
}

public void testCheckMultiple_GivenMultiples() {
TimeUtils.checkMultiple(TimeValue.timeValueHours(1), TimeUnit.SECONDS, new ParseField("foo"));
TimeUtils.checkMultiple(TimeValue.timeValueHours(1), TimeUnit.MINUTES, new ParseField("foo"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.common.time.TimeUtils;

import java.util.Collections;
import java.util.Objects;
Expand Down Expand Up @@ -44,23 +45,7 @@ public Object[] value(SearchHit hit) {
return value;
}
if (value[0] instanceof String) { // doc_value field with the epoch_millis format
// Since nanosecond support was added epoch_millis timestamps may have a fractional component.
// We discard this, taking just whole milliseconds. Arguably it would be better to retain the
// precision here and let the downstream component decide whether it wants the accuracy, but
// that makes it hard to pass around the value as a number. The double type doesn't have
// enough digits of accuracy, and obviously long cannot store the fraction. BigDecimal would
// work, but that isn't supported by the JSON parser if the number gets round-tripped through
// JSON. So String is really the only format that could be used, but the ML consumers of time
// are expecting a number.
String strVal0 = (String) value[0];
int dotPos = strVal0.indexOf('.');
if (dotPos == -1) {
value[0] = Long.parseLong(strVal0);
} else if (dotPos > 0) {
value[0] = Long.parseLong(strVal0.substring(0, dotPos));
} else {
value[0] = 0L;
}
value[0] = TimeUtils.parseToEpochMs((String)value[0]);
} else if (value[0] instanceof Long == false) { // pre-6.0 field
throw new IllegalStateException("Unexpected value for a time field: " + value[0].getClass());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,6 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
Expand All @@ -30,6 +25,7 @@
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.common.time.TimeUtils;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
Expand All @@ -38,8 +34,6 @@
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.elasticsearch.xpack.ml.MachineLearning;

import java.io.IOException;
import java.io.InputStream;
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
Expand Down Expand Up @@ -85,6 +79,11 @@ public void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOu
.filter(QueryBuilders.existsQuery(ForecastRequestStats.EXPIRY_TIME.getPreferredName())));
source.size(MAX_FORECASTS);
source.trackTotalHits(true);
source.fetchSource(false);
source.docValueField(Job.ID.getPreferredName(), null);
source.docValueField(ForecastRequestStats.FORECAST_ID.getPreferredName(), null);
source.docValueField(ForecastRequestStats.EXPIRY_TIME.getPreferredName(), "epoch_millis");


// _doc is the most efficient sort order and will also disable scoring
source.sort(ElasticsearchMappings.ES_DOC);
Expand All @@ -96,11 +95,9 @@ public void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOu
}

private void deleteForecasts(SearchResponse searchResponse, ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) {
List<ForecastRequestStats> forecastsToDelete;
try {
forecastsToDelete = findForecastsToDelete(searchResponse);
} catch (IOException e) {
listener.onFailure(e);
List<JobForecastId> forecastsToDelete = findForecastsToDelete(searchResponse);
if (forecastsToDelete.isEmpty()) {
listener.onResponse(true);
return;
}

Expand Down Expand Up @@ -131,39 +128,51 @@ public void onFailure(Exception e) {
});
}

private List<ForecastRequestStats> findForecastsToDelete(SearchResponse searchResponse) throws IOException {
List<ForecastRequestStats> forecastsToDelete = new ArrayList<>();
private List<JobForecastId> findForecastsToDelete(SearchResponse searchResponse) {
List<JobForecastId> forecastsToDelete = new ArrayList<>();

SearchHits hits = searchResponse.getHits();
if (hits.getTotalHits().value > MAX_FORECASTS) {
LOGGER.info("More than [{}] forecasts were found. This run will only delete [{}] of them", MAX_FORECASTS, MAX_FORECASTS);
}

for (SearchHit hit : hits.getHits()) {
try (InputStream stream = hit.getSourceRef().streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(
NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
ForecastRequestStats forecastRequestStats = ForecastRequestStats.LENIENT_PARSER.apply(parser, null);
if (forecastRequestStats.getExpiryTime().toEpochMilli() < cutoffEpochMs) {
forecastsToDelete.add(forecastRequestStats);
String expiryTime = stringFieldValueOrNull(hit, ForecastRequestStats.EXPIRY_TIME.getPreferredName());
if (expiryTime == null) {
LOGGER.warn("Forecast request stats document [{}] has a null [{}] field", hit.getId(),
ForecastRequestStats.EXPIRY_TIME.getPreferredName());
continue;
}
long expiryMs = TimeUtils.parseToEpochMs(expiryTime);
if (expiryMs < cutoffEpochMs) {
JobForecastId idPair = new JobForecastId(
stringFieldValueOrNull(hit, Job.ID.getPreferredName()),
stringFieldValueOrNull(hit, Forecast.FORECAST_ID.getPreferredName()));

if (idPair.hasNullValue() == false) {
forecastsToDelete.add(idPair);
}

}

}
return forecastsToDelete;
}

private DeleteByQueryRequest buildDeleteByQuery(List<ForecastRequestStats> forecastsToDelete) {
private DeleteByQueryRequest buildDeleteByQuery(List<JobForecastId> ids) {
DeleteByQueryRequest request = new DeleteByQueryRequest();
request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);

request.indices(RESULTS_INDEX_PATTERN);
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().minimumShouldMatch(1);
boolQuery.must(QueryBuilders.termsQuery(Result.RESULT_TYPE.getPreferredName(),
ForecastRequestStats.RESULT_TYPE_VALUE, Forecast.RESULT_TYPE_VALUE));
for (ForecastRequestStats forecastToDelete : forecastsToDelete) {
boolQuery.should(QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery(Job.ID.getPreferredName(), forecastToDelete.getJobId()))
.must(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), forecastToDelete.getForecastId())));
for (JobForecastId jobForecastId : ids) {
if (jobForecastId.hasNullValue() == false) {
boolQuery.should(QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobForecastId.jobId))
.must(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), jobForecastId.forecastId)));
}
}
QueryBuilder query = QueryBuilders.boolQuery().filter(boolQuery);
request.setQuery(query);
Expand All @@ -173,4 +182,18 @@ private DeleteByQueryRequest buildDeleteByQuery(List<ForecastRequestStats> forec

return request;
}

private static class JobForecastId {
private final String jobId;
private final String forecastId;

private JobForecastId(String jobId, String forecastId) {
this.jobId = jobId;
this.forecastId = forecastId;
}

boolean hasNullValue() {
return jobId == null || forecastId == null;
}
}
}
Loading

0 comments on commit 78fafab

Please sign in to comment.