Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.7.1] Fix delete_expired_data/nightly maintenance when many model snapshots… #57145

Merged
merged 1 commit into from
May 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,36 @@ public static Instant parseTimeFieldToInstant(XContentParser parser, String fiel
"unexpected token [" + parser.currentToken() + "] for [" + fieldName + "]");
}

/**
* Safely parses a string epoch representation to a Long
*
* Commonly this function is used for parsing Date fields from doc values
* requested with the format "epoch_millis".
*
* Since nanosecond support was added epoch_millis timestamps may have a fractional component.
* We discard this, taking just whole milliseconds. Arguably it would be better to retain the
* precision here and let the downstream component decide whether it wants the accuracy, but
* that makes it hard to pass around the value as a number. The double type doesn't have
* enough digits of accuracy, and obviously long cannot store the fraction. BigDecimal would
* work, but that isn't supported by the JSON parser if the number gets round-tripped through
* JSON. So String is really the only format that could be used, but the consumers of time
* are expecting a number.
*
* @param epoch The epoch value as a string. This may contain a fractional component.
* @return The epoch value.
*/
public static long parseToEpochMs(String epoch) {
int dotPos = epoch.indexOf('.');
if (dotPos == -1) {
return Long.parseLong(epoch);
} else if (dotPos > 0) {
return Long.parseLong(epoch.substring(0, dotPos));
} else {
// The first character is '.' so round down to 0
return 0L;
}
}

/**
* First tries to parse the date first as a Long and convert that to an
* epoch time. If the long number has more than 10 digits it is considered a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ public void testDateStringToEpoch() {
assertEquals(1477058573500L, TimeUtils.dateStringToEpoch("1477058573500"));
}

public void testParseToEpochMs() {
assertEquals(1462096800000L, TimeUtils.parseToEpochMs("1462096800000"));
assertEquals(1462096800000L, TimeUtils.parseToEpochMs("1462096800000.005"));
assertEquals(0L, TimeUtils.parseToEpochMs(".005"));
}

public void testCheckMultiple_GivenMultiples() {
TimeUtils.checkMultiple(TimeValue.timeValueHours(1), TimeUnit.SECONDS, new ParseField("foo"));
TimeUtils.checkMultiple(TimeValue.timeValueHours(1), TimeUnit.MINUTES, new ParseField("foo"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.common.time.TimeUtils;

import java.util.Collections;
import java.util.Objects;
Expand Down Expand Up @@ -44,23 +45,7 @@ public Object[] value(SearchHit hit) {
return value;
}
if (value[0] instanceof String) { // doc_value field with the epoch_millis format
// Since nanosecond support was added epoch_millis timestamps may have a fractional component.
// We discard this, taking just whole milliseconds. Arguably it would be better to retain the
// precision here and let the downstream component decide whether it wants the accuracy, but
// that makes it hard to pass around the value as a number. The double type doesn't have
// enough digits of accuracy, and obviously long cannot store the fraction. BigDecimal would
// work, but that isn't supported by the JSON parser if the number gets round-tripped through
// JSON. So String is really the only format that could be used, but the ML consumers of time
// are expecting a number.
String strVal0 = (String) value[0];
int dotPos = strVal0.indexOf('.');
if (dotPos == -1) {
value[0] = Long.parseLong(strVal0);
} else if (dotPos > 0) {
value[0] = Long.parseLong(strVal0.substring(0, dotPos));
} else {
value[0] = 0L;
}
value[0] = TimeUtils.parseToEpochMs((String)value[0]);
} else if (value[0] instanceof Long == false) { // pre-6.0 field
throw new IllegalStateException("Unexpected value for a time field: " + value[0].getClass());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,6 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
Expand All @@ -30,6 +25,7 @@
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.common.time.TimeUtils;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
Expand All @@ -38,8 +34,6 @@
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.elasticsearch.xpack.ml.MachineLearning;

import java.io.IOException;
import java.io.InputStream;
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
Expand Down Expand Up @@ -85,6 +79,11 @@ public void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOu
.filter(QueryBuilders.existsQuery(ForecastRequestStats.EXPIRY_TIME.getPreferredName())));
source.size(MAX_FORECASTS);
source.trackTotalHits(true);
source.fetchSource(false);
source.docValueField(Job.ID.getPreferredName(), null);
source.docValueField(ForecastRequestStats.FORECAST_ID.getPreferredName(), null);
source.docValueField(ForecastRequestStats.EXPIRY_TIME.getPreferredName(), "epoch_millis");


// _doc is the most efficient sort order and will also disable scoring
source.sort(ElasticsearchMappings.ES_DOC);
Expand All @@ -96,11 +95,9 @@ public void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOu
}

private void deleteForecasts(SearchResponse searchResponse, ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) {
List<ForecastRequestStats> forecastsToDelete;
try {
forecastsToDelete = findForecastsToDelete(searchResponse);
} catch (IOException e) {
listener.onFailure(e);
List<JobForecastId> forecastsToDelete = findForecastsToDelete(searchResponse);
if (forecastsToDelete.isEmpty()) {
listener.onResponse(true);
return;
}

Expand Down Expand Up @@ -131,39 +128,51 @@ public void onFailure(Exception e) {
});
}

private List<ForecastRequestStats> findForecastsToDelete(SearchResponse searchResponse) throws IOException {
List<ForecastRequestStats> forecastsToDelete = new ArrayList<>();
private List<JobForecastId> findForecastsToDelete(SearchResponse searchResponse) {
List<JobForecastId> forecastsToDelete = new ArrayList<>();

SearchHits hits = searchResponse.getHits();
if (hits.getTotalHits().value > MAX_FORECASTS) {
LOGGER.info("More than [{}] forecasts were found. This run will only delete [{}] of them", MAX_FORECASTS, MAX_FORECASTS);
}

for (SearchHit hit : hits.getHits()) {
try (InputStream stream = hit.getSourceRef().streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(
NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
ForecastRequestStats forecastRequestStats = ForecastRequestStats.LENIENT_PARSER.apply(parser, null);
if (forecastRequestStats.getExpiryTime().toEpochMilli() < cutoffEpochMs) {
forecastsToDelete.add(forecastRequestStats);
String expiryTime = stringFieldValueOrNull(hit, ForecastRequestStats.EXPIRY_TIME.getPreferredName());
if (expiryTime == null) {
LOGGER.warn("Forecast request stats document [{}] has a null [{}] field", hit.getId(),
ForecastRequestStats.EXPIRY_TIME.getPreferredName());
continue;
}
long expiryMs = TimeUtils.parseToEpochMs(expiryTime);
if (expiryMs < cutoffEpochMs) {
JobForecastId idPair = new JobForecastId(
stringFieldValueOrNull(hit, Job.ID.getPreferredName()),
stringFieldValueOrNull(hit, Forecast.FORECAST_ID.getPreferredName()));

if (idPair.hasNullValue() == false) {
forecastsToDelete.add(idPair);
}

}

}
return forecastsToDelete;
}

private DeleteByQueryRequest buildDeleteByQuery(List<ForecastRequestStats> forecastsToDelete) {
private DeleteByQueryRequest buildDeleteByQuery(List<JobForecastId> ids) {
DeleteByQueryRequest request = new DeleteByQueryRequest();
request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);

request.indices(RESULTS_INDEX_PATTERN);
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().minimumShouldMatch(1);
boolQuery.must(QueryBuilders.termsQuery(Result.RESULT_TYPE.getPreferredName(),
ForecastRequestStats.RESULT_TYPE_VALUE, Forecast.RESULT_TYPE_VALUE));
for (ForecastRequestStats forecastToDelete : forecastsToDelete) {
boolQuery.should(QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery(Job.ID.getPreferredName(), forecastToDelete.getJobId()))
.must(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), forecastToDelete.getForecastId())));
for (JobForecastId jobForecastId : ids) {
if (jobForecastId.hasNullValue() == false) {
boolQuery.should(QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobForecastId.jobId))
.must(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), jobForecastId.forecastId)));
}
}
QueryBuilder query = QueryBuilders.boolQuery().filter(boolQuery);
request.setQuery(query);
Expand All @@ -173,4 +182,18 @@ private DeleteByQueryRequest buildDeleteByQuery(List<ForecastRequestStats> forec

return request;
}

private static class JobForecastId {
private final String jobId;
private final String forecastId;

private JobForecastId(String jobId, String forecastId) {
this.jobId = jobId;
this.forecastId = forecastId;
}

boolean hasNullValue() {
return jobId == null || forecastId == null;
}
}
}
Loading