From 6d77148552d0d3b66f009042d449c9fa9f8b0b22 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Thu, 3 Jan 2019 17:16:20 +0000 Subject: [PATCH] [ML] Uplift model memory limit on job migration When a 6.1-6.3 job is opened in a later version we increase the model memory limit by 30% if it's below 0.5GB. The migration of jobs from cluster state to the config index changes the job version, so we need to also do this uplift as part of that config migration. Relates #36961 --- .../xpack/ml/MlConfigMigrator.java | 15 ++++++- .../ml/action/TransportOpenJobAction.java | 41 +------------------ 2 files changed, 16 insertions(+), 40 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java index f2fe80f377649..184ee44cf376c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java @@ -37,6 +37,7 @@ import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; @@ -403,11 +404,23 @@ public static Job updateJobForMigration(Job job) { Map custom = job.getCustomSettings() == null ? new HashMap<>() : new HashMap<>(job.getCustomSettings()); custom.put(MIGRATED_FROM_VERSION, job.getJobVersion()); builder.setCustomSettings(custom); + // Increase the model memory limit for 6.1 - 6.3 jobs + Version jobVersion = job.getJobVersion(); + if (jobVersion != null && jobVersion.onOrAfter(Version.V_6_1_0) && jobVersion.before(Version.V_6_3_0)) { + // Increase model memory limit if < 512MB + if (job.getAnalysisLimits() != null && job.getAnalysisLimits().getModelMemoryLimit() != null && + job.getAnalysisLimits().getModelMemoryLimit() < 512L) { + long updatedModelMemoryLimit = (long) (job.getAnalysisLimits().getModelMemoryLimit() * 1.3); + AnalysisLimits limits = new AnalysisLimits(updatedModelMemoryLimit, + job.getAnalysisLimits().getCategorizationExamplesLimit()); + builder.setAnalysisLimits(limits); + } + } // Pre v5.5 (ml beta) jobs do not have a version. // These jobs cannot be opened, we rely on the missing version // to indicate this. // See TransportOpenJobAction.validate() - if (job.getJobVersion() != null) { + if (jobVersion != null) { builder.setJobVersion(Version.CURRENT); } return builder.build(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 21f97cbb5dc99..fad24247834d5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -55,9 +55,6 @@ import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; -import org.elasticsearch.xpack.core.ml.action.PutJobAction; -import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; -import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits; import org.elasticsearch.xpack.core.ml.job.config.DetectionRule; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; @@ -540,50 +537,16 @@ public void onFailure(Exception e) { ); // Tell the job tracker to refresh the memory requirement for this job and all other jobs that have persistent tasks - ActionListener jobUpdateListener = ActionListener.wrap( + ActionListener jobUpdateListener = ActionListener.wrap( response -> memoryTracker.refreshJobMemoryAndAllOthers(jobParams.getJobId(), memoryRequirementRefreshListener), listener::onFailure ); - // Increase the model memory limit for 6.1 - 6.3 jobs - ActionListener missingMappingsListener = ActionListener.wrap( - response -> { - Job job = jobParams.getJob(); - if (job != null) { - Version jobVersion = job.getJobVersion(); - if (jobVersion != null && - (jobVersion.onOrAfter(Version.V_6_1_0) && jobVersion.before(Version.V_6_3_0))) { - // Increase model memory limit if < 512MB - if (job.getAnalysisLimits() != null && job.getAnalysisLimits().getModelMemoryLimit() != null && - job.getAnalysisLimits().getModelMemoryLimit() < 512L) { - - long updatedModelMemoryLimit = (long) (job.getAnalysisLimits().getModelMemoryLimit() * 1.3); - AnalysisLimits limits = new AnalysisLimits(updatedModelMemoryLimit, - job.getAnalysisLimits().getCategorizationExamplesLimit()); - - JobUpdate update = new JobUpdate.Builder(job.getId()).setJobVersion(Version.CURRENT) - .setAnalysisLimits(limits).build(); - UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(job.getId(), update); - executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest, - jobUpdateListener); - } else { - jobUpdateListener.onResponse(null); - } - } - else { - jobUpdateListener.onResponse(null); - } - } else { - jobUpdateListener.onResponse(null); - } - }, listener::onFailure - ); - // Try adding state doc mapping ActionListener resultsPutMappingHandler = ActionListener.wrap( response -> { addDocMappingIfMissing(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings::stateMapping, - state, missingMappingsListener); + state, jobUpdateListener); }, listener::onFailure );