From c99021cdcbff4d364adee7208b6204548a7c4c12 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Tue, 5 May 2020 12:55:50 +0100 Subject: [PATCH] [ML] More advanced model snapshot retention options (#56125) This PR implements the following changes to make ML model snapshot retention more flexible in advance of adding a UI for the feature in an upcoming release. - The default for `model_snapshot_retention_days` for new jobs is now 10 instead of 1 - There is a new job setting, `daily_model_snapshot_retention_after_days`, that defaults to 1 for new jobs and `model_snapshot_retention_days` for pre-7.8 jobs - For days that are older than `model_snapshot_retention_days`, all model snapshots are deleted as before - For days that are in between `daily_model_snapshot_retention_after_days` and `model_snapshot_retention_days` all but the first model snapshot for that day are deleted - The `retain` setting of model snapshots is still respected to allow selected model snapshots to be retained indefinitely Closes #52150 --- .../client/MachineLearningIT.java | 4 +- .../client/ml/job/config/JobTests.java | 10 +- .../anomaly-detection/apis/get-job.asciidoc | 3 +- .../apis/get-ml-info.asciidoc | 3 +- .../anomaly-detection/apis/put-job.asciidoc | 7 +- .../apis/update-job.asciidoc | 4 + docs/reference/ml/ml-shared.asciidoc | 16 +- .../xpack/core/ml/job/config/Job.java | 43 +++- .../xpack/core/ml/job/messages/Messages.java | 4 + .../ml/job/results/ReservedFieldNames.java | 1 + .../xpack/core/ml/config_index_mappings.json | 3 + .../xpack/core/ml/job/config/JobTests.java | 44 +++- .../core/ml/job/config/JobUpdateTests.java | 37 ++- .../ml/qa/ml-with-security/build.gradle | 1 + .../ml/integration/DeleteExpiredDataIT.java | 23 +- .../integration/ModelSnapshotRetentionIT.java | 234 ++++++++++++++++++ .../ml/action/TransportMlInfoAction.java | 5 +- .../TransportRevertModelSnapshotAction.java | 5 +- .../xpack/ml/job/JobManager.java | 8 +- .../ml/job/persistence/JobConfigProvider.java | 1 - .../AbstractExpiredJobDataRemover.java | 50 +++- .../ExpiredModelSnapshotsRemover.java | 52 ++-- .../job/retention/ExpiredResultsRemover.java | 8 +- .../AbstractExpiredJobDataRemoverTests.java | 9 +- .../ExpiredModelSnapshotsRemoverTests.java | 40 +-- .../retention/ExpiredResultsRemoverTests.java | 4 +- .../rest-api-spec/test/ml/jobs_crud.yml | 58 +++++ .../rest-api-spec/test/ml/ml_info.yml | 15 +- 28 files changed, 593 insertions(+), 99 deletions(-) create mode 100644 x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ModelSnapshotRetentionIT.java diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java index d12482e035455..8fe7a75564b5b 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java @@ -2519,10 +2519,12 @@ private static Job buildJobForExpiredDataTests(String jobId) { .setFunction("count") .setDetectorDescription(randomAlphaOfLength(10)) .build(); - AnalysisConfig.Builder configBuilder = new AnalysisConfig.Builder(Arrays.asList(detector)); + AnalysisConfig.Builder configBuilder = new AnalysisConfig.Builder(Collections.singletonList(detector)); //should not be random, see:https://github.com/elastic/ml-cpp/issues/208 configBuilder.setBucketSpan(new TimeValue(1, TimeUnit.HOURS)); builder.setAnalysisConfig(configBuilder); + 
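The retention rules listed in the commit message combine into a single per-snapshot decision. The sketch below is a hedged restatement of those rules only, not code from this change; the method name, the `isFirstSnapshotOfItsDay` flag and the parameter names are hypothetical.

```java
public class SnapshotRetentionRuleSketch {

    private static final long MS_PER_DAY = 24L * 60 * 60 * 1000;

    // Decide whether one model snapshot survives, given the two retention settings.
    // Age is measured from the newest snapshot, as described in the commit message.
    static boolean shouldRetain(long snapshotTimeMs, long newestSnapshotTimeMs, boolean retainFlag,
                                long modelSnapshotRetentionDays, long dailyModelSnapshotRetentionAfterDays,
                                boolean isFirstSnapshotOfItsDay) {
        if (retainFlag) {
            return true;  // the per-snapshot `retain` setting is still respected
        }
        long ageMs = newestSnapshotTimeMs - snapshotTimeMs;
        if (ageMs > modelSnapshotRetentionDays * MS_PER_DAY) {
            return false; // older than the hard cutoff: always deleted
        }
        if (ageMs <= dailyModelSnapshotRetentionAfterDays * MS_PER_DAY) {
            return true;  // newer than the thinning threshold: always kept
        }
        return isFirstSnapshotOfItsDay; // in between: only the first snapshot of each day survives
    }
}
```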
builder.setModelSnapshotRetentionDays(1L); + builder.setDailyModelSnapshotRetentionAfterDays(1L); DataDescription.Builder dataDescription = new DataDescription.Builder(); dataDescription.setTimeFormat(DataDescription.EPOCH_MS); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/config/JobTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/config/JobTests.java index 64b56c65e19a2..dcd7d4005d891 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/config/JobTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/config/JobTests.java @@ -141,11 +141,17 @@ public static Job.Builder createRandomizedJobBuilder() { if (randomBoolean()) { builder.setBackgroundPersistInterval(TimeValue.timeValueHours(randomIntBetween(1, 24))); } + Long modelSnapshotRetentionDays = null; if (randomBoolean()) { - builder.setModelSnapshotRetentionDays(randomNonNegativeLong()); + modelSnapshotRetentionDays = randomNonNegativeLong(); + builder.setModelSnapshotRetentionDays(modelSnapshotRetentionDays); } if (randomBoolean()) { - builder.setDailyModelSnapshotRetentionAfterDays(randomNonNegativeLong()); + if (modelSnapshotRetentionDays != null) { + builder.setDailyModelSnapshotRetentionAfterDays(randomLongBetween(0, modelSnapshotRetentionDays)); + } else { + builder.setDailyModelSnapshotRetentionAfterDays(randomNonNegativeLong()); + } } if (randomBoolean()) { builder.setResultsRetentionDays(randomNonNegativeLong()); diff --git a/docs/reference/ml/anomaly-detection/apis/get-job.asciidoc b/docs/reference/ml/anomaly-detection/apis/get-job.asciidoc index 0eab920cff7ea..d90b4c7164dea 100644 --- a/docs/reference/ml/anomaly-detection/apis/get-job.asciidoc +++ b/docs/reference/ml/anomaly-detection/apis/get-job.asciidoc @@ -137,7 +137,8 @@ The API returns the following results: "model_plot_config" : { "enabled" : true }, - "model_snapshot_retention_days" : 1, + "model_snapshot_retention_days" : 10, + "daily_model_snapshot_retention_after_days" : 1, "custom_settings" : { "created_by" : "ml-module-sample", ... diff --git a/docs/reference/ml/anomaly-detection/apis/get-ml-info.asciidoc b/docs/reference/ml/anomaly-detection/apis/get-ml-info.asciidoc index 7d5133b6a9d43..f4dd43312b1d9 100644 --- a/docs/reference/ml/anomaly-detection/apis/get-ml-info.asciidoc +++ b/docs/reference/ml/anomaly-detection/apis/get-ml-info.asciidoc @@ -102,7 +102,8 @@ This is a possible response: }, "model_memory_limit" : "1gb", "categorization_examples_limit" : 4, - "model_snapshot_retention_days" : 1 + "model_snapshot_retention_days" : 10, + "daily_model_snapshot_retention_after_days" : 1 }, "datafeeds" : { "scroll_size" : 1000 diff --git a/docs/reference/ml/anomaly-detection/apis/put-job.asciidoc b/docs/reference/ml/anomaly-detection/apis/put-job.asciidoc index 8e1033643dcd9..9b0b15b154321 100644 --- a/docs/reference/ml/anomaly-detection/apis/put-job.asciidoc +++ b/docs/reference/ml/anomaly-detection/apis/put-job.asciidoc @@ -224,6 +224,10 @@ include::{docdir}/ml/ml-shared.asciidoc[tag=custom-settings] include::{docdir}/ml/ml-shared.asciidoc[tag=data-description] //End data_description +`daily_model_snapshot_retention_after_days`:: +(Optional, long) +include::{docdir}/ml/ml-shared.asciidoc[tag=daily-model-snapshot-retention-after-days] + `description`:: (Optional, string) A description of the job. 
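As a hedged illustration of the new `daily_model_snapshot_retention_after_days` parameter documented above, the following sketch mirrors the high-level REST client builder calls used elsewhere in this patch; the job id, bucket span and retention values are arbitrary example choices, not API defaults.

```java
import java.util.Collections;
import java.util.concurrent.TimeUnit;

import org.elasticsearch.client.ml.job.config.AnalysisConfig;
import org.elasticsearch.client.ml.job.config.DataDescription;
import org.elasticsearch.client.ml.job.config.Detector;
import org.elasticsearch.client.ml.job.config.Job;
import org.elasticsearch.common.unit.TimeValue;

public class SnapshotRetentionJobConfigExample {

    static Job buildExampleJob() {
        Detector detector = new Detector.Builder()
            .setFunction("count")
            .build();
        AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector));
        analysisConfig.setBucketSpan(new TimeValue(1, TimeUnit.HOURS));

        DataDescription.Builder dataDescription = new DataDescription.Builder();
        dataDescription.setTimeFormat(DataDescription.EPOCH_MS);

        Job.Builder job = new Job.Builder("snapshot-retention-example");
        job.setAnalysisConfig(analysisConfig);
        job.setDataDescription(dataDescription);
        job.setModelSnapshotRetentionDays(10L);           // hard cutoff; the new default is 10
        job.setDailyModelSnapshotRetentionAfterDays(1L);  // thin to one snapshot per day after 1 day
        return job.build();
    }
}
```

The `retain` flag on an individual model snapshot still overrides both settings, so hand-picked snapshots can be kept indefinitely.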
@@ -320,7 +324,8 @@ When the job is created, you receive the following results: "time_field" : "timestamp", "time_format" : "epoch_ms" }, - "model_snapshot_retention_days" : 1, + "model_snapshot_retention_days" : 10, + "daily_model_snapshot_retention_after_days" : 1, "results_index_name" : "shared", "allow_lazy_open" : false } diff --git a/docs/reference/ml/anomaly-detection/apis/update-job.asciidoc b/docs/reference/ml/anomaly-detection/apis/update-job.asciidoc index 492cff08cb6fb..76610264cb347 100644 --- a/docs/reference/ml/anomaly-detection/apis/update-job.asciidoc +++ b/docs/reference/ml/anomaly-detection/apis/update-job.asciidoc @@ -82,6 +82,10 @@ close the job, then reopen the job and restart the {dfeed} for the changes to ta (object) include::{docdir}/ml/ml-shared.asciidoc[tag=custom-settings] +`daily_model_snapshot_retention_after_days`:: +(long) +include::{docdir}/ml/ml-shared.asciidoc[tag=daily-model-snapshot-retention-after-days] + `description`:: (string) A description of the job. diff --git a/docs/reference/ml/ml-shared.asciidoc b/docs/reference/ml/ml-shared.asciidoc index d1941e4cabe74..279daffcb15e6 100644 --- a/docs/reference/ml/ml-shared.asciidoc +++ b/docs/reference/ml/ml-shared.asciidoc @@ -361,6 +361,18 @@ example, it can contain custom URL information as shown in {ml-docs}/ml-configuring-url.html[Adding custom URLs to {ml} results]. end::custom-settings[] +tag::daily-model-snapshot-retention-after-days[] +Advanced configuration option. Specifies a number of days between 0 and the +value of `model_snapshot_retention_days`. After this period of time, only the first +model snapshot per day is retained for this job. Age is calculated relative to +the timestamp of the newest model snapshot. For new jobs, the default value is +`1`, which means that all snapshots are retained for one day. Older snapshots +are thinned out such that only one per day is retained. For jobs that were +created before this setting was available, the default value matches the +`model_snapshot_retention_days` value, which preserves the original behavior +and no thinning out of model snapshots occurs. +end::daily-model-snapshot-retention-after-days[] + tag::data-description[] The data description defines the format of the input data when you send data to the job by using the <> API. Note that when configure @@ -997,8 +1009,8 @@ end::model-snapshot-id[] tag::model-snapshot-retention-days[] Advanced configuration option. The period of time (in days) that model snapshots are retained. Age is calculated relative to the timestamp of the newest model -snapshot. The default value is `1`, which means snapshots that are one day -(twenty-four hours) older than the newest snapshot are deleted. +snapshot. The default value is `10`, which means snapshots that are ten days +older than the newest snapshot are deleted. 
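To make the defaulting behaviour of these two settings concrete, here is a small, hedged sketch of the effective daily-thinning threshold for new versus pre-7.8 jobs; the method and parameter names are illustrative only, not this patch's implementation.

```java
public class SnapshotRetentionDefaultsSketch {

    // Effective daily thinning threshold, assuming the defaults described above:
    // model_snapshot_retention_days defaults to 10 for new jobs, and
    // daily_model_snapshot_retention_after_days defaults to 1 for new jobs but to the hard
    // cutoff for jobs created before the setting existed, so their behaviour is unchanged.
    static long effectiveDailyThinningAfterDays(Long dailyModelSnapshotRetentionAfterDays,
                                                Long modelSnapshotRetentionDays,
                                                boolean createdBeforeSettingExisted) {
        long hardCutoffDays = modelSnapshotRetentionDays != null ? modelSnapshotRetentionDays : 10L;
        if (dailyModelSnapshotRetentionAfterDays != null) {
            return dailyModelSnapshotRetentionAfterDays;      // explicitly configured by the user
        }
        return createdBeforeSettingExisted
            ? hardCutoffDays                                  // pre-7.8 job: no thinning at all
            : Math.min(1L, hardCutoffDays);                   // new job: thin after one day
    }
}
```

Either value can also be changed later through the update job API, subject to the constraint that the daily threshold may not exceed `model_snapshot_retention_days`.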
end::model-snapshot-retention-days[] tag::model-timestamp[] diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java index a96c1c63f53c4..fdeebc840858a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java @@ -98,7 +98,8 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO */ public static final ByteSizeValue PROCESS_MEMORY_OVERHEAD = new ByteSizeValue(10, ByteSizeUnit.MB); - public static final long DEFAULT_MODEL_SNAPSHOT_RETENTION_DAYS = 1; + public static final long DEFAULT_MODEL_SNAPSHOT_RETENTION_DAYS = 10; + public static final long DEFAULT_DAILY_MODEL_SNAPSHOT_RETENTION_AFTER_DAYS = 1; private static ObjectParser createParser(boolean ignoreUnknownFields) { ObjectParser parser = new ObjectParser<>("job_details", ignoreUnknownFields, Builder::new); @@ -808,6 +809,10 @@ public Builder setModelSnapshotRetentionDays(Long modelSnapshotRetentionDays) { return this; } + public Long getModelSnapshotRetentionDays() { + return modelSnapshotRetentionDays; + } + public Builder setDailyModelSnapshotRetentionAfterDays(Long dailyModelSnapshotRetentionAfterDays) { this.dailyModelSnapshotRetentionAfterDays = dailyModelSnapshotRetentionAfterDays; return this; @@ -1043,9 +1048,6 @@ public void validateInputFields() { checkValidBackgroundPersistInterval(); checkValueNotLessThan(0, RENORMALIZATION_WINDOW_DAYS.getPreferredName(), renormalizationWindowDays); - checkValueNotLessThan(0, MODEL_SNAPSHOT_RETENTION_DAYS.getPreferredName(), modelSnapshotRetentionDays); - checkValueNotLessThan(0, DAILY_MODEL_SNAPSHOT_RETENTION_AFTER_DAYS.getPreferredName(), - dailyModelSnapshotRetentionAfterDays); checkValueNotLessThan(0, RESULTS_RETENTION_DAYS.getPreferredName(), resultsRetentionDays); if (!MlStrings.isValidId(id)) { @@ -1055,6 +1057,8 @@ public void validateInputFields() { throw new IllegalArgumentException(Messages.getMessage(Messages.JOB_CONFIG_ID_TOO_LONG, MlStrings.ID_LENGTH_LIMIT)); } + validateModelSnapshotRetentionSettings(); + validateGroups(); // Results index name not specified in user input means use the default, so is acceptable in this validation @@ -1076,6 +1080,37 @@ public void validateAnalysisLimitsAndSetDefaults(@Nullable ByteSizeValue maxMode AnalysisLimits.DEFAULT_MODEL_MEMORY_LIMIT_MB); } + /** + * This is meant to be called when a new job is created. + * It sets {@link #dailyModelSnapshotRetentionAfterDays} to the default value if it is not set and the default makes sense. + */ + public void validateModelSnapshotRetentionSettingsAndSetDefaults() { + validateModelSnapshotRetentionSettings(); + if (dailyModelSnapshotRetentionAfterDays == null && + modelSnapshotRetentionDays != null && + modelSnapshotRetentionDays > DEFAULT_DAILY_MODEL_SNAPSHOT_RETENTION_AFTER_DAYS) { + dailyModelSnapshotRetentionAfterDays = DEFAULT_DAILY_MODEL_SNAPSHOT_RETENTION_AFTER_DAYS; + } + } + + /** + * Validates that {@link #modelSnapshotRetentionDays} and {@link #dailyModelSnapshotRetentionAfterDays} make sense, + * both individually and in combination. 
+ */ + public void validateModelSnapshotRetentionSettings() { + + checkValueNotLessThan(0, MODEL_SNAPSHOT_RETENTION_DAYS.getPreferredName(), modelSnapshotRetentionDays); + checkValueNotLessThan(0, DAILY_MODEL_SNAPSHOT_RETENTION_AFTER_DAYS.getPreferredName(), + dailyModelSnapshotRetentionAfterDays); + + if (modelSnapshotRetentionDays != null && + dailyModelSnapshotRetentionAfterDays != null && + dailyModelSnapshotRetentionAfterDays > modelSnapshotRetentionDays) { + throw new IllegalArgumentException(Messages.getMessage(Messages.JOB_CONFIG_MODEL_SNAPSHOT_RETENTION_SETTINGS_INCONSISTENT, + dailyModelSnapshotRetentionAfterDays, modelSnapshotRetentionDays)); + } + } + private void validateGroups() { for (String group : this.groups) { if (MlStrings.isValidId(group) == false) { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java index ecafdfd7e19b0..f39ed85bbb284 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.core.ml.job.messages; import org.elasticsearch.xpack.core.ml.MachineLearningField; +import org.elasticsearch.xpack.core.ml.job.config.Job; import java.text.MessageFormat; import java.util.Locale; @@ -212,6 +213,9 @@ public final class Messages { "This job would cause a mapping clash with existing field [{0}] - avoid the clash by assigning a dedicated results index"; public static final String JOB_CONFIG_TIME_FIELD_NOT_ALLOWED_IN_ANALYSIS_CONFIG = "data_description.time_field may not be used in the analysis_config"; + public static final String JOB_CONFIG_MODEL_SNAPSHOT_RETENTION_SETTINGS_INCONSISTENT = + "The value of '" + Job.DAILY_MODEL_SNAPSHOT_RETENTION_AFTER_DAYS + "' [{0}] cannot be greater than '" + + Job.MODEL_SNAPSHOT_RETENTION_DAYS + "' [{1}]"; public static final String JOB_AND_GROUP_NAMES_MUST_BE_UNIQUE = "job and group names must be unique but job [{0}] and group [{0}] have the same name"; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java index 4d155fb4b744f..e6420842193c2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java @@ -231,6 +231,7 @@ public final class ReservedFieldNames { Job.RENORMALIZATION_WINDOW_DAYS.getPreferredName(), Job.BACKGROUND_PERSIST_INTERVAL.getPreferredName(), Job.MODEL_SNAPSHOT_RETENTION_DAYS.getPreferredName(), + Job.DAILY_MODEL_SNAPSHOT_RETENTION_AFTER_DAYS.getPreferredName(), Job.RESULTS_RETENTION_DAYS.getPreferredName(), Job.MODEL_SNAPSHOT_ID.getPreferredName(), Job.MODEL_SNAPSHOT_MIN_VERSION.getPreferredName(), diff --git a/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json b/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json index 2ed452e5eede6..6e321d197dd71 100644 --- a/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json +++ b/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json @@ -235,6 
+235,9 @@ "type" : "object", "enabled" : false }, + "daily_model_snapshot_retention_after_days" : { + "type" : "long" + }, "data_description" : { "properties" : { "field_delimiter" : { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java index 40fd084315d85..b02e559f5fec2 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java @@ -105,7 +105,7 @@ public void testConstructor_GivenEmptyJobConfiguration() { assertNull(job.getModelPlotConfig()); assertNull(job.getRenormalizationWindowDays()); assertNull(job.getBackgroundPersistInterval()); - assertThat(job.getModelSnapshotRetentionDays(), equalTo(1L)); + assertThat(job.getModelSnapshotRetentionDays(), equalTo(10L)); assertNull(job.getDailyModelSnapshotRetentionAfterDays()); assertNull(job.getResultsRetentionDays()); assertNotNull(job.allInputFields()); @@ -168,7 +168,7 @@ public void testEquals_GivenDifferentIds() { Job job1 = builder.build(); builder.setId("bar"); Job job2 = builder.build(); - assertFalse(job1.equals(job2)); + assertNotEquals(job1, job2); } public void testEquals_GivenDifferentRenormalizationWindowDays() { @@ -183,7 +183,7 @@ public void testEquals_GivenDifferentRenormalizationWindowDays() { jobDetails2.setRenormalizationWindowDays(4L); jobDetails2.setAnalysisConfig(createAnalysisConfig()); jobDetails2.setCreateTime(date); - assertFalse(jobDetails1.build().equals(jobDetails2.build())); + assertNotEquals(jobDetails1.build(), jobDetails2.build()); } public void testEquals_GivenDifferentBackgroundPersistInterval() { @@ -198,7 +198,7 @@ public void testEquals_GivenDifferentBackgroundPersistInterval() { jobDetails2.setBackgroundPersistInterval(TimeValue.timeValueSeconds(8000L)); jobDetails2.setAnalysisConfig(createAnalysisConfig()); jobDetails2.setCreateTime(date); - assertFalse(jobDetails1.build().equals(jobDetails2.build())); + assertNotEquals(jobDetails1.build(), jobDetails2.build()); } public void testEquals_GivenDifferentModelSnapshotRetentionDays() { @@ -213,7 +213,7 @@ public void testEquals_GivenDifferentModelSnapshotRetentionDays() { jobDetails2.setModelSnapshotRetentionDays(8L); jobDetails2.setAnalysisConfig(createAnalysisConfig()); jobDetails2.setCreateTime(date); - assertFalse(jobDetails1.build().equals(jobDetails2.build())); + assertNotEquals(jobDetails1.build(), jobDetails2.build()); } public void testEquals_GivenDifferentResultsRetentionDays() { @@ -228,7 +228,7 @@ public void testEquals_GivenDifferentResultsRetentionDays() { jobDetails2.setResultsRetentionDays(4L); jobDetails2.setAnalysisConfig(createAnalysisConfig()); jobDetails2.setCreateTime(date); - assertFalse(jobDetails1.build().equals(jobDetails2.build())); + assertNotEquals(jobDetails1.build(), jobDetails2.build()); } public void testEquals_GivenDifferentCustomSettings() { @@ -240,7 +240,7 @@ public void testEquals_GivenDifferentCustomSettings() { Map customSettings2 = new HashMap<>(); customSettings2.put("key2", "value2"); jobDetails2.setCustomSettings(customSettings2); - assertFalse(jobDetails1.build().equals(jobDetails2.build())); + assertNotEquals(jobDetails1.build(), jobDetails2.build()); } // JobConfigurationTests: @@ -397,6 +397,30 @@ public void testVerify_GivenNegativeModelSnapshotRetentionDays() { assertEquals(errorMessage, e.getMessage()); } + public void 
testVerify_GivenNegativeDailyModelSnapshotRetentionAfterDays() { + String errorMessage = + Messages.getMessage(Messages.JOB_CONFIG_FIELD_VALUE_TOO_LOW, "daily_model_snapshot_retention_after_days", 0, -1); + Job.Builder builder = buildJobBuilder("foo"); + builder.setDailyModelSnapshotRetentionAfterDays(-1L); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, builder::build); + + assertEquals(errorMessage, e.getMessage()); + } + + public void testVerify_GivenInconsistentModelSnapshotRetentionSettings() { + long dailyModelSnapshotRetentionAfterDays = randomLongBetween(1, Long.MAX_VALUE); + long modelSnapshotRetentionDays = randomLongBetween(0, dailyModelSnapshotRetentionAfterDays - 1); + String errorMessage = + Messages.getMessage(Messages.JOB_CONFIG_MODEL_SNAPSHOT_RETENTION_SETTINGS_INCONSISTENT, + dailyModelSnapshotRetentionAfterDays, modelSnapshotRetentionDays); + Job.Builder builder = buildJobBuilder("foo"); + builder.setDailyModelSnapshotRetentionAfterDays(dailyModelSnapshotRetentionAfterDays); + builder.setModelSnapshotRetentionDays(modelSnapshotRetentionDays); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, builder::build); + + assertEquals(errorMessage, e.getMessage()); + } + public void testVerify_GivenLowBackgroundPersistInterval() { String errorMessage = Messages.getMessage(Messages.JOB_CONFIG_FIELD_VALUE_TOO_LOW, "background_persist_interval", 3600, 3599); Job.Builder builder = buildJobBuilder("foo"); @@ -628,7 +652,11 @@ public static Job createRandomizedJob() { builder.setModelSnapshotRetentionDays(randomNonNegativeLong()); } if (randomBoolean()) { - builder.setDailyModelSnapshotRetentionAfterDays(randomNonNegativeLong()); + if (builder.getModelSnapshotRetentionDays() != null) { + builder.setDailyModelSnapshotRetentionAfterDays(randomLongBetween(0, builder.getModelSnapshotRetentionDays())); + } else { + builder.setDailyModelSnapshotRetentionAfterDays(randomNonNegativeLong()); + } } if (randomBoolean()) { builder.setResultsRetentionDays(randomNonNegativeLong()); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java index 7b906f6c33c44..fdcb990957f7c 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java @@ -73,11 +73,31 @@ public JobUpdate createRandom(String jobId, @Nullable Job job) { if (randomBoolean()) { update.setBackgroundPersistInterval(TimeValue.timeValueHours(randomIntBetween(1, 24))); } - if (randomBoolean()) { - update.setModelSnapshotRetentionDays(randomNonNegativeLong()); + // It's quite complicated to ensure updates of the two model snapshot retention settings are valid: + // - We might be updating both, one or neither. + // - If we update both the values in the update must be consistent. + // - If we update just one then that one must be consistent with the value of the other one in the job that's being updated. + Long maxValidDailyModelSnapshotRetentionAfterDays = (job == null) ? 
null : job.getModelSnapshotRetentionDays(); + boolean willSetModelSnapshotRetentionDays = randomBoolean(); + boolean willSetDailyModelSnapshotRetentionAfterDays = randomBoolean(); + if (willSetModelSnapshotRetentionDays) { + if (willSetDailyModelSnapshotRetentionAfterDays) { + maxValidDailyModelSnapshotRetentionAfterDays = randomNonNegativeLong(); + update.setModelSnapshotRetentionDays(maxValidDailyModelSnapshotRetentionAfterDays); + } else { + if (job == null || job.getDailyModelSnapshotRetentionAfterDays() == null) { + update.setModelSnapshotRetentionDays(randomNonNegativeLong()); + } else { + update.setModelSnapshotRetentionDays(randomLongBetween(job.getDailyModelSnapshotRetentionAfterDays(), Long.MAX_VALUE)); + } + } } - if (randomBoolean()) { - update.setDailyModelSnapshotRetentionAfterDays(randomNonNegativeLong()); + if (willSetDailyModelSnapshotRetentionAfterDays) { + if (maxValidDailyModelSnapshotRetentionAfterDays != null) { + update.setDailyModelSnapshotRetentionAfterDays(randomLongBetween(0, maxValidDailyModelSnapshotRetentionAfterDays)); + } else { + update.setDailyModelSnapshotRetentionAfterDays(randomNonNegativeLong()); + } } if (randomBoolean()) { update.setResultsRetentionDays(randomNonNegativeLong()); @@ -215,7 +235,10 @@ public void testMergeWithJob() { updateBuilder.setAnalysisLimits(analysisLimits); updateBuilder.setBackgroundPersistInterval(TimeValue.timeValueHours(randomIntBetween(1, 24))); updateBuilder.setResultsRetentionDays(randomNonNegativeLong()); - updateBuilder.setModelSnapshotRetentionDays(randomNonNegativeLong()); + // The createRandom() method tests the complex interactions between these next two, so this test can always update both + long newModelSnapshotRetentionDays = randomNonNegativeLong(); + updateBuilder.setModelSnapshotRetentionDays(newModelSnapshotRetentionDays); + updateBuilder.setDailyModelSnapshotRetentionAfterDays(randomLongBetween(0, newModelSnapshotRetentionDays)); updateBuilder.setRenormalizationWindowDays(randomNonNegativeLong()); updateBuilder.setCategorizationFilters(categorizationFilters); updateBuilder.setCustomSettings(customSettings); @@ -224,7 +247,7 @@ public void testMergeWithJob() { JobUpdate update = updateBuilder.build(); Job.Builder jobBuilder = new Job.Builder("foo"); - jobBuilder.setGroups(Arrays.asList("group-1")); + jobBuilder.setGroups(Collections.singletonList("group-1")); Detector.Builder d1 = new Detector.Builder("info_content", "domain"); d1.setOverFieldName("mlcategory"); Detector.Builder d2 = new Detector.Builder("min", "field"); @@ -281,7 +304,7 @@ public void testIsAutodetectProcessUpdate() { assertTrue(update.isAutodetectProcessUpdate()); update = new JobUpdate.Builder("foo").setDetectorUpdates(Collections.singletonList(mock(JobUpdate.DetectorUpdate.class))).build(); assertTrue(update.isAutodetectProcessUpdate()); - update = new JobUpdate.Builder("foo").setGroups(Arrays.asList("bar")).build(); + update = new JobUpdate.Builder("foo").setGroups(Collections.singletonList("bar")).build(); assertTrue(update.isAutodetectProcessUpdate()); } diff --git a/x-pack/plugin/ml/qa/ml-with-security/build.gradle b/x-pack/plugin/ml/qa/ml-with-security/build.gradle index 4abfdfb2e15b2..84ea1372cb1aa 100644 --- a/x-pack/plugin/ml/qa/ml-with-security/build.gradle +++ b/x-pack/plugin/ml/qa/ml-with-security/build.gradle @@ -159,6 +159,7 @@ integTest.runner { 'ml/jobs_crud/Test put job after closing results index', 'ml/jobs_crud/Test put job after closing state index', 'ml/jobs_crud/Test put job with inconsistent body/param ids', + 
'ml/jobs_crud/Test put job with inconsistent model snapshot settings', 'ml/jobs_crud/Test put job with time field in analysis_config', 'ml/jobs_crud/Test put job with duplicate detector configurations', 'ml/jobs_crud/Test job with categorization_analyzer and categorization_filters', diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java index cdd12a8fe10e5..612fe23edbdad 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java @@ -109,11 +109,17 @@ public void testDeleteExpiredData() throws Exception { } ActionFuture indexUnusedStateDocsResponse = bulkRequestBuilder.execute(); - registerJob(newJobBuilder("no-retention").setResultsRetentionDays(null).setModelSnapshotRetentionDays(1000L)); - registerJob(newJobBuilder("results-retention").setResultsRetentionDays(1L).setModelSnapshotRetentionDays(1000L)); - registerJob(newJobBuilder("snapshots-retention").setResultsRetentionDays(null).setModelSnapshotRetentionDays(2L)); - registerJob(newJobBuilder("snapshots-retention-with-retain").setResultsRetentionDays(null).setModelSnapshotRetentionDays(2L)); - registerJob(newJobBuilder("results-and-snapshots-retention").setResultsRetentionDays(1L).setModelSnapshotRetentionDays(2L)); + // These jobs don't thin out model state; ModelSnapshotRetentionIT tests that + registerJob(newJobBuilder("no-retention") + .setResultsRetentionDays(null).setModelSnapshotRetentionDays(1000L).setDailyModelSnapshotRetentionAfterDays(1000L)); + registerJob(newJobBuilder("results-retention") + .setResultsRetentionDays(1L).setModelSnapshotRetentionDays(1000L).setDailyModelSnapshotRetentionAfterDays(1000L)); + registerJob(newJobBuilder("snapshots-retention") + .setResultsRetentionDays(null).setModelSnapshotRetentionDays(2L).setDailyModelSnapshotRetentionAfterDays(2L)); + registerJob(newJobBuilder("snapshots-retention-with-retain") + .setResultsRetentionDays(null).setModelSnapshotRetentionDays(2L).setDailyModelSnapshotRetentionAfterDays(2L)); + registerJob(newJobBuilder("results-and-snapshots-retention") + .setResultsRetentionDays(1L).setModelSnapshotRetentionDays(2L).setDailyModelSnapshotRetentionAfterDays(2L)); List shortExpiryForecastIds = new ArrayList<>(); @@ -171,9 +177,10 @@ public void testDeleteExpiredData() throws Exception { // Refresh to ensure the snapshot timestamp updates are visible refresh("*"); - // We need to wait a second to ensure the second time around model snapshots will have a different ID (it depends on epoch seconds) - // FIXME it would be better to wait for something concrete instead of wait for time to elapse - assertBusy(() -> {}, 1, TimeUnit.SECONDS); + // We need to wait for the clock to tick to a new second to ensure the second time + // around model snapshots will have a different ID (it depends on epoch seconds) + long before = System.currentTimeMillis() / 1000; + assertBusy(() -> assertNotEquals(before, System.currentTimeMillis() / 1000), 1, TimeUnit.SECONDS); for (Job.Builder job : getJobs()) { // Run up to now diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ModelSnapshotRetentionIT.java 
b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ModelSnapshotRetentionIT.java new file mode 100644 index 0000000000000..bc3afbc338187 --- /dev/null +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ModelSnapshotRetentionIT.java @@ -0,0 +1,234 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.integration; + +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.bulk.BulkAction; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexAction; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.xpack.core.ml.action.PutJobAction; +import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; +import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.core.ml.job.config.DataDescription; +import org.elasticsearch.xpack.core.ml.job.config.Detector; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelState; +import org.junit.After; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.List; + +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; + +public class ModelSnapshotRetentionIT extends MlNativeAutodetectIntegTestCase { + + private static final long MS_IN_DAY = TimeValue.timeValueDays(1).millis(); + + @After + public void cleanUpTest() { + cleanUp(); + } + + public void testModelSnapshotRetentionNoDailyThinning() throws Exception { + + String jobId = "no-daily-thinning"; + + int numDocsPerSnapshot = randomIntBetween(1, 4); + int numSnapshotsPerDay = randomIntBetween(1, 4); + int modelSnapshotRetentionDays = randomIntBetween(1, 10); + int numPriorDays = randomIntBetween(1, 5); + + createJob(jobId, modelSnapshotRetentionDays, modelSnapshotRetentionDays); + + List expectedModelSnapshotDocIds = new ArrayList<>(); + List expectedModelStateDocIds = new ArrayList<>(); + + long now = System.currentTimeMillis(); + long timeMs = now; + // We add 1 to make the maths easier, because the retention period includes + // the cutoff 
time, yet is measured from the timestamp of the latest snapshot + int numSnapshotsTotal = numSnapshotsPerDay * (modelSnapshotRetentionDays + numPriorDays) + 1; + for (int i = numSnapshotsTotal; i > 0; --i) { + String snapshotId = String.valueOf(i); + createModelSnapshot(jobId, snapshotId, new Date(timeMs), numDocsPerSnapshot, i == numSnapshotsTotal); + if (timeMs >= now - MS_IN_DAY * modelSnapshotRetentionDays) { + expectedModelSnapshotDocIds.add(ModelSnapshot.documentId(jobId, snapshotId)); + for (int j = 1; j <= numDocsPerSnapshot; ++j) { + expectedModelStateDocIds.add(ModelState.documentId(jobId, snapshotId, j)); + } + } + timeMs -= (MS_IN_DAY / numSnapshotsPerDay); + } + refresh(".ml*"); + + deleteExpiredData(); + + Collections.sort(expectedModelSnapshotDocIds); + Collections.sort(expectedModelStateDocIds); + assertThat(getAvailableModelSnapshotDocIds(jobId), is(expectedModelSnapshotDocIds)); + assertThat(getAvailableModelStateDocIds(), is(expectedModelStateDocIds)); + } + + public void testModelSnapshotRetentionWithDailyThinning() throws Exception { + + String jobId = "with-daily-thinning"; + + int numDocsPerSnapshot = randomIntBetween(1, 4); + int numSnapshotsPerDay = randomIntBetween(1, 4); + int modelSnapshotRetentionDays = randomIntBetween(2, 10); + int numPriorDays = randomIntBetween(1, 5); + int dailyModelSnapshotRetentionAfterDays = randomIntBetween(0, modelSnapshotRetentionDays - 1); + + createJob(jobId, modelSnapshotRetentionDays, dailyModelSnapshotRetentionAfterDays); + + List expectedModelSnapshotDocIds = new ArrayList<>(); + List expectedModelStateDocIds = new ArrayList<>(); + + long now = System.currentTimeMillis(); + long timeMs = now; + // We add 1 to make the maths easier, because the retention period includes + // the cutoff time, yet is measured from the timestamp of the latest snapshot + int numSnapshotsTotal = numSnapshotsPerDay * (modelSnapshotRetentionDays + numPriorDays) + 1; + for (int i = numSnapshotsTotal; i > 0; --i) { + String snapshotId = String.valueOf(i); + createModelSnapshot(jobId, snapshotId, new Date(timeMs), numDocsPerSnapshot, i == numSnapshotsTotal); + // We should retain: + // - Nothing older than modelSnapshotRetentionDays + // - Everything newer than dailyModelSnapshotRetentionAfterDays + // - The first snapshot of each day in between + if (timeMs >= now - MS_IN_DAY * modelSnapshotRetentionDays && + (timeMs >= now - MS_IN_DAY * dailyModelSnapshotRetentionAfterDays || + (now - timeMs) % MS_IN_DAY < MS_IN_DAY / numSnapshotsPerDay)) { + expectedModelSnapshotDocIds.add(ModelSnapshot.documentId(jobId, snapshotId)); + for (int j = 1; j <= numDocsPerSnapshot; ++j) { + expectedModelStateDocIds.add(ModelState.documentId(jobId, snapshotId, j)); + } + } + timeMs -= (MS_IN_DAY / numSnapshotsPerDay); + } + refresh(".ml*"); + + deleteExpiredData(); + + Collections.sort(expectedModelSnapshotDocIds); + Collections.sort(expectedModelStateDocIds); + assertThat(getAvailableModelSnapshotDocIds(jobId), is(expectedModelSnapshotDocIds)); + assertThat(getAvailableModelStateDocIds(), is(expectedModelStateDocIds)); + } + + private List getAvailableModelSnapshotDocIds(String jobId) { + SearchRequest searchRequest = new SearchRequest(); + searchRequest.indices(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)); + QueryBuilder query = QueryBuilders.boolQuery() + .filter(QueryBuilders.existsQuery(ModelSnapshot.SNAPSHOT_DOC_COUNT.getPreferredName())); + searchRequest.source(new SearchSourceBuilder().query(query).size(10000)); + + return getDocIdsFromSearch(searchRequest); + 
} + + private List getAvailableModelStateDocIds() { + SearchRequest searchRequest = new SearchRequest(); + searchRequest.indices(AnomalyDetectorsIndex.jobStateIndexPattern()); + searchRequest.source(new SearchSourceBuilder().size(10000)); + + return getDocIdsFromSearch(searchRequest); + } + + private List getDocIdsFromSearch(SearchRequest searchRequest) { + + SearchResponse searchResponse = client().execute(SearchAction.INSTANCE, searchRequest).actionGet(); + + List docIds = new ArrayList<>(); + assertThat(searchResponse.getHits(), notNullValue()); + for (SearchHit searchHit : searchResponse.getHits().getHits()) { + docIds.add(searchHit.getId()); + } + Collections.sort(docIds); + return docIds; + } + + private void createJob(String jobId, long modelSnapshotRetentionDays, long dailyModelSnapshotRetentionAfterDays) { + Detector detector = new Detector.Builder("count", null).build(); + + Job.Builder builder = new Job.Builder(); + builder.setId(jobId); + builder.setAnalysisConfig(new AnalysisConfig.Builder(Collections.singletonList(detector))); + builder.setDataDescription(new DataDescription.Builder()); + builder.setModelSnapshotRetentionDays(modelSnapshotRetentionDays); + builder.setDailyModelSnapshotRetentionAfterDays(dailyModelSnapshotRetentionAfterDays); + + PutJobAction.Request putJobRequest = new PutJobAction.Request(builder); + client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet(); + } + + private void createModelSnapshot(String jobId, String snapshotId, Date timestamp, int numDocs, boolean isActive) throws IOException { + persistModelSnapshotDoc(jobId, snapshotId, timestamp, numDocs, isActive); + persistModelStateDocs(jobId, snapshotId, numDocs); + if (isActive) { + JobUpdate jobUpdate = new JobUpdate.Builder(jobId).setModelSnapshotId(snapshotId).build(); + UpdateJobAction.Request updateJobRequest = UpdateJobAction.Request.internal(jobId, jobUpdate); + client().execute(UpdateJobAction.INSTANCE, updateJobRequest).actionGet(); + } + } + + private void persistModelSnapshotDoc(String jobId, String snapshotId, Date timestamp, int numDocs, + boolean immediateRefresh) throws IOException { + ModelSnapshot.Builder modelSnapshotBuilder = new ModelSnapshot.Builder(); + modelSnapshotBuilder.setJobId(jobId).setSnapshotId(snapshotId).setTimestamp(timestamp).setSnapshotDocCount(numDocs); + + IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.resultsWriteAlias(jobId)); + indexRequest.id(ModelSnapshot.documentId(jobId, snapshotId)); + if (immediateRefresh) { + indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + } + XContentBuilder xContentBuilder = JsonXContent.contentBuilder(); + modelSnapshotBuilder.build().toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS); + indexRequest.source(xContentBuilder); + + IndexResponse indexResponse = client().execute(IndexAction.INSTANCE, indexRequest).actionGet(); + assertThat(indexResponse.getResult(), is(DocWriteResponse.Result.CREATED)); + } + + private void persistModelStateDocs(String jobId, String snapshotId, int numDocs) { + assertThat(numDocs, greaterThan(0)); + + BulkRequest bulkRequest = new BulkRequest(); + for (int i = 1; i <= numDocs; ++i) { + IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias()); + indexRequest.id(ModelState.documentId(jobId, snapshotId, i)); + // The exact contents of the model state doesn't matter - we are not going to try and restore it + indexRequest.source(Collections.singletonMap("compressed", Collections.singletonList("foo"))); + 
bulkRequest.add(indexRequest); + } + + BulkResponse bulkResponse = client().execute(BulkAction.INSTANCE, bulkRequest).actionGet(); + assertFalse(bulkResponse.buildFailureMessage(), bulkResponse.hasFailures()); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportMlInfoAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportMlInfoAction.java index dd55a07ebdce4..56d7d2a655173 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportMlInfoAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportMlInfoAction.java @@ -33,6 +33,7 @@ import java.io.IOException; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.TimeoutException; @@ -80,10 +81,12 @@ private boolean upgradeMode() { } private Map anomalyDetectorsDefaults() { - Map defaults = new HashMap<>(); + Map defaults = new LinkedHashMap<>(); defaults.put(AnalysisLimits.MODEL_MEMORY_LIMIT.getPreferredName(), defaultModelMemoryLimit()); defaults.put(AnalysisLimits.CATEGORIZATION_EXAMPLES_LIMIT.getPreferredName(), AnalysisLimits.DEFAULT_CATEGORIZATION_EXAMPLES_LIMIT); defaults.put(Job.MODEL_SNAPSHOT_RETENTION_DAYS.getPreferredName(), Job.DEFAULT_MODEL_SNAPSHOT_RETENTION_DAYS); + defaults.put(Job.DAILY_MODEL_SNAPSHOT_RETENTION_AFTER_DAYS.getPreferredName(), + Job.DEFAULT_DAILY_MODEL_SNAPSHOT_RETENTION_AFTER_DAYS); try { defaults.put(CategorizationAnalyzerConfig.CATEGORIZATION_ANALYZER.getPreferredName(), CategorizationAnalyzerConfig.buildDefaultCategorizationAnalyzer(Collections.emptyList()) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java index d209060a823b4..7f6c202fa345a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java @@ -143,10 +143,7 @@ private ActionListener wrapDeleteOldDataList // acknowledged responses return ActionListener.wrap(response -> { Date deleteAfter = modelSnapshot.getLatestResultTimeStamp(); - logger.debug("Removing intervening records: last record: " + deleteAfter + ", last result: " - + modelSnapshot.getLatestResultTimeStamp()); - - logger.info("Deleting results after '" + deleteAfter + "'"); + logger.info("[{}] Removing intervening records after reverting model: deleting results after [{}]", jobId, deleteAfter); JobDataDeleter dataDeleter = new JobDataDeleter(client, jobId); dataDeleter.deleteResultsFromTime(deleteAfter.getTime() + 1, new ActionListener() { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 4bd96ea4d015b..2b385d1fa011b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -240,10 +240,12 @@ static void validateCategorizationAnalyzer(Job.Builder jobBuilder, AnalysisRegis public void putJob(PutJobAction.Request request, AnalysisRegistry analysisRegistry, ClusterState state, ActionListener actionListener) throws IOException { - 
request.getJobBuilder().validateAnalysisLimitsAndSetDefaults(maxModelMemoryLimit); - validateCategorizationAnalyzer(request.getJobBuilder(), analysisRegistry); + Job.Builder jobBuilder = request.getJobBuilder(); + jobBuilder.validateAnalysisLimitsAndSetDefaults(maxModelMemoryLimit); + jobBuilder.validateModelSnapshotRetentionSettingsAndSetDefaults(); + validateCategorizationAnalyzer(jobBuilder, analysisRegistry); - Job job = request.getJobBuilder().build(new Date()); + Job job = jobBuilder.build(new Date()); if (job.getDataDescription() != null && job.getDataDescription().getFormat() == DataDescription.DataFormat.DELIMITED) { deprecationLogger.deprecatedAndMaybeLog("ml_create_job_delimited_data", diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java index 97873464de643..bef38a8e33fae 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java @@ -299,7 +299,6 @@ public void onResponse(GetResponse getResponse) { return; } - final long version = getResponse.getVersion(); final long seqNo = getResponse.getSeqNo(); final long primaryTerm = getResponse.getPrimaryTerm(); BytesReference source = getResponse.getSourceAsBytesRef(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java index 439db5c21a914..0173f5b27d0d6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java @@ -18,6 +18,7 @@ import java.util.Deque; import java.util.Iterator; import java.util.List; +import java.util.Objects; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -66,12 +67,12 @@ private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener { - if (cutoffEpochMs == null) { + response -> { + if (response == null) { removeData(jobIterator, listener, isTimedOutSupplier); } else { - removeDataBefore(job, cutoffEpochMs, ActionListener.wrap( - response -> removeData(jobIterator, listener, isTimedOutSupplier), + removeDataBefore(job, response.latestTimeMs, response.cutoffEpochMs, ActionListener.wrap( + r -> removeData(jobIterator, listener, isTimedOutSupplier), listener::onFailure)); } }, @@ -84,7 +85,7 @@ private WrappedBatchedJobsIterator newJobIterator() { return new WrappedBatchedJobsIterator(jobsIterator); } - abstract void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener listener); + abstract void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener listener); abstract Long getRetentionDays(Job job); @@ -92,7 +93,7 @@ private WrappedBatchedJobsIterator newJobIterator() { * Template method to allow implementation details of various types of data (e.g. results, model snapshots). * Implementors need to call {@code listener.onResponse} when they are done in order to continue to the next job. 
*/ - abstract void removeDataBefore(Job job, long cutoffEpochMs, ActionListener listener); + abstract void removeDataBefore(Job job, long latestTimeMs, long cutoffEpochMs, ActionListener listener); static BoolQueryBuilder createQuery(String jobId, long cutoffEpochMs) { return QueryBuilders.boolQuery() @@ -106,7 +107,7 @@ static BoolQueryBuilder createQuery(String jobId, long cutoffEpochMs) { * This class abstracts away the logic of pulling one job at a time from * multiple batches. */ - private class WrappedBatchedJobsIterator implements Iterator { + private static class WrappedBatchedJobsIterator implements Iterator { private final BatchedJobsIterator batchedIterator; private VolatileCursorIterator currentBatch; @@ -144,4 +145,39 @@ private VolatileCursorIterator createBatchIteratorFromBatch(Deque(jobs); } } + + /** + * The latest time that cutoffs are measured from is not wall clock time, + * but some other reference point that makes sense for the type of data + * being removed. This class groups the cutoff time with it's "latest" + * reference point. + */ + protected static final class CutoffDetails { + + public final long latestTimeMs; + public final long cutoffEpochMs; + + public CutoffDetails(long latestTimeMs, long cutoffEpochMs) { + this.latestTimeMs = latestTimeMs; + this.cutoffEpochMs = cutoffEpochMs; + } + + @Override + public int hashCode() { + return Objects.hash(latestTimeMs, cutoffEpochMs); + } + + @Override + public boolean equals(Object other) { + if (other == this) { + return true; + } + if (other instanceof CutoffDetails == false) { + return false; + } + CutoffDetails that = (CutoffDetails) other; + return this.latestTimeMs == that.latestTimeMs && + this.cutoffEpochMs == that.cutoffEpochMs; + } + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java index e76f1c7b13052..d6808af8bd1af 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java @@ -27,7 +27,6 @@ import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; -import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField; import org.elasticsearch.xpack.ml.MachineLearning; @@ -54,6 +53,8 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover private static final Logger LOGGER = LogManager.getLogger(ExpiredModelSnapshotsRemover.class); + private static final long MS_IN_ONE_DAY = TimeValue.timeValueDays(1).getMillis(); + /** * The max number of snapshots to fetch per job. It is set to 10K, the default for an index as * we don't change that in our ML indices. It should be more than enough for most cases. 
If not, @@ -72,12 +73,19 @@ public ExpiredModelSnapshotsRemover(OriginSettingClient client, ThreadPool threa @Override Long getRetentionDays(Job job) { - return job.getModelSnapshotRetentionDays(); + // If a daily retention cutoff is set then we need to tell the base class that this is the cutoff + // point so that we get to consider deleting model snapshots older than this. Later on we will + // not actually delete all of the ones in between the hard cutoff and the daily retention cutoff. + Long retentionDaysForConsideration = job.getDailyModelSnapshotRetentionAfterDays(); + if (retentionDaysForConsideration == null) { + retentionDaysForConsideration = job.getModelSnapshotRetentionDays(); + } + return retentionDaysForConsideration; } @Override - void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener listener) { - ThreadedActionListener threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool, + void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener listener) { + ThreadedActionListener threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME, listener, false); latestSnapshotTimeStamp(jobId, ActionListener.wrap( @@ -86,7 +94,7 @@ void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener li threadedActionListener.onResponse(null); } else { long cutoff = latestTime - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis(); - threadedActionListener.onResponse(cutoff); + threadedActionListener.onResponse(new CutoffDetails(latestTime, cutoff)); } }, listener::onFailure @@ -125,39 +133,53 @@ private void latestSnapshotTimeStamp(String jobId, ActionListener listener } @Override - protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener listener) { + protected void removeDataBefore(Job job, long latestTimeMs, long cutoffEpochMs, ActionListener listener) { + // TODO: delete this test if we ever allow users to revert a job to no model snapshot, e.g. 
to recover from data loss
         if (job.getModelSnapshotId() == null) {
             // No snapshot to remove
             listener.onResponse(true);
             return;
         }
-        LOGGER.debug("Removing model snapshots of job [{}] that have a timestamp before [{}]", job.getId(), cutoffEpochMs);
+        LOGGER.debug("Considering model snapshots of job [{}] that have a timestamp before [{}] for removal", job.getId(), cutoffEpochMs);
         SearchRequest searchRequest = new SearchRequest();
         searchRequest.indices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId()));
         QueryBuilder activeSnapshotFilter = QueryBuilders.termQuery(
-                ModelSnapshotField.SNAPSHOT_ID.getPreferredName(), job.getModelSnapshotId());
+            ModelSnapshotField.SNAPSHOT_ID.getPreferredName(), job.getModelSnapshotId());
         QueryBuilder retainFilter = QueryBuilders.termQuery(ModelSnapshot.RETAIN.getPreferredName(), true);
         QueryBuilder query = createQuery(job.getId(), cutoffEpochMs)
-                .filter(QueryBuilders.existsQuery(ModelSnapshot.SNAPSHOT_DOC_COUNT.getPreferredName()))
-                .mustNot(activeSnapshotFilter)
-                .mustNot(retainFilter);
+            .filter(QueryBuilders.existsQuery(ModelSnapshot.SNAPSHOT_DOC_COUNT.getPreferredName()))
+            .mustNot(activeSnapshotFilter)
+            .mustNot(retainFilter);
 
-        searchRequest.source(new SearchSourceBuilder().query(query).size(MODEL_SNAPSHOT_SEARCH_SIZE).sort(ElasticsearchMappings.ES_DOC));
+        searchRequest.source(new SearchSourceBuilder().query(query).size(MODEL_SNAPSHOT_SEARCH_SIZE)
+            .sort(ModelSnapshot.TIMESTAMP.getPreferredName()));
 
+        long deleteAllBeforeMs = (job.getModelSnapshotRetentionDays() == null)
+            ? 0 : latestTimeMs - TimeValue.timeValueDays(job.getModelSnapshotRetentionDays()).getMillis();
         client.execute(SearchAction.INSTANCE, searchRequest, new ThreadedActionListener<>(LOGGER, threadPool,
-                MachineLearning.UTILITY_THREAD_POOL_NAME, expiredSnapshotsListener(job.getId(), listener), false));
+            MachineLearning.UTILITY_THREAD_POOL_NAME, expiredSnapshotsListener(job.getId(), deleteAllBeforeMs, listener), false));
     }
 
-    private ActionListener<SearchResponse> expiredSnapshotsListener(String jobId, ActionListener<Boolean> listener) {
+    private ActionListener<SearchResponse> expiredSnapshotsListener(String jobId, long deleteAllBeforeMs,
+                                                                    ActionListener<Boolean> listener) {
         return new ActionListener<>() {
             @Override
             public void onResponse(SearchResponse searchResponse) {
+                long nextToKeepMs = deleteAllBeforeMs;
                 try {
                     List<ModelSnapshot> modelSnapshots = new ArrayList<>();
                     for (SearchHit hit : searchResponse.getHits()) {
-                        modelSnapshots.add(ModelSnapshot.fromJson(hit.getSourceRef()));
+                        ModelSnapshot modelSnapshot = ModelSnapshot.fromJson(hit.getSourceRef());
+                        long timestampMs = modelSnapshot.getTimestamp().getTime();
+                        if (timestampMs >= nextToKeepMs) {
+                            do {
+                                nextToKeepMs += MS_IN_ONE_DAY;
+                            } while (timestampMs >= nextToKeepMs);
+                            continue;
+                        }
+                        modelSnapshots.add(modelSnapshot);
                     }
                     deleteModelSnapshots(new VolatileCursorIterator<>(modelSnapshots), listener);
                 } catch (Exception e) {
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java
index 16ba77c582e8f..54b0d1f54753f 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java
@@ -84,7 +84,7 @@ Long getRetentionDays(Job job) {
     }
 
     @Override
-    protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Boolean> listener) {
+    protected void removeDataBefore(Job job, long latestTimeMs, long cutoffEpochMs, ActionListener<Boolean> listener) {
         LOGGER.debug("Removing results of job [{}] that have a timestamp before [{}]", job.getId(), cutoffEpochMs);
         DeleteByQueryRequest request = createDBQRequest(job, cutoffEpochMs);
 
@@ -131,8 +131,8 @@ private DeleteByQueryRequest createDBQRequest(Job job, long cutoffEpochMs) {
     }
 
     @Override
-    void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<Long> listener) {
-        ThreadedActionListener<Long> threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool,
+    void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<CutoffDetails> listener) {
+        ThreadedActionListener<CutoffDetails> threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool,
             MachineLearning.UTILITY_THREAD_POOL_NAME, listener, false);
         latestBucketTime(jobId, ActionListener.wrap(
             latestTime -> {
@@ -140,7 +140,7 @@ void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<Long> li
                     threadedActionListener.onResponse(null);
                 } else {
                     long cutoff = latestTime - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis();
-                    threadedActionListener.onResponse(cutoff);
+                    threadedActionListener.onResponse(new CutoffDetails(latestTime, cutoff));
                 }
             },
             listener::onFailure
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java
index 0d8955c05774f..8dc5bec70fbe5 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java
@@ -47,7 +47,7 @@ public class AbstractExpiredJobDataRemoverTests extends ESTestCase {
 
     // We can't test an abstract class so make a concrete class
     // as simple as possible
-    private class ConcreteExpiredJobDataRemover extends AbstractExpiredJobDataRemover {
+    private static class ConcreteExpiredJobDataRemover extends AbstractExpiredJobDataRemover {
 
         private int getRetentionDaysCallCount = 0;
 
@@ -62,13 +62,14 @@ protected Long getRetentionDays(Job job) {
             return randomBoolean() ? null : 0L;
         }
 
-        void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<Long> listener) {
+        @Override
+        void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<CutoffDetails> listener) {
             long nowEpochMs = Instant.now(Clock.systemDefaultZone()).toEpochMilli();
-            listener.onResponse(nowEpochMs - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis());
+            listener.onResponse(new CutoffDetails(nowEpochMs, nowEpochMs - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis()));
         }
 
         @Override
-        protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Boolean> listener) {
+        protected void removeDataBefore(Job job, long latestTimeMs, long cutoffEpochMs, ActionListener<Boolean> listener) {
             listener.onResponse(Boolean.TRUE);
         }
     }
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java
index a178cd48b7cad..f48ad5a7e1225 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java
@@ -89,15 +89,17 @@ public void testRemove_GivenJobsWithMixedRetentionPolicies() throws IOException
             JobTests.buildJobBuilder("job-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build()
         )));
 
-        Date oneDayAgo = new Date(new Date().getTime() - TimeValue.timeValueDays(1).getMillis());
+        Date now = new Date();
+        Date oneDayAgo = new Date(now.getTime() - TimeValue.timeValueDays(1).getMillis());
         ModelSnapshot snapshot1_1 = createModelSnapshot("job-1", "fresh-snapshot", oneDayAgo);
         searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot1_1)));
 
-        Date eightDaysAgo = new Date(new Date().getTime() - TimeValue.timeValueDays(8).getMillis());
-        ModelSnapshot snapshotToBeDeleted = createModelSnapshot("job-1", "old-snapshot", eightDaysAgo);
+        // It needs to be strictly more than 7 days before the most recent snapshot, hence the extra millisecond
+        Date eightDaysAndOneMsAgo = new Date(now.getTime() - TimeValue.timeValueDays(8).getMillis() - 1);
+        ModelSnapshot snapshotToBeDeleted = createModelSnapshot("job-1", "old-snapshot", eightDaysAndOneMsAgo);
         searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshotToBeDeleted)));
 
-        ModelSnapshot snapshot2_1 = createModelSnapshot("job-1", "snapshots-1_1", eightDaysAgo);
+        ModelSnapshot snapshot2_1 = createModelSnapshot("job-1", "snapshots-1_1", eightDaysAndOneMsAgo);
         searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot2_1)));
 
         searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.emptyList()));
@@ -127,9 +129,10 @@ public void testRemove_GivenTimeout() throws IOException {
             JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build()
         )));
 
-        List<ModelSnapshot> snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"),
-            createModelSnapshot("snapshots-1", "snapshots-1_2"));
-        List<ModelSnapshot> snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1"));
+        Date now = new Date();
+        List<ModelSnapshot> snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1", now),
+            createModelSnapshot("snapshots-1", "snapshots-1_2", now));
+        List<ModelSnapshot> snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1", now));
         searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots));
         searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots));
 
@@ -173,15 +176,19 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOExceptio
             JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build()
         )));
 
-        ModelSnapshot snapshot1_1 = createModelSnapshot("snapshots-1", "snapshots-1_1");
+        Date now = new Date();
+        Date oneDayAgo = new Date(new Date().getTime() - TimeValue.timeValueDays(1).getMillis());
+        ModelSnapshot snapshot1_1 = createModelSnapshot("snapshots-1", "snapshots-1_1", oneDayAgo);
         searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot1_1)));
 
+        // It needs to be strictly more than 7 days before the most recent snapshot, hence the extra millisecond
+        Date eightDaysAndOneMsAgo = new Date(now.getTime() - TimeValue.timeValueDays(8).getMillis() - 1);
         List<ModelSnapshot> snapshots1JobSnapshots = Arrays.asList(
             snapshot1_1,
-            createModelSnapshot("snapshots-1", "snapshots-1_2"));
+            createModelSnapshot("snapshots-1", "snapshots-1_2", eightDaysAndOneMsAgo));
         searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots));
 
-        ModelSnapshot snapshot2_2 = createModelSnapshot("snapshots-2", "snapshots-2_1");
+        ModelSnapshot snapshot2_2 = createModelSnapshot("snapshots-2", "snapshots-2_1", eightDaysAndOneMsAgo);
         searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot2_2)));
 
         givenClientDeleteModelSnapshotRequestsFail(searchResponses);
@@ -197,7 +204,7 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOExceptio
         assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(1));
         DeleteModelSnapshotAction.Request deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(0);
         assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-1"));
-        assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_1"));
+        assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_2"));
     }
 
     @SuppressWarnings("unchecked")
@@ -211,12 +218,12 @@ public void testCalcCutoffEpochMs() throws IOException {
         givenClientRequests(searchResponses, true, true);
 
         long retentionDays = 3L;
-        ActionListener<Long> cutoffListener = mock(ActionListener.class);
+        ActionListener<AbstractExpiredJobDataRemover.CutoffDetails> cutoffListener = mock(ActionListener.class);
         createExpiredModelSnapshotsRemover().calcCutoffEpochMs("job-1", retentionDays, cutoffListener);
 
         long dayInMills = 60 * 60 * 24 * 1000;
         long expectedCutoffTime = oneDayAgo.getTime() - (dayInMills * retentionDays);
-        verify(cutoffListener).onResponse(eq(expectedCutoffTime));
+        verify(cutoffListener).onResponse(eq(new AbstractExpiredJobDataRemover.CutoffDetails(oneDayAgo.getTime(), expectedCutoffTime)));
     }
 
     private ExpiredModelSnapshotsRemover createExpiredModelSnapshotsRemover() {
@@ -234,10 +241,6 @@ private ExpiredModelSnapshotsRemover createExpiredModelSnapshotsRemover() {
         return new ExpiredModelSnapshotsRemover(originSettingClient, threadPool);
     }
 
-    private static ModelSnapshot createModelSnapshot(String jobId, String snapshotId) {
-        return new ModelSnapshot.Builder(jobId).setSnapshotId(snapshotId).setTimestamp(new Date()).build();
-    }
-
     private static ModelSnapshot createModelSnapshot(String jobId, String snapshotId, Date date) {
         return new ModelSnapshot.Builder(jobId).setSnapshotId(snapshotId).setTimestamp(date).build();
     }
@@ -269,7 +272,8 @@ public Void answer(InvocationOnMock invocationOnMock) {
                 capturedSearchRequests.add(searchRequest);
                 // Only the last search request should fail
                 if (shouldSearchRequestsSucceed || callCount.get() < searchResponses.size()) {
-                    listener.onResponse(searchResponses.get(callCount.getAndIncrement()));
+                    SearchResponse response = searchResponses.get(callCount.getAndIncrement());
+                    listener.onResponse(response);
                 } else {
                     listener.onFailure(new RuntimeException("search failed"));
                 }
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java
index 29c8dad0c668c..b1691baca0c79 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java
@@ -145,12 +145,12 @@ public void testCalcCutoffEpochMs() {
         givenSearchResponses(Collections.singletonList(JobTests.buildJobBuilder(jobId).setResultsRetentionDays(1L).build()),
             new Bucket(jobId, latest, 60));
 
-        ActionListener<Long> cutoffListener = mock(ActionListener.class);
+        ActionListener<AbstractExpiredJobDataRemover.CutoffDetails> cutoffListener = mock(ActionListener.class);
         createExpiredResultsRemover().calcCutoffEpochMs(jobId, 1L, cutoffListener);
 
         long dayInMills = 60 * 60 * 24 * 1000;
         long expectedCutoffTime = latest.getTime() - dayInMills;
-        verify(cutoffListener).onResponse(eq(expectedCutoffTime));
+        verify(cutoffListener).onResponse(eq(new AbstractExpiredJobDataRemover.CutoffDetails(latest.getTime(), expectedCutoffTime)));
     }
 
     private void givenDBQRequestsSucceed() {
diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml
index af6f349930193..b5cfa75a5c2c0 100644
--- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml
+++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml
@@ -172,6 +172,44 @@
       ml.get_jobs:
         job_id: "non-existing"
 
+---
+"Test put job with inconsistent model snapshot settings":
+  - do:
+      catch: /The value of daily_model_snapshot_retention_after_days \[4\] cannot be greater than model_snapshot_retention_days \[3\]/
+      ml.put_job:
+        job_id: inconsistent-snapshot-settings-1
+        body: >
+          {
+            "description":"Analysis of response time by airline",
+            "analysis_config" : {
+                "bucket_span": "1h",
+                "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
+            },
+            "data_description" : {
+                "time_field":"@timestamp"
+            },
+            "model_snapshot_retention_days": 3,
+            "daily_model_snapshot_retention_after_days": 4
+          }
+
+  - do:
+      catch: /daily_model_snapshot_retention_after_days cannot be less than 0. Value = -1/
+      ml.put_job:
+        job_id: inconsistent-snapshot-settings-2
+        body: >
+          {
+            "description":"Analysis of response time by airline",
+            "analysis_config" : {
+                "bucket_span": "1h",
+                "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
+            },
+            "data_description" : {
+                "time_field":"@timestamp"
+            },
+            "model_snapshot_retention_days": 3,
+            "daily_model_snapshot_retention_after_days": -1
+          }
+
 ---
 "Test put job with inconsistent body/param ids":
   - do:
@@ -297,6 +335,7 @@
             "renormalization_window_days": 1,
             "background_persist_interval": "2h",
             "model_snapshot_retention_days": 3,
+            "daily_model_snapshot_retention_after_days": 2,
             "results_retention_days": 4,
             "custom_settings": {
               "setting1": "custom1",
@@ -363,6 +402,7 @@
   - match: { renormalization_window_days: 10 }
   - match: { background_persist_interval: "3h" }
   - match: { model_snapshot_retention_days: 30 }
+  - match: { daily_model_snapshot_retention_after_days: 2 }
   - match: { results_retention_days: 40 }
 
   - do:
@@ -403,6 +443,24 @@
       }
   - match: { analysis_limits.model_memory_limit: "15mb" }
 
+  - do:
+      catch: /The value of daily_model_snapshot_retention_after_days \[31\] cannot be greater than model_snapshot_retention_days \[30\]/
+      ml.update_job:
+        job_id: jobs-crud-update-job
+        body: >
+          {
+            "daily_model_snapshot_retention_after_days": 31
+          }
+
+  - do:
+      catch: /The value of daily_model_snapshot_retention_after_days \[2\] cannot be greater than model_snapshot_retention_days \[1\]/
+      ml.update_job:
+        job_id: jobs-crud-update-job
+        body: >
+          {
+            "model_snapshot_retention_days": 1
+          }
+
   - do:
       catch: bad_request
       ml.update_job:
diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/ml_info.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/ml_info.yml
index d3e3b00223464..3478a0bc8f3c7 100644
--- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/ml_info.yml
+++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/ml_info.yml
@@ -13,7 +13,8 @@ teardown:
   - match: { defaults.anomaly_detectors.categorization_analyzer.tokenizer: "ml_classic" }
   - match: { defaults.anomaly_detectors.model_memory_limit: "1gb" }
   - match: { defaults.anomaly_detectors.categorization_examples_limit: 4 }
-  - match: { defaults.anomaly_detectors.model_snapshot_retention_days: 1 }
+  - match: { defaults.anomaly_detectors.model_snapshot_retention_days: 10 }
+  - match: { defaults.anomaly_detectors.daily_model_snapshot_retention_after_days: 1 }
   - match: { defaults.datafeeds.scroll_size: 1000 }
   - is_false: limits.max_model_memory_limit
   # We cannot assert an exact value for the next one as it will vary depending on the test machine
@@ -31,7 +32,8 @@ teardown:
   - match: { defaults.anomaly_detectors.categorization_analyzer.tokenizer: "ml_classic" }
   - match: { defaults.anomaly_detectors.model_memory_limit: "512mb" }
   - match: { defaults.anomaly_detectors.categorization_examples_limit: 4 }
-  - match: { defaults.anomaly_detectors.model_snapshot_retention_days: 1 }
+  - match: { defaults.anomaly_detectors.model_snapshot_retention_days: 10 }
+  - match: { defaults.anomaly_detectors.daily_model_snapshot_retention_after_days: 1 }
   - match: { defaults.datafeeds.scroll_size: 1000 }
   - match: { limits.max_model_memory_limit: "512mb" }
   # We cannot assert an exact value for the next one as it will vary depending on the test machine
@@ -49,7 +51,8 @@ teardown:
   - match: { defaults.anomaly_detectors.categorization_analyzer.tokenizer: "ml_classic" }
  - match: { defaults.anomaly_detectors.model_memory_limit: "1gb" }
   - match: { defaults.anomaly_detectors.categorization_examples_limit: 4 }
-  - match: { defaults.anomaly_detectors.model_snapshot_retention_days: 1 }
+  - match: { defaults.anomaly_detectors.model_snapshot_retention_days: 10 }
+  - match: { defaults.anomaly_detectors.daily_model_snapshot_retention_after_days: 1 }
   - match: { defaults.datafeeds.scroll_size: 1000 }
   - match: { limits.max_model_memory_limit: "6gb" }
   # We cannot assert an exact value for the next one as it will vary depending on the test machine
@@ -67,7 +70,8 @@ teardown:
   - match: { defaults.anomaly_detectors.categorization_analyzer.tokenizer: "ml_classic" }
   - match: { defaults.anomaly_detectors.model_memory_limit: "1gb" }
   - match: { defaults.anomaly_detectors.categorization_examples_limit: 4 }
-  - match: { defaults.anomaly_detectors.model_snapshot_retention_days: 1 }
+  - match: { defaults.anomaly_detectors.model_snapshot_retention_days: 10 }
+  - match: { defaults.anomaly_detectors.daily_model_snapshot_retention_after_days: 1 }
   - match: { defaults.datafeeds.scroll_size: 1000 }
   - match: { limits.max_model_memory_limit: "6gb" }
   # We cannot assert an exact value for the next one as it will vary depending on the test machine
@@ -85,7 +89,8 @@ teardown:
   - match: { defaults.anomaly_detectors.categorization_analyzer.tokenizer: "ml_classic" }
   - match: { defaults.anomaly_detectors.model_memory_limit: "1mb" }
   - match: { defaults.anomaly_detectors.categorization_examples_limit: 4 }
-  - match: { defaults.anomaly_detectors.model_snapshot_retention_days: 1 }
+  - match: { defaults.anomaly_detectors.model_snapshot_retention_days: 10 }
+  - match: { defaults.anomaly_detectors.daily_model_snapshot_retention_after_days: 1 }
   - match: { defaults.datafeeds.scroll_size: 1000 }
   - match: { limits.max_model_memory_limit: "1mb" }
   # This time we can assert an exact value for the next one because the hard limit is so low
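For reviewers who want to sanity-check the per-day thinning walk that the new expiredSnapshotsListener performs, the following is a minimal standalone sketch, not taken from the patch; the class and method names are illustrative. It assumes the snapshots arrive sorted by ascending timestamp and have already passed the daily-retention cutoff search, mirroring the nextToKeepMs loop above: everything older than deleteAllBeforeMs is deleted, and beyond that boundary only the first snapshot in each day-sized window survives.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Illustrative only: mirrors the day-bucket walk added to ExpiredModelSnapshotsRemover.
public class SnapshotThinningSketch {

    private static final long MS_IN_ONE_DAY = TimeUnit.DAYS.toMillis(1);

    // sortedTimestampsMs: snapshot timestamps in ascending order, all already older than the
    //                     daily retention cutoff (the search in the patch narrows to these).
    // deleteAllBeforeMs:  latest result timestamp minus model_snapshot_retention_days, or 0 if unset.
    static List<Long> selectForDeletion(List<Long> sortedTimestampsMs, long deleteAllBeforeMs) {
        List<Long> toDelete = new ArrayList<>();
        long nextToKeepMs = deleteAllBeforeMs;
        for (long timestampMs : sortedTimestampsMs) {
            if (timestampMs >= nextToKeepMs) {
                // First snapshot at or beyond the current day boundary: keep it and slide the
                // boundary forward one day at a time until it passes this snapshot.
                do {
                    nextToKeepMs += MS_IN_ONE_DAY;
                } while (timestampMs >= nextToKeepMs);
                continue;
            }
            // Inside an already-covered day window, or older than deleteAllBeforeMs: delete.
            toDelete.add(timestampMs);
        }
        return toDelete;
    }

    public static void main(String[] args) {
        long day = MS_IN_ONE_DAY;
        // Snapshots taken on days 0, 5, 5.5, 6 and 8; everything before day 3 must go entirely.
        List<Long> snapshots = List.of(0L, 5 * day, 5 * day + day / 2, 6 * day, 8 * day);
        // Prints [0, 475200000]: the day-0 snapshot and the second snapshot of day 5 are deleted,
        // while the first snapshot of each later day is retained.
        System.out.println(selectForDeletion(snapshots, 3 * day));
    }
}

The >= comparisons make the behaviour boundary-sensitive, which is presumably why the updated tests place the old snapshots at eight days and one millisecond before now rather than exactly eight days, as the in-test comment notes.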