diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/time/TimeUtils.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/time/TimeUtils.java index 01667f8a48160..aee26a018304e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/time/TimeUtils.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/time/TimeUtils.java @@ -46,6 +46,36 @@ public static Instant parseTimeFieldToInstant(XContentParser parser, String fiel "unexpected token [" + parser.currentToken() + "] for [" + fieldName + "]"); } + /** + * Safely parses a string epoch representation to a Long + * + * Commonly this function is used for parsing Date fields from doc values + * requested with the format "epoch_millis". + * + * Since nanosecond support was added epoch_millis timestamps may have a fractional component. + * We discard this, taking just whole milliseconds. Arguably it would be better to retain the + * precision here and let the downstream component decide whether it wants the accuracy, but + * that makes it hard to pass around the value as a number. The double type doesn't have + * enough digits of accuracy, and obviously long cannot store the fraction. BigDecimal would + * work, but that isn't supported by the JSON parser if the number gets round-tripped through + * JSON. So String is really the only format that could be used, but the consumers of time + * are expecting a number. + * + * @param epoch The epoch value as a string. This may contain a fractional component. + * @return The epoch value. + */ + public static long parseToEpochMs(String epoch) { + int dotPos = epoch.indexOf('.'); + if (dotPos == -1) { + return Long.parseLong(epoch); + } else if (dotPos > 0) { + return Long.parseLong(epoch.substring(0, dotPos)); + } else { + // The first character is '.' so round down to 0 + return 0L; + } + } + /** * First tries to parse the date first as a Long and convert that to an * epoch time. If the long number has more than 10 digits it is considered a diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/time/TimeUtilsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/time/TimeUtilsTests.java index e122202b5fa6c..0dcb245c78006 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/time/TimeUtilsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/time/TimeUtilsTests.java @@ -72,6 +72,12 @@ public void testDateStringToEpoch() { assertEquals(1477058573500L, TimeUtils.dateStringToEpoch("1477058573500")); } + public void testParseToEpochMs() { + assertEquals(1462096800000L, TimeUtils.parseToEpochMs("1462096800000")); + assertEquals(1462096800000L, TimeUtils.parseToEpochMs("1462096800000.005")); + assertEquals(0L, TimeUtils.parseToEpochMs(".005")); + } + public void testCheckMultiple_GivenMultiples() { TimeUtils.checkMultiple(TimeValue.timeValueHours(1), TimeUnit.SECONDS, new ParseField("foo")); TimeUtils.checkMultiple(TimeValue.timeValueHours(1), TimeUnit.MINUTES, new ParseField("foo")); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/extractor/TimeField.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/extractor/TimeField.java index 24412fe6eb77c..9436dddde78db 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/extractor/TimeField.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/extractor/TimeField.java @@ -7,6 +7,7 @@ import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.xpack.core.common.time.TimeUtils; import java.util.Collections; import java.util.Objects; @@ -44,23 +45,7 @@ public Object[] value(SearchHit hit) { return value; } if (value[0] instanceof String) { // doc_value field with the epoch_millis format - // Since nanosecond support was added epoch_millis timestamps may have a fractional component. - // We discard this, taking just whole milliseconds. Arguably it would be better to retain the - // precision here and let the downstream component decide whether it wants the accuracy, but - // that makes it hard to pass around the value as a number. The double type doesn't have - // enough digits of accuracy, and obviously long cannot store the fraction. BigDecimal would - // work, but that isn't supported by the JSON parser if the number gets round-tripped through - // JSON. So String is really the only format that could be used, but the ML consumers of time - // are expecting a number. - String strVal0 = (String) value[0]; - int dotPos = strVal0.indexOf('.'); - if (dotPos == -1) { - value[0] = Long.parseLong(strVal0); - } else if (dotPos > 0) { - value[0] = Long.parseLong(strVal0.substring(0, dotPos)); - } else { - value[0] = 0L; - } + value[0] = TimeUtils.parseToEpochMs((String)value[0]); } else if (value[0] instanceof Long == false) { // pre-6.0 field throw new IllegalStateException("Unexpected value for a time field: " + value[0].getClass()); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java index 40611438fda59..301a77be25660 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java @@ -14,11 +14,6 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.client.OriginSettingClient; -import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; -import org.elasticsearch.common.xcontent.NamedXContentRegistry; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -30,6 +25,7 @@ import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.common.time.TimeUtils; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; @@ -38,8 +34,6 @@ import org.elasticsearch.xpack.core.ml.job.results.Result; import org.elasticsearch.xpack.ml.MachineLearning; -import java.io.IOException; -import java.io.InputStream; import java.time.Clock; import java.time.Instant; import java.util.ArrayList; @@ -85,6 +79,11 @@ public void remove(ActionListener listener, Supplier isTimedOu .filter(QueryBuilders.existsQuery(ForecastRequestStats.EXPIRY_TIME.getPreferredName()))); source.size(MAX_FORECASTS); source.trackTotalHits(true); + source.fetchSource(false); + source.docValueField(Job.ID.getPreferredName(), null); + source.docValueField(ForecastRequestStats.FORECAST_ID.getPreferredName(), null); + source.docValueField(ForecastRequestStats.EXPIRY_TIME.getPreferredName(), "epoch_millis"); + // _doc is the most efficient sort order and will also disable scoring source.sort(ElasticsearchMappings.ES_DOC); @@ -96,11 +95,9 @@ public void remove(ActionListener listener, Supplier isTimedOu } private void deleteForecasts(SearchResponse searchResponse, ActionListener listener, Supplier isTimedOutSupplier) { - List forecastsToDelete; - try { - forecastsToDelete = findForecastsToDelete(searchResponse); - } catch (IOException e) { - listener.onFailure(e); + List forecastsToDelete = findForecastsToDelete(searchResponse); + if (forecastsToDelete.isEmpty()) { + listener.onResponse(true); return; } @@ -131,8 +128,8 @@ public void onFailure(Exception e) { }); } - private List findForecastsToDelete(SearchResponse searchResponse) throws IOException { - List forecastsToDelete = new ArrayList<>(); + private List findForecastsToDelete(SearchResponse searchResponse) { + List forecastsToDelete = new ArrayList<>(); SearchHits hits = searchResponse.getHits(); if (hits.getTotalHits().value > MAX_FORECASTS) { @@ -140,19 +137,29 @@ private List findForecastsToDelete(SearchResponse searchRe } for (SearchHit hit : hits.getHits()) { - try (InputStream stream = hit.getSourceRef().streamInput(); - XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser( - NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { - ForecastRequestStats forecastRequestStats = ForecastRequestStats.LENIENT_PARSER.apply(parser, null); - if (forecastRequestStats.getExpiryTime().toEpochMilli() < cutoffEpochMs) { - forecastsToDelete.add(forecastRequestStats); + String expiryTime = stringFieldValueOrNull(hit, ForecastRequestStats.EXPIRY_TIME.getPreferredName()); + if (expiryTime == null) { + LOGGER.warn("Forecast request stats document [{}] has a null [{}] field", hit.getId(), + ForecastRequestStats.EXPIRY_TIME.getPreferredName()); + continue; + } + long expiryMs = TimeUtils.parseToEpochMs(expiryTime); + if (expiryMs < cutoffEpochMs) { + JobForecastId idPair = new JobForecastId( + stringFieldValueOrNull(hit, Job.ID.getPreferredName()), + stringFieldValueOrNull(hit, Forecast.FORECAST_ID.getPreferredName())); + + if (idPair.hasNullValue() == false) { + forecastsToDelete.add(idPair); } + } + } return forecastsToDelete; } - private DeleteByQueryRequest buildDeleteByQuery(List forecastsToDelete) { + private DeleteByQueryRequest buildDeleteByQuery(List ids) { DeleteByQueryRequest request = new DeleteByQueryRequest(); request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES); @@ -160,10 +167,12 @@ private DeleteByQueryRequest buildDeleteByQuery(List forec BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().minimumShouldMatch(1); boolQuery.must(QueryBuilders.termsQuery(Result.RESULT_TYPE.getPreferredName(), ForecastRequestStats.RESULT_TYPE_VALUE, Forecast.RESULT_TYPE_VALUE)); - for (ForecastRequestStats forecastToDelete : forecastsToDelete) { - boolQuery.should(QueryBuilders.boolQuery() - .must(QueryBuilders.termQuery(Job.ID.getPreferredName(), forecastToDelete.getJobId())) - .must(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), forecastToDelete.getForecastId()))); + for (JobForecastId jobForecastId : ids) { + if (jobForecastId.hasNullValue() == false) { + boolQuery.should(QueryBuilders.boolQuery() + .must(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobForecastId.jobId)) + .must(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), jobForecastId.forecastId))); + } } QueryBuilder query = QueryBuilders.boolQuery().filter(boolQuery); request.setQuery(query); @@ -173,4 +182,18 @@ private DeleteByQueryRequest buildDeleteByQuery(List forec return request; } + + private static class JobForecastId { + private final String jobId; + private final String forecastId; + + private JobForecastId(String jobId, String forecastId) { + this.jobId = jobId; + this.forecastId = forecastId; + } + + boolean hasNullValue() { + return jobId == null || forecastId == null; + } + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java index 593abd2273d2c..11401d580c8fb 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java @@ -24,10 +24,10 @@ import org.elasticsearch.search.sort.SortBuilder; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.common.time.TimeUtils; import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; -import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField; import org.elasticsearch.xpack.ml.MachineLearning; @@ -45,7 +45,7 @@ * of their respective job with the exception of the currently used snapshot. * A snapshot is deleted if its timestamp is earlier than the start of the * current day (local time-zone) minus the retention period. - * + *

* This is expected to be used by actions requiring admin rights. Thus, * it is also expected that the provided client will be a client with the * ML origin so that permissions to manage ML indices are met. @@ -55,9 +55,9 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover private static final Logger LOGGER = LogManager.getLogger(ExpiredModelSnapshotsRemover.class); /** - * The max number of snapshots to fetch per job. It is set to 10K, the default for an index as - * we don't change that in our ML indices. It should be more than enough for most cases. If not, - * it will take a few iterations to delete all snapshots, which is OK. + * The max number of snapshots to fetch per job. It is set to 10K, the default for an index as + * we don't change that in our ML indices. It should be more than enough for most cases. If not, + * it will take a few iterations to delete all snapshots, which is OK. */ private static final int MODEL_SNAPSHOT_SEARCH_SIZE = 10000; @@ -78,31 +78,34 @@ Long getRetentionDays(Job job) { @Override void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener listener) { ThreadedActionListener threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool, - MachineLearning.UTILITY_THREAD_POOL_NAME, listener, false); + MachineLearning.UTILITY_THREAD_POOL_NAME, listener, false); latestSnapshotTimeStamp(jobId, ActionListener.wrap( - latestTime -> { - if (latestTime == null) { - threadedActionListener.onResponse(null); - } else { - long cutoff = latestTime - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis(); - threadedActionListener.onResponse(cutoff); - } - }, - listener::onFailure + latestTime -> { + if (latestTime == null) { + threadedActionListener.onResponse(null); + } else { + long cutoff = latestTime - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis(); + threadedActionListener.onResponse(cutoff); + } + }, + listener::onFailure )); } private void latestSnapshotTimeStamp(String jobId, ActionListener listener) { SortBuilder sortBuilder = new FieldSortBuilder(ModelSnapshot.TIMESTAMP.getPreferredName()).order(SortOrder.DESC); QueryBuilder snapshotQuery = QueryBuilders.boolQuery() - .filter(QueryBuilders.existsQuery(ModelSnapshot.SNAPSHOT_DOC_COUNT.getPreferredName())); + .filter(QueryBuilders.existsQuery(ModelSnapshot.SNAPSHOT_DOC_COUNT.getPreferredName())) + .filter(QueryBuilders.existsQuery(ModelSnapshot.TIMESTAMP.getPreferredName())); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.sort(sortBuilder); searchSourceBuilder.query(snapshotQuery); searchSourceBuilder.size(1); searchSourceBuilder.trackTotalHits(false); + searchSourceBuilder.fetchSource(false); + searchSourceBuilder.docValueField(ModelSnapshot.TIMESTAMP.getPreferredName(), "epoch_millis"); String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); SearchRequest searchRequest = new SearchRequest(indexName); @@ -110,17 +113,23 @@ private void latestSnapshotTimeStamp(String jobId, ActionListener listener searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS)); client.search(searchRequest, ActionListener.wrap( - response -> { - SearchHit[] hits = response.getHits().getHits(); - if (hits.length == 0) { - // no snapshots found + response -> { + SearchHit[] hits = response.getHits().getHits(); + if (hits.length == 0) { + // no snapshots found + listener.onResponse(null); + } else { + String timestamp = stringFieldValueOrNull(hits[0], ModelSnapshot.TIMESTAMP.getPreferredName()); + if (timestamp == null) { + LOGGER.warn("Model snapshot document [{}] has a null timestamp field", hits[0].getId()); listener.onResponse(null); } else { - ModelSnapshot snapshot = ModelSnapshot.fromJson(hits[0].getSourceRef()); - listener.onResponse(snapshot.getTimestamp().getTime()); + long timestampMs = TimeUtils.parseToEpochMs(timestamp); + listener.onResponse(timestampMs); } - }, - listener::onFailure) + } + }, + listener::onFailure) ); } @@ -137,17 +146,25 @@ protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener(LOGGER, threadPool, - MachineLearning.UTILITY_THREAD_POOL_NAME, expiredSnapshotsListener(job.getId(), listener), false)); + MachineLearning.UTILITY_THREAD_POOL_NAME, expiredSnapshotsListener(job.getId(), listener), false)); } private ActionListener expiredSnapshotsListener(String jobId, ActionListener listener) { @@ -155,11 +172,17 @@ private ActionListener expiredSnapshotsListener(String jobId, Ac @Override public void onResponse(SearchResponse searchResponse) { try { - List modelSnapshots = new ArrayList<>(); + List snapshotIds = new ArrayList<>(); for (SearchHit hit : searchResponse.getHits()) { - modelSnapshots.add(ModelSnapshot.fromJson(hit.getSourceRef())); + JobSnapshotId idPair = new JobSnapshotId( + stringFieldValueOrNull(hit, Job.ID.getPreferredName()), + stringFieldValueOrNull(hit, ModelSnapshotField.SNAPSHOT_ID.getPreferredName())); + + if (idPair.hasNullValue() == false) { + snapshotIds.add(idPair); + } } - deleteModelSnapshots(new VolatileCursorIterator<>(modelSnapshots), listener); + deleteModelSnapshots(new VolatileCursorIterator<>(snapshotIds), listener); } catch (Exception e) { onFailure(e); } @@ -167,34 +190,48 @@ public void onResponse(SearchResponse searchResponse) { @Override public void onFailure(Exception e) { - listener.onFailure(new ElasticsearchException("[" + jobId + "] Search for expired snapshots failed", e)); + listener.onFailure(new ElasticsearchException("[" + jobId + "] Search for expired snapshots failed", e)); } }; } - private void deleteModelSnapshots(Iterator modelSnapshotIterator, ActionListener listener) { + private void deleteModelSnapshots(Iterator modelSnapshotIterator, ActionListener listener) { if (modelSnapshotIterator.hasNext() == false) { listener.onResponse(true); return; } - ModelSnapshot modelSnapshot = modelSnapshotIterator.next(); - DeleteModelSnapshotAction.Request deleteSnapshotRequest = new DeleteModelSnapshotAction.Request( - modelSnapshot.getJobId(), modelSnapshot.getSnapshotId()); + JobSnapshotId idPair = modelSnapshotIterator.next(); + DeleteModelSnapshotAction.Request deleteSnapshotRequest = + new DeleteModelSnapshotAction.Request(idPair.jobId, idPair.snapshotId); client.execute(DeleteModelSnapshotAction.INSTANCE, deleteSnapshotRequest, new ActionListener() { - @Override - public void onResponse(AcknowledgedResponse response) { - try { - deleteModelSnapshots(modelSnapshotIterator, listener); - } catch (Exception e) { - onFailure(e); - } + @Override + public void onResponse(AcknowledgedResponse response) { + try { + deleteModelSnapshots(modelSnapshotIterator, listener); + } catch (Exception e) { + onFailure(e); } + } - @Override - public void onFailure(Exception e) { - listener.onFailure(new ElasticsearchException("[" + modelSnapshot.getJobId() + "] Failed to delete snapshot [" - + modelSnapshot.getSnapshotId() + "]", e)); - } - }); + @Override + public void onFailure(Exception e) { + listener.onFailure(new ElasticsearchException("[" + idPair.jobId + "] Failed to delete snapshot [" + + idPair.snapshotId + "]", e)); + } + }); + } + + static class JobSnapshotId { + private final String jobId; + private final String snapshotId; + + JobSnapshotId(String jobId, String snapshotId) { + this.jobId = jobId; + this.snapshotId = snapshotId; + } + + boolean hasNullValue() { + return jobId == null || snapshotId == null; + } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java index 485d8e9bfa22d..0fa06262801f3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java @@ -6,9 +6,29 @@ package org.elasticsearch.xpack.ml.job.retention; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.document.DocumentField; +import org.elasticsearch.search.SearchHit; import java.util.function.Supplier; public interface MlDataRemover { void remove(ActionListener listener, Supplier isTimedOutSupplier); + + /** + * Extract {@code fieldName} from {@code hit} and if it is a string + * return the string else {@code null}. + * @param hit The search hit + * @param fieldName Field to find + * @return value iff the docfield is present and it is a string. Otherwise {@code null} + */ + default String stringFieldValueOrNull(SearchHit hit, String fieldName) { + DocumentField docField = hit.field(fieldName); + if (docField != null) { + Object value = docField.getValue(); + if (value instanceof String) { + return (String)value; + } + } + return null; + } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java index 0d8955c05774f..f9e1b4a2c1d93 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java @@ -86,6 +86,14 @@ static SearchResponse createSearchResponse(List toXContent return createSearchResponse(toXContents, toXContents.size()); } + static SearchResponse createSearchResponseFromHits(List hits) { + SearchHits searchHits = new SearchHits(hits.toArray(new SearchHit[] {}), + new TotalHits(hits.size(), TotalHits.Relation.EQUAL_TO), 1.0f); + SearchResponse searchResponse = mock(SearchResponse.class); + when(searchResponse.getHits()).thenReturn(searchHits); + return searchResponse; + } + @SuppressWarnings("unchecked") static void givenJobs(Client client, List jobs) throws IOException { SearchResponse response = AbstractExpiredJobDataRemoverTests.createSearchResponse(jobs); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java index a178cd48b7cad..b34d9b05fe45a 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java @@ -13,15 +13,19 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.search.SearchHit; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; +import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobTests; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.test.MockOriginSettingClient; +import org.elasticsearch.xpack.ml.test.SearchHitBuilder; import org.junit.Before; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -89,17 +93,19 @@ public void testRemove_GivenJobsWithMixedRetentionPolicies() throws IOException JobTests.buildJobBuilder("job-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build() ))); - Date oneDayAgo = new Date(new Date().getTime() - TimeValue.timeValueDays(1).getMillis()); - ModelSnapshot snapshot1_1 = createModelSnapshot("job-1", "fresh-snapshot", oneDayAgo); - searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot1_1))); + Date now = new Date(); + Date oneDayAgo = new Date(now.getTime() - TimeValue.timeValueDays(1).getMillis()); + SearchHit snapshot1_1 = createModelSnapshotQueryHit("job-1", "fresh-snapshot", oneDayAgo); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot1_1))); Date eightDaysAgo = new Date(new Date().getTime() - TimeValue.timeValueDays(8).getMillis()); - ModelSnapshot snapshotToBeDeleted = createModelSnapshot("job-1", "old-snapshot", eightDaysAgo); - searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshotToBeDeleted))); + SearchHit snapshotToBeDeleted = createModelSnapshotQueryHit("job-1", "old-snapshot", eightDaysAgo); + searchResponses.add( + AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshotToBeDeleted))); - ModelSnapshot snapshot2_1 = createModelSnapshot("job-1", "snapshots-1_1", eightDaysAgo); - searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot2_1))); - searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.emptyList())); + SearchHit snapshot2_1 = createModelSnapshotQueryHit("job-1", "snapshots-1_1", eightDaysAgo); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot2_1))); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.emptyList())); givenClientRequestsSucceed(searchResponses); createExpiredModelSnapshotsRemover().remove(listener, () -> false); @@ -173,16 +179,20 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOExceptio JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build() ))); - ModelSnapshot snapshot1_1 = createModelSnapshot("snapshots-1", "snapshots-1_1"); - searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot1_1))); + Date now = new Date(); + Date oneDayAgo = new Date(new Date().getTime() - TimeValue.timeValueDays(1).getMillis()); + SearchHit snapshot1_1 = createModelSnapshotQueryHit("snapshots-1", "snapshots-1_1", oneDayAgo); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot1_1))); - List snapshots1JobSnapshots = Arrays.asList( + // It needs to be strictly more than 7 days before the most recent snapshot, hence the extra millisecond + Date eightDaysAndOneMsAgo = new Date(now.getTime() - TimeValue.timeValueDays(8).getMillis() - 1); + List snapshots1JobSnapshots = Arrays.asList( snapshot1_1, - createModelSnapshot("snapshots-1", "snapshots-1_2")); - searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); + createModelSnapshotQueryHit("snapshots-1", "snapshots-1_2", eightDaysAndOneMsAgo)); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(snapshots1JobSnapshots)); - ModelSnapshot snapshot2_2 = createModelSnapshot("snapshots-2", "snapshots-2_1"); - searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot2_2))); + SearchHit snapshot2_2 = createModelSnapshotQueryHit("snapshots-2", "snapshots-2_1", eightDaysAndOneMsAgo); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot2_2))); givenClientDeleteModelSnapshotRequestsFail(searchResponses); createExpiredModelSnapshotsRemover().remove(listener, () -> false); @@ -201,12 +211,12 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOExceptio } @SuppressWarnings("unchecked") - public void testCalcCutoffEpochMs() throws IOException { + public void testCalcCutoffEpochMs() { List searchResponses = new ArrayList<>(); Date oneDayAgo = new Date(new Date().getTime() - TimeValue.timeValueDays(1).getMillis()); - ModelSnapshot snapshot1_1 = createModelSnapshot("job-1", "newest-snapshot", oneDayAgo); - searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot1_1))); + SearchHit snapshot1_1 = createModelSnapshotQueryHit("job-1", "newest-snapshot", oneDayAgo); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot1_1))); givenClientRequests(searchResponses, true, true); @@ -219,6 +229,17 @@ public void testCalcCutoffEpochMs() throws IOException { verify(cutoffListener).onResponse(eq(expectedCutoffTime)); } + public void testJobSnapshotId() { + ExpiredModelSnapshotsRemover.JobSnapshotId id = new ExpiredModelSnapshotsRemover.JobSnapshotId("a", "b"); + assertFalse(id.hasNullValue()); + id = new ExpiredModelSnapshotsRemover.JobSnapshotId(null, "b"); + assertTrue(id.hasNullValue()); + id = new ExpiredModelSnapshotsRemover.JobSnapshotId("a", null); + assertTrue(id.hasNullValue()); + id = new ExpiredModelSnapshotsRemover.JobSnapshotId(null, null); + assertTrue(id.hasNullValue()); + } + private ExpiredModelSnapshotsRemover createExpiredModelSnapshotsRemover() { ThreadPool threadPool = mock(ThreadPool.class); ExecutorService executor = mock(ExecutorService.class); @@ -242,6 +263,15 @@ private static ModelSnapshot createModelSnapshot(String jobId, String snapshotId return new ModelSnapshot.Builder(jobId).setSnapshotId(snapshotId).setTimestamp(date).build(); } + private static SearchHit createModelSnapshotQueryHit(String jobId, String snapshotId, Date date) { + SearchHitBuilder hitBuilder = new SearchHitBuilder(0); + hitBuilder.addField(Job.ID.getPreferredName(), Collections.singletonList(jobId)); + hitBuilder.addField(ModelSnapshotField.SNAPSHOT_ID.getPreferredName(), Collections.singletonList(snapshotId)); + String dateAsString = Long.valueOf(date.getTime()).toString(); + hitBuilder.addField(ModelSnapshot.TIMESTAMP.getPreferredName(), Collections.singletonList(dateAsString)); + return hitBuilder.build(); + } + private void givenClientRequestsSucceed(List searchResponses) { givenClientRequests(searchResponses, true, true); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemoverTests.java new file mode 100644 index 0000000000000..5b5638a904a99 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemoverTests.java @@ -0,0 +1,30 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.job.retention; + +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ml.test.SearchHitBuilder; + +import java.util.Collections; +import java.util.Date; + +public class MlDataRemoverTests extends ESTestCase { + public void testStringOrNull() { + MlDataRemover remover = (listener, isTimedOutSupplier) -> { }; + + SearchHitBuilder hitBuilder = new SearchHitBuilder(0); + assertNull(remover.stringFieldValueOrNull(hitBuilder.build(), "missing")); + + hitBuilder = new SearchHitBuilder(0); + hitBuilder.addField("not_a_string", Collections.singletonList(new Date())); + assertNull(remover.stringFieldValueOrNull(hitBuilder.build(), "not_a_string")); + + hitBuilder = new SearchHitBuilder(0); + hitBuilder.addField("string_field", Collections.singletonList("actual_string_value")); + assertEquals("actual_string_value", remover.stringFieldValueOrNull(hitBuilder.build(), "string_field")); + } +}