From acd5ddd27930da1c96934b1ccb3055de093034e0 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Thu, 27 Sep 2018 10:10:30 +0100 Subject: [PATCH] [ML] Allocate jobs based on JobParams rather than cluster state config (#33994) --- .../xpack/core/XPackClientPlugin.java | 9 +- .../elasticsearch/xpack/core/ml/MlTasks.java | 51 +++- .../xpack/core/ml/action/OpenJobAction.java | 31 ++- .../core/ml/action/StartDatafeedAction.java | 6 +- .../xpack/core/ml/datafeed/DatafeedState.java | 4 +- .../core/ml/job/config/JobTaskState.java | 4 +- .../xpack/core/ml/job/config/JobUpdate.java | 41 ++- .../xpack/core}/ml/MlTasksTests.java | 30 ++- .../core/ml/job/config/JobUpdateTests.java | 3 + .../MlNativeAutodetectIntegTestCase.java | 5 +- .../xpack/ml/MachineLearning.java | 3 + .../xpack/ml/MlAssignmentNotifier.java | 5 +- .../ml/action/TransportOpenJobAction.java | 224 +++++++--------- .../action/TransportStartDatafeedAction.java | 4 +- .../ml/job/persistence/JobConfigProvider.java | 48 ++++ .../xpack/ml/MlMetadataTests.java | 4 +- .../action/TransportCloseJobActionTests.java | 2 +- .../action/TransportOpenJobActionTests.java | 241 +++++------------- .../TransportStopDatafeedActionTests.java | 4 +- .../integration/BasicDistributedJobsIT.java | 2 +- .../ml/integration/JobConfigProviderIT.java | 19 +- 21 files changed, 375 insertions(+), 365 deletions(-) rename x-pack/plugin/{ml/src/test/java/org/elasticsearch/xpack => core/src/test/java/org/elasticsearch/xpack/core}/ml/MlTasksTests.java (71%) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java index a3707ba136388..80a0bbf3baab6 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java @@ -44,6 +44,7 @@ import org.elasticsearch.xpack.core.logstash.LogstashFeatureSetUsage; import org.elasticsearch.xpack.core.ml.MachineLearningFeatureSetUsage; import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.CloseJobAction; import org.elasticsearch.xpack.core.ml.action.DeleteCalendarAction; import org.elasticsearch.xpack.core.ml.action.DeleteCalendarEventAction; @@ -331,9 +332,9 @@ public List getNamedWriteables() { new NamedWriteableRegistry.Entry(MetaData.Custom.class, "ml", MlMetadata::new), new NamedWriteableRegistry.Entry(NamedDiff.class, "ml", MlMetadata.MlMetadataDiff::new), // ML - Persistent action requests - new NamedWriteableRegistry.Entry(PersistentTaskParams.class, StartDatafeedAction.TASK_NAME, + new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.DATAFEED_TASK_NAME, StartDatafeedAction.DatafeedParams::new), - new NamedWriteableRegistry.Entry(PersistentTaskParams.class, OpenJobAction.TASK_NAME, + new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.JOB_TASK_NAME, OpenJobAction.JobParams::new), // ML - Task states new NamedWriteableRegistry.Entry(PersistentTaskState.class, JobTaskState.NAME, JobTaskState::new), @@ -379,9 +380,9 @@ public List getNamedXContent() { new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField("ml"), parser -> MlMetadata.LENIENT_PARSER.parse(parser, null).build()), // ML - Persistent action requests - new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(StartDatafeedAction.TASK_NAME), + new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(MlTasks.DATAFEED_TASK_NAME), StartDatafeedAction.DatafeedParams::fromXContent), - new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(OpenJobAction.TASK_NAME), + new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(MlTasks.JOB_TASK_NAME), OpenJobAction.JobParams::fromXContent), // ML - Task states new NamedXContentRegistry.Entry(PersistentTaskState.class, new ParseField(DatafeedState.NAME), DatafeedState::fromXContent), diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java index f421ba7bf4ad8..a56d3d639239d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java @@ -12,14 +12,17 @@ import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; -import java.util.Collection; +import java.util.List; import java.util.Set; import java.util.stream.Collectors; public final class MlTasks { - public static final String JOB_TASK_PREFIX = "job-"; - public static final String DATAFEED_TASK_PREFIX = "datafeed-"; + public static final String JOB_TASK_NAME = "xpack/ml/job"; + public static final String DATAFEED_TASK_NAME = "xpack/ml/datafeed"; + + private static final String JOB_TASK_ID_PREFIX = "job-"; + private static final String DATAFEED_TASK_ID_PREFIX = "datafeed-"; private MlTasks() { } @@ -29,7 +32,7 @@ private MlTasks() { * A datafeed id can be used as a job id, because they are stored separately in cluster state. */ public static String jobTaskId(String jobId) { - return JOB_TASK_PREFIX + jobId; + return JOB_TASK_ID_PREFIX + jobId; } /** @@ -37,7 +40,7 @@ public static String jobTaskId(String jobId) { * A job id can be used as a datafeed id, because they are stored separately in cluster state. */ public static String datafeedTaskId(String datafeedId) { - return DATAFEED_TASK_PREFIX + datafeedId; + return DATAFEED_TASK_ID_PREFIX + datafeedId; } @Nullable @@ -76,15 +79,41 @@ public static DatafeedState getDatafeedState(String datafeedId, @Nullable Persis } /** - * The job Ids of anomaly detector job tasks - * @param tasks Active tasks + * The job Ids of anomaly detector job tasks. + * All anomaly detector jobs are returned regardless of the status of the + * task (OPEN, CLOSED, FAILED etc). + * + * @param tasks Persistent tasks * @return The job Ids of anomaly detector job tasks */ public static Set openJobIds(PersistentTasksCustomMetaData tasks) { - Collection> activeTasks = tasks.tasks(); - - return activeTasks.stream().filter(t -> t.getId().startsWith(JOB_TASK_PREFIX)) - .map(t -> t.getId().substring(JOB_TASK_PREFIX.length())) + return tasks.findTasks(JOB_TASK_NAME, task -> true) + .stream() + .map(t -> t.getId().substring(JOB_TASK_ID_PREFIX.length())) .collect(Collectors.toSet()); } + + /** + * Is there an ml anomaly detector job task for the job {@code jobId}? + * @param jobId The job id + * @param tasks Persistent tasks + * @return + */ + public static boolean taskExistsForJob(String jobId, PersistentTasksCustomMetaData tasks) { + return openJobIds(tasks).contains(jobId); + } + + /** + * Read the active anomaly detector job tasks. + * Active tasks are not {@code JobState.CLOSED} or {@code JobState.FAILED}. + * + * @param tasks Persistent tasks + * @return The job tasks excluding closed and failed jobs + */ + public static List> activeJobTasks(PersistentTasksCustomMetaData tasks) { + return tasks.findTasks(JOB_TASK_NAME, task -> true) + .stream() + .filter(task -> ((JobTaskState) task.getState()).getState().isAnyOf(JobState.CLOSED, JobState.FAILED) == false) + .collect(Collectors.toList()); + } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/OpenJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/OpenJobAction.java index e2cd7e43d0b4c..db1c0ad697956 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/OpenJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/OpenJobAction.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; @@ -25,6 +26,7 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.ml.MachineLearningField; +import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; @@ -35,7 +37,7 @@ public class OpenJobAction extends Action PARSER = new ObjectParser<>(TASK_NAME, JobParams::new); + public static ObjectParser PARSER = new ObjectParser<>(MlTasks.JOB_TASK_NAME, JobParams::new); static { PARSER.declareString(JobParams::setJobId, Job.ID); PARSER.declareBoolean((p, v) -> {}, IGNORE_DOWNTIME); @@ -163,6 +164,7 @@ public static JobParams parseRequest(String jobId, XContentParser parser) { // A big state can take a while to restore. For symmetry with the _close endpoint any // changes here should be reflected there too. private TimeValue timeout = MachineLearningField.STATE_PERSIST_RESTORE_TIMEOUT; + private Job job; JobParams() { } @@ -178,6 +180,9 @@ public JobParams(StreamInput in) throws IOException { in.readBoolean(); } timeout = TimeValue.timeValueMillis(in.readVLong()); + if (in.getVersion().onOrAfter(Version.CURRENT)) { + job = in.readOptionalWriteable(Job::new); + } } public String getJobId() { @@ -196,9 +201,18 @@ public void setTimeout(TimeValue timeout) { this.timeout = timeout; } + @Nullable + public Job getJob() { + return job; + } + + public void setJob(Job job) { + this.job = job; + } + @Override public String getWriteableName() { - return TASK_NAME; + return MlTasks.JOB_TASK_NAME; } @Override @@ -209,6 +223,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(true); } out.writeVLong(timeout.millis()); + if (out.getVersion().onOrAfter(Version.CURRENT)) { + out.writeOptionalWriteable(job); + } } @Override @@ -217,12 +234,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(Job.ID.getPreferredName(), jobId); builder.field(TIMEOUT.getPreferredName(), timeout.getStringRep()); builder.endObject(); + // The job field is streamed but not persisted return builder; } @Override public int hashCode() { - return Objects.hash(jobId, timeout); + return Objects.hash(jobId, timeout, job); } @Override @@ -235,7 +253,8 @@ public boolean equals(Object obj) { } OpenJobAction.JobParams other = (OpenJobAction.JobParams) obj; return Objects.equals(jobId, other.jobId) && - Objects.equals(timeout, other.timeout); + Objects.equals(timeout, other.timeout) && + Objects.equals(job, other.job); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java index ec8e9173e47d3..e583ab3596c6b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.xpack.core.XPackPlugin; +import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; @@ -43,7 +44,6 @@ public class StartDatafeedAction public static final StartDatafeedAction INSTANCE = new StartDatafeedAction(); public static final String NAME = "cluster:admin/xpack/ml/datafeed/start"; - public static final String TASK_NAME = "xpack/ml/datafeed"; private StartDatafeedAction() { super(NAME); @@ -147,7 +147,7 @@ public boolean equals(Object obj) { public static class DatafeedParams implements XPackPlugin.XPackPersistentTaskParams { - public static ObjectParser PARSER = new ObjectParser<>(TASK_NAME, DatafeedParams::new); + public static ObjectParser PARSER = new ObjectParser<>(MlTasks.DATAFEED_TASK_NAME, DatafeedParams::new); static { PARSER.declareString((params, datafeedId) -> params.datafeedId = datafeedId, DatafeedConfig.ID); @@ -235,7 +235,7 @@ public void setTimeout(TimeValue timeout) { @Override public String getWriteableName() { - return TASK_NAME; + return MlTasks.DATAFEED_TASK_NAME; } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedState.java index d894f7b339fe5..ef4ef6432c058 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedState.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedState.java @@ -13,7 +13,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.persistent.PersistentTaskState; -import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; +import org.elasticsearch.xpack.core.ml.MlTasks; import java.io.IOException; import java.util.Locale; @@ -24,7 +24,7 @@ public enum DatafeedState implements PersistentTaskState { STARTED, STOPPED, STARTING, STOPPING; - public static final String NAME = StartDatafeedAction.TASK_NAME; + public static final String NAME = MlTasks.DATAFEED_TASK_NAME; private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, args -> fromString((String) args[0])); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobTaskState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobTaskState.java index d9ab3357319c6..2e6cc4b99c4bb 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobTaskState.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobTaskState.java @@ -14,7 +14,7 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; -import org.elasticsearch.xpack.core.ml.action.OpenJobAction; +import org.elasticsearch.xpack.core.ml.MlTasks; import java.io.IOException; import java.util.Objects; @@ -23,7 +23,7 @@ public class JobTaskState implements PersistentTaskState { - public static final String NAME = OpenJobAction.TASK_NAME; + public static final String NAME = MlTasks.JOB_TASK_NAME; private static ParseField STATE = new ParseField("state"); private static ParseField ALLOCATION_ID = new ParseField("allocation_id"); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java index e77bb0b94919f..9797082fe1e01 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java @@ -29,6 +29,7 @@ public class JobUpdate implements Writeable, ToXContentObject { public static final ParseField DETECTORS = new ParseField("detectors"); + public static final ParseField CLEAR_JOB_FINISH_TIME = new ParseField("clear_job_finish_time"); // For internal updates static final ConstructingObjectParser INTERNAL_PARSER = new ConstructingObjectParser<>( @@ -58,6 +59,7 @@ public class JobUpdate implements Writeable, ToXContentObject { INTERNAL_PARSER.declareString(Builder::setModelSnapshotId, Job.MODEL_SNAPSHOT_ID); INTERNAL_PARSER.declareLong(Builder::setEstablishedModelMemory, Job.ESTABLISHED_MODEL_MEMORY); INTERNAL_PARSER.declareString(Builder::setJobVersion, Job.JOB_VERSION); + INTERNAL_PARSER.declareBoolean(Builder::setClearJobFinishTime, CLEAR_JOB_FINISH_TIME); } private final String jobId; @@ -75,6 +77,7 @@ public class JobUpdate implements Writeable, ToXContentObject { private final String modelSnapshotId; private final Long establishedModelMemory; private final Version jobVersion; + private final Boolean clearJobFinishTime; private JobUpdate(String jobId, @Nullable List groups, @Nullable String description, @Nullable List detectorUpdates, @Nullable ModelPlotConfig modelPlotConfig, @@ -82,7 +85,7 @@ private JobUpdate(String jobId, @Nullable List groups, @Nullable String @Nullable Long renormalizationWindowDays, @Nullable Long resultsRetentionDays, @Nullable Long modelSnapshotRetentionDays, @Nullable List categorisationFilters, @Nullable Map customSettings, @Nullable String modelSnapshotId, - @Nullable Long establishedModelMemory, @Nullable Version jobVersion) { + @Nullable Long establishedModelMemory, @Nullable Version jobVersion, @Nullable Boolean clearJobFinishTime) { this.jobId = jobId; this.groups = groups; this.description = description; @@ -98,6 +101,7 @@ private JobUpdate(String jobId, @Nullable List groups, @Nullable String this.modelSnapshotId = modelSnapshotId; this.establishedModelMemory = establishedModelMemory; this.jobVersion = jobVersion; + this.clearJobFinishTime = clearJobFinishTime; } public JobUpdate(StreamInput in) throws IOException { @@ -137,6 +141,11 @@ public JobUpdate(StreamInput in) throws IOException { } else { jobVersion = null; } + if (in.getVersion().onOrAfter(Version.CURRENT)) { + clearJobFinishTime = in.readOptionalBoolean(); + } else { + clearJobFinishTime = null; + } } @Override @@ -174,6 +183,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(false); } } + if (out.getVersion().onOrAfter(Version.CURRENT)) { + out.writeOptionalBoolean(clearJobFinishTime); + } } public String getJobId() { @@ -236,6 +248,10 @@ public Version getJobVersion() { return jobVersion; } + public Boolean getClearJobFinishTime() { + return clearJobFinishTime; + } + public boolean isAutodetectProcessUpdate() { return modelPlotConfig != null || detectorUpdates != null || groups != null; } @@ -286,6 +302,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (jobVersion != null) { builder.field(Job.JOB_VERSION.getPreferredName(), jobVersion); } + if (clearJobFinishTime != null) { + builder.field(CLEAR_JOB_FINISH_TIME.getPreferredName(), clearJobFinishTime); + } builder.endObject(); return builder; } @@ -415,6 +434,10 @@ public Job mergeWithJob(Job source, ByteSizeValue maxModelMemoryLimit) { builder.setJobVersion(jobVersion); } + if (clearJobFinishTime != null && clearJobFinishTime) { + builder.setFinishedTime(null); + } + builder.setAnalysisConfig(newAnalysisConfig); return builder.build(); } @@ -434,7 +457,8 @@ && updatesDetectors(job) == false && (customSettings == null || Objects.equals(customSettings, job.getCustomSettings())) && (modelSnapshotId == null || Objects.equals(modelSnapshotId, job.getModelSnapshotId())) && (establishedModelMemory == null || Objects.equals(establishedModelMemory, job.getEstablishedModelMemory())) - && (jobVersion == null || Objects.equals(jobVersion, job.getJobVersion())); + && (jobVersion == null || Objects.equals(jobVersion, job.getJobVersion())) + && (clearJobFinishTime == false || job.getFinishedTime() == null); } boolean updatesDetectors(Job job) { @@ -481,14 +505,15 @@ public boolean equals(Object other) { && Objects.equals(this.customSettings, that.customSettings) && Objects.equals(this.modelSnapshotId, that.modelSnapshotId) && Objects.equals(this.establishedModelMemory, that.establishedModelMemory) - && Objects.equals(this.jobVersion, that.jobVersion); + && Objects.equals(this.jobVersion, that.jobVersion) + && Objects.equals(this.clearJobFinishTime, that.clearJobFinishTime); } @Override public int hashCode() { return Objects.hash(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, categorizationFilters, customSettings, - modelSnapshotId, establishedModelMemory, jobVersion); + modelSnapshotId, establishedModelMemory, jobVersion, clearJobFinishTime); } public static class DetectorUpdate implements Writeable, ToXContentObject { @@ -599,6 +624,7 @@ public static class Builder { private String modelSnapshotId; private Long establishedModelMemory; private Version jobVersion; + private Boolean clearJobFinishTime; public Builder(String jobId) { this.jobId = jobId; @@ -684,10 +710,15 @@ public Builder setJobVersion(String version) { return this; } + public Builder setClearJobFinishTime(boolean clearJobFinishTime) { + this.clearJobFinishTime = clearJobFinishTime; + return this; + } + public JobUpdate build() { return new JobUpdate(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, backgroundPersistInterval, renormalizationWindowDays, resultsRetentionDays, modelSnapshotRetentionDays, categorizationFilters, customSettings, - modelSnapshotId, establishedModelMemory, jobVersion); + modelSnapshotId, establishedModelMemory, jobVersion, clearJobFinishTime); } } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlTasksTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java similarity index 71% rename from x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlTasksTests.java rename to x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java index 53bdfbdcb3b69..e2db7c3a30951 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlTasksTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java @@ -4,11 +4,10 @@ * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.ml; +package org.elasticsearch.xpack.core.ml; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; @@ -24,7 +23,7 @@ public void testGetJobState() { // A missing task is a closed job assertEquals(JobState.CLOSED, MlTasks.getJobState("foo", tasksBuilder.build())); // A task with no status is opening - tasksBuilder.addTask(MlTasks.jobTaskId("foo"), OpenJobAction.TASK_NAME, new OpenJobAction.JobParams("foo"), + tasksBuilder.addTask(MlTasks.jobTaskId("foo"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo"), new PersistentTasksCustomMetaData.Assignment("bar", "test assignment")); assertEquals(JobState.OPENING, MlTasks.getJobState("foo", tasksBuilder.build())); @@ -37,7 +36,7 @@ public void testGetDatefeedState() { // A missing task is a stopped datafeed assertEquals(DatafeedState.STOPPED, MlTasks.getDatafeedState("foo", tasksBuilder.build())); - tasksBuilder.addTask(MlTasks.datafeedTaskId("foo"), StartDatafeedAction.TASK_NAME, + tasksBuilder.addTask(MlTasks.datafeedTaskId("foo"), MlTasks.DATAFEED_TASK_NAME, new StartDatafeedAction.DatafeedParams("foo", 0L), new PersistentTasksCustomMetaData.Assignment("bar", "test assignment")); assertEquals(DatafeedState.STOPPED, MlTasks.getDatafeedState("foo", tasksBuilder.build())); @@ -50,7 +49,7 @@ public void testGetJobTask() { assertNull(MlTasks.getJobTask("foo", null)); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); - tasksBuilder.addTask(MlTasks.jobTaskId("foo"), OpenJobAction.TASK_NAME, new OpenJobAction.JobParams("foo"), + tasksBuilder.addTask(MlTasks.jobTaskId("foo"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo"), new PersistentTasksCustomMetaData.Assignment("bar", "test assignment")); assertNotNull(MlTasks.getJobTask("foo", tasksBuilder.build())); @@ -61,7 +60,7 @@ public void testGetDatafeedTask() { assertNull(MlTasks.getDatafeedTask("foo", null)); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); - tasksBuilder.addTask(MlTasks.datafeedTaskId("foo"), StartDatafeedAction.TASK_NAME, + tasksBuilder.addTask(MlTasks.datafeedTaskId("foo"), MlTasks.DATAFEED_TASK_NAME, new StartDatafeedAction.DatafeedParams("foo", 0L), new PersistentTasksCustomMetaData.Assignment("bar", "test assignment")); @@ -73,14 +72,27 @@ public void testOpenJobIds() { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); assertThat(MlTasks.openJobIds(tasksBuilder.build()), empty()); - tasksBuilder.addTask(MlTasks.jobTaskId("foo-1"), OpenJobAction.TASK_NAME, new OpenJobAction.JobParams("foo-1"), + tasksBuilder.addTask(MlTasks.jobTaskId("foo-1"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo-1"), new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")); - tasksBuilder.addTask(MlTasks.jobTaskId("bar"), OpenJobAction.TASK_NAME, new OpenJobAction.JobParams("bar"), + tasksBuilder.addTask(MlTasks.jobTaskId("bar"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("bar"), new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")); - tasksBuilder.addTask(MlTasks.datafeedTaskId("df"), StartDatafeedAction.TASK_NAME, + tasksBuilder.addTask(MlTasks.datafeedTaskId("df"), MlTasks.DATAFEED_TASK_NAME, new StartDatafeedAction.DatafeedParams("df", 0L), new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")); assertThat(MlTasks.openJobIds(tasksBuilder.build()), containsInAnyOrder("foo-1", "bar")); } + + public void testTaskExistsForJob() { + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + assertFalse(MlTasks.taskExistsForJob("job-1", tasksBuilder.build())); + + tasksBuilder.addTask(MlTasks.jobTaskId("foo"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo"), + new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")); + tasksBuilder.addTask(MlTasks.jobTaskId("bar"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("bar"), + new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")); + + assertFalse(MlTasks.taskExistsForJob("job-1", tasksBuilder.build())); + assertTrue(MlTasks.taskExistsForJob("foo", tasksBuilder.build())); + } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java index 37bec1196e756..92894de405eb4 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java @@ -93,6 +93,9 @@ public JobUpdate createRandom(String jobId, @Nullable Job job) { if (useInternalParser && randomBoolean()) { update.setJobVersion(randomFrom(Version.CURRENT, Version.V_6_2_0, Version.V_6_1_0)); } + if (useInternalParser) { + update.setClearJobFinishTime(randomBoolean()); + } return update.build(); } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java index 1fd0eddf41ced..6a621ffb076f0 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java @@ -42,6 +42,7 @@ import org.elasticsearch.xpack.core.XPackClientPlugin; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.CloseJobAction; import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction; import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; @@ -445,9 +446,9 @@ protected void ensureClusterStateConsistency() throws IOException { List entries = new ArrayList<>(ClusterModule.getNamedWriteables()); entries.addAll(new SearchModule(Settings.EMPTY, true, Collections.emptyList()).getNamedWriteables()); entries.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, "ml", MlMetadata::new)); - entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, StartDatafeedAction.TASK_NAME, + entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.DATAFEED_TASK_NAME, StartDatafeedAction.DatafeedParams::new)); - entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, OpenJobAction.TASK_NAME, + entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.JOB_TASK_NAME, OpenJobAction.JobParams::new)); entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, JobTaskState.NAME, JobTaskState::new)); entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, DatafeedState.NAME, DatafeedState::fromStream)); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index cd91d6282aac1..e57fb64558083 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -166,6 +166,7 @@ import org.elasticsearch.xpack.ml.job.UpdateJobProcessNotifier; import org.elasticsearch.xpack.ml.job.categorization.MlClassicTokenizer; import org.elasticsearch.xpack.ml.job.categorization.MlClassicTokenizerFactory; +import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; @@ -369,6 +370,7 @@ public Collection createComponents(Client client, ClusterService cluster Auditor auditor = new Auditor(client, clusterService.nodeName()); JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings); + JobConfigProvider jobConfigProvider = new JobConfigProvider(client, settings); UpdateJobProcessNotifier notifier = new UpdateJobProcessNotifier(settings, client, clusterService, threadPool); JobManager jobManager = new JobManager(env, settings, jobResultsProvider, clusterService, auditor, threadPool, client, notifier); @@ -420,6 +422,7 @@ public Collection createComponents(Client client, ClusterService cluster return Arrays.asList( mlLifeCycleService, jobResultsProvider, + jobConfigProvider, jobManager, autodetectProcessManager, new MlInitializationService(settings, threadPool, clusterService, client), diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java index 37d714d1777da..5df11f02a3610 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; @@ -79,7 +80,7 @@ public void clusterChanged(ClusterChangedEvent event) { if (Objects.equals(currentAssignment, previousAssignment)) { continue; } - if (OpenJobAction.TASK_NAME.equals(currentTask.getTaskName())) { + if (MlTasks.JOB_TASK_NAME.equals(currentTask.getTaskName())) { String jobId = ((OpenJobAction.JobParams) currentTask.getParams()).getJobId(); if (currentAssignment.getExecutorNode() == null) { auditor.warning(jobId, "No node found to open job. Reasons [" + currentAssignment.getExplanation() + "]"); @@ -87,7 +88,7 @@ public void clusterChanged(ClusterChangedEvent event) { DiscoveryNode node = event.state().nodes().get(currentAssignment.getExecutorNode()); auditor.info(jobId, "Opening job on node [" + node.toString() + "]"); } - } else if (StartDatafeedAction.TASK_NAME.equals(currentTask.getTaskName())) { + } else if (MlTasks.DATAFEED_TASK_NAME.equals(currentTask.getTaskName())) { String datafeedId = ((StartDatafeedAction.DatafeedParams) currentTask.getParams()).getDatafeedId(); DatafeedConfig datafeedConfig = MlMetadata.getMlMetadata(event.state()).getDatafeed(datafeedId); if (currentAssignment.getExecutorNode() == null) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 814892ec3c36d..02ecb511fed60 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -19,18 +19,17 @@ import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.AliasOrIndex; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MappingMetaData; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedSupplier; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.inject.Inject; @@ -53,7 +52,6 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.XPackField; import org.elasticsearch.xpack.core.ml.MlMetaIndex; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; @@ -68,6 +66,7 @@ import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; @@ -100,19 +99,21 @@ public class TransportOpenJobAction extends TransportMasterNodeActioncheck job's version is supported * */ - static void validate(String jobId, MlMetadata mlMetadata) { - Job job = (mlMetadata == null) ? null : mlMetadata.getJobs().get(jobId); + static void validate(String jobId, Job job) { if (job == null) { throw ExceptionsHelper.missingJobException(jobId); } @@ -137,12 +137,14 @@ static void validate(String jobId, MlMetadata mlMetadata) { } } - static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String jobId, ClusterState clusterState, + static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String jobId, @Nullable Job job, + ClusterState clusterState, int maxConcurrentJobAllocations, int fallbackMaxNumberOfOpenJobs, int maxMachineMemoryPercent, Logger logger) { - List unavailableIndices = verifyIndicesPrimaryShardsAreActive(jobId, clusterState); + String resultsIndexName = job != null ? job.getResultsIndexName() : null; + List unavailableIndices = verifyIndicesPrimaryShardsAreActive(resultsIndexName, clusterState); if (unavailableIndices.size() != 0) { String reason = "Not opening job [" + jobId + "], because not all primary shards are active for the following indices [" + String.join(",", unavailableIndices) + "]"; @@ -152,12 +154,8 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j List reasons = new LinkedList<>(); long maxAvailableCount = Long.MIN_VALUE; - long maxAvailableMemory = Long.MIN_VALUE; DiscoveryNode minLoadedNodeByCount = null; - DiscoveryNode minLoadedNodeByMemory = null; - // Try to allocate jobs according to memory usage, but if that's not possible (maybe due to a mixed version cluster or maybe - // because of some weird OS problem) then fall back to the old mechanism of only considering numbers of assigned jobs - boolean allocateByMemory = true; + PersistentTasksCustomMetaData persistentTasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); for (DiscoveryNode node : clusterState.getNodes()) { Map nodeAttributes = node.getAttributes(); @@ -170,40 +168,41 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j continue; } - MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); - Job job = mlMetadata.getJobs().get(jobId); - Set compatibleJobTypes = Job.getCompatibleJobTypes(node.getVersion()); - if (compatibleJobTypes.contains(job.getJobType()) == false) { - String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndVersion(node) + - "], because this node does not support jobs of type [" + job.getJobType() + "]"; - logger.trace(reason); - reasons.add(reason); - continue; - } - - if (nodeSupportsJobVersion(node.getVersion()) == false) { + if (nodeSupportsMlJobs(node.getVersion()) == false) { String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndVersion(node) - + "], because this node does not support jobs of version [" + job.getJobVersion() + "]"; + + "], because this node does not support machine learning jobs"; logger.trace(reason); reasons.add(reason); continue; } - if (jobHasRules(job) && node.getVersion().before(DetectionRule.VERSION_INTRODUCED)) { - String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndVersion(node) + "], because jobs using " + - "custom_rules require a node of version [" + DetectionRule.VERSION_INTRODUCED + "] or higher"; - logger.trace(reason); - reasons.add(reason); - continue; + if (job != null) { + Set compatibleJobTypes = Job.getCompatibleJobTypes(node.getVersion()); + if (compatibleJobTypes.contains(job.getJobType()) == false) { + String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndVersion(node) + + "], because this node does not support jobs of type [" + job.getJobType() + "]"; + logger.trace(reason); + reasons.add(reason); + continue; + } + + if (jobHasRules(job) && node.getVersion().before(DetectionRule.VERSION_INTRODUCED)) { + String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndVersion(node) + "], because jobs using " + + "custom_rules require a node of version [" + DetectionRule.VERSION_INTRODUCED + "] or higher"; + logger.trace(reason); + reasons.add(reason); + continue; + } } + long numberOfAssignedJobs = 0; int numberOfAllocatingJobs = 0; - long assignedJobMemory = 0; + if (persistentTasks != null) { // find all the job tasks assigned to this node Collection> assignedTasks = persistentTasks.findTasks( - OpenJobAction.TASK_NAME, task -> node.getId().equals(task.getExecutorNode())); + MlTasks.JOB_TASK_NAME, task -> node.getId().equals(task.getExecutorNode())); for (PersistentTasksCustomMetaData.PersistentTask assignedTask : assignedTasks) { JobTaskState jobTaskState = (JobTaskState) assignedTask.getState(); JobState jobState; @@ -214,6 +213,7 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j } else { jobState = jobTaskState.getState(); if (jobTaskState.isStatusStale(assignedTask)) { + // the job is re-locating if (jobState == JobState.CLOSING) { // previous executor node failed while the job was closing - it won't // be reopened, so consider it CLOSED for resource usage purposes @@ -226,13 +226,9 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j } } } - // Don't count CLOSED or FAILED jobs, as they don't consume native memory if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) { + // Don't count CLOSED or FAILED jobs, as they don't consume native memory ++numberOfAssignedJobs; - String assignedJobId = ((OpenJobAction.JobParams) assignedTask.getParams()).getJobId(); - Job assignedJob = mlMetadata.getJobs().get(assignedJobId); - assert assignedJob != null; - assignedJobMemory += assignedJob.estimateMemoryFootprint(); } } } @@ -273,54 +269,10 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j maxAvailableCount = availableCount; minLoadedNodeByCount = node; } - - String machineMemoryStr = nodeAttributes.get(MachineLearning.MACHINE_MEMORY_NODE_ATTR); - long machineMemory = -1; - // TODO: remove leniency and reject the node if the attribute is null in 7.0 - if (machineMemoryStr != null) { - try { - machineMemory = Long.parseLong(machineMemoryStr); - } catch (NumberFormatException e) { - String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) + "], because " + - MachineLearning.MACHINE_MEMORY_NODE_ATTR + " attribute [" + machineMemoryStr + "] is not a long"; - logger.trace(reason); - reasons.add(reason); - continue; - } - } - - if (allocateByMemory) { - if (machineMemory > 0) { - long maxMlMemory = machineMemory * maxMachineMemoryPercent / 100; - long estimatedMemoryFootprint = job.estimateMemoryFootprint(); - long availableMemory = maxMlMemory - assignedJobMemory; - if (estimatedMemoryFootprint > availableMemory) { - String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) + - "], because this node has insufficient available memory. Available memory for ML [" + maxMlMemory + - "], memory required by existing jobs [" + assignedJobMemory + - "], estimated memory required for this job [" + estimatedMemoryFootprint + "]"; - logger.trace(reason); - reasons.add(reason); - continue; - } - - if (maxAvailableMemory < availableMemory) { - maxAvailableMemory = availableMemory; - minLoadedNodeByMemory = node; - } - } else { - // If we cannot get the available memory on any machine in - // the cluster, fall back to simply allocating by job count - allocateByMemory = false; - logger.debug("Falling back to allocating job [{}] by job counts because machine memory was not available for node [{}]", - jobId, nodeNameAndMlAttributes(node)); - } - } } - DiscoveryNode minLoadedNode = allocateByMemory ? minLoadedNodeByMemory : minLoadedNodeByCount; - if (minLoadedNode != null) { - logger.debug("selected node [{}] for job [{}]", minLoadedNode, jobId); - return new PersistentTasksCustomMetaData.Assignment(minLoadedNode.getId(), ""); + if (minLoadedNodeByCount != null) { + logger.debug("selected node [{}] for job [{}]", minLoadedNodeByCount, jobId); + return new PersistentTasksCustomMetaData.Assignment(minLoadedNodeByCount.getId(), ""); } else { String explanation = String.join("|", reasons); logger.debug("no node selected for job [{}], reasons [{}]", jobId, explanation); @@ -355,13 +307,15 @@ static String nodeNameAndMlAttributes(DiscoveryNode node) { return builder.toString(); } - static String[] indicesOfInterest(ClusterState clusterState, String job) { - String jobResultIndex = AnomalyDetectorsIndex.getPhysicalIndexFromState(clusterState, job); - return new String[]{AnomalyDetectorsIndex.jobStateIndexName(), jobResultIndex, MlMetaIndex.INDEX_NAME}; + static String[] indicesOfInterest(String resultsIndex) { + if (resultsIndex == null) { + return new String[]{AnomalyDetectorsIndex.jobStateIndexName(), MlMetaIndex.INDEX_NAME}; + } + return new String[]{AnomalyDetectorsIndex.jobStateIndexName(), resultsIndex, MlMetaIndex.INDEX_NAME}; } - static List verifyIndicesPrimaryShardsAreActive(String jobId, ClusterState clusterState) { - String[] indices = indicesOfInterest(clusterState, jobId); + static List verifyIndicesPrimaryShardsAreActive(String resultsIndex, ClusterState clusterState) { + String[] indices = indicesOfInterest(resultsIndex); List unavailableIndices = new ArrayList<>(indices.length); for (String index : indices) { // Indices are created on demand from templates. @@ -377,7 +331,7 @@ static List verifyIndicesPrimaryShardsAreActive(String jobId, ClusterSta return unavailableIndices; } - private static boolean nodeSupportsJobVersion(Version nodeVersion) { + private static boolean nodeSupportsMlJobs(Version nodeVersion) { return nodeVersion.onOrAfter(Version.V_5_5_0); } @@ -459,7 +413,7 @@ protected void masterOperation(OpenJobAction.Request request, ClusterState state OpenJobAction.JobParams jobParams = request.getJobParams(); if (licenseState.isMachineLearningAllowed()) { - // Step 6. Clear job finished time once the job is started and respond + // Clear job finished time once the job is started and respond ActionListener clearJobFinishTime = ActionListener.wrap( response -> { if (response.isAcknowledged()) { @@ -471,7 +425,7 @@ protected void masterOperation(OpenJobAction.Request request, ClusterState state listener::onFailure ); - // Step 5. Wait for job to be started + // Wait for job to be started ActionListener> waitForJobToStart = new ActionListener>() { @Override @@ -489,18 +443,20 @@ public void onFailure(Exception e) { } }; - // Step 4. Start job task + // Start job task ActionListener jobUpateListener = ActionListener.wrap( - response -> persistentTasksService.sendStartRequest(MlTasks.jobTaskId(jobParams.getJobId()), - OpenJobAction.TASK_NAME, jobParams, waitForJobToStart), + response -> { + persistentTasksService.sendStartRequest(MlTasks.jobTaskId(jobParams.getJobId()), + MlTasks.JOB_TASK_NAME, jobParams, waitForJobToStart); + }, listener::onFailure ); - // Step 3. Update established model memory for pre-6.1 jobs that haven't had it set + // Update established model memory for pre-6.1 jobs that haven't had it set // and increase the model memory limit for 6.1 - 6.3 jobs ActionListener missingMappingsListener = ActionListener.wrap( response -> { - Job job = MlMetadata.getMlMetadata(clusterService.state()).getJobs().get(jobParams.getJobId()); + Job job = jobParams.getJob(); if (job != null) { Version jobVersion = job.getJobVersion(); Long jobEstablishedModelMemory = job.getEstablishedModelMemory(); @@ -547,7 +503,7 @@ public void onFailure(Exception e) { }, listener::onFailure ); - // Step 2. Try adding state doc mapping + // Try adding state doc mapping ActionListener resultsPutMappingHandler = ActionListener.wrap( response -> { addDocMappingIfMissing(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings::stateMapping, @@ -555,9 +511,21 @@ public void onFailure(Exception e) { }, listener::onFailure ); - // Step 1. Try adding results doc mapping - addDocMappingIfMissing(AnomalyDetectorsIndex.jobResultsAliasedName(jobParams.getJobId()), ElasticsearchMappings::resultsMapping, - state, resultsPutMappingHandler); + // Get the job config + jobConfigProvider.getJob(jobParams.getJobId(), ActionListener.wrap( + builder -> { + try { + jobParams.setJob(builder.build()); + + // Try adding results doc mapping + addDocMappingIfMissing(AnomalyDetectorsIndex.jobResultsAliasedName(jobParams.getJobId()), + ElasticsearchMappings::resultsMapping, state, resultsPutMappingHandler); + } catch (Exception e) { + listener.onFailure(e); + } + }, + listener::onFailure + )); } else { listener.onFailure(LicenseUtils.newComplianceException(XPackField.MACHINE_LEARNING)); } @@ -596,34 +564,18 @@ public void onTimeout(TimeValue timeout) { } private void clearJobFinishedTime(String jobId, ActionListener listener) { - clusterService.submitStateUpdateTask("clearing-job-finish-time-for-" + jobId, new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) { - MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState); - MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata); - Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId)); - jobBuilder.setFinishedTime(null); - - mlMetadataBuilder.putJob(jobBuilder.build(), true); - ClusterState.Builder builder = ClusterState.builder(currentState); - return builder.metaData(new MetaData.Builder(currentState.metaData()) - .putCustom(MlMetadata.TYPE, mlMetadataBuilder.build())) - .build(); - } - - @Override - public void onFailure(String source, Exception e) { - logger.error("[" + jobId + "] Failed to clear finished_time; source [" + source + "]", e); - listener.onResponse(new AcknowledgedResponse(true)); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, - ClusterState newState) { - listener.onResponse(new AcknowledgedResponse(true)); - } - }); + JobUpdate update = new JobUpdate.Builder(jobId).setClearJobFinishTime(true).build(); + + jobConfigProvider.updateJob(jobId, update, null, ActionListener.wrap( + job -> listener.onResponse(new AcknowledgedResponse(true)), + e -> { + logger.error("[" + jobId + "] Failed to clear finished_time", e); + // Not a critical error so continue + listener.onResponse(new AcknowledgedResponse(true)); + } + )); } + private void cancelJobStart(PersistentTasksCustomMetaData.PersistentTask persistentTask, Exception exception, ActionListener listener) { persistentTasksService.sendRemoveRequest(persistentTask.getId(), @@ -703,7 +655,7 @@ public static class OpenJobPersistentTasksExecutor extends PersistentTasksExecut public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterService, AutodetectProcessManager autodetectProcessManager) { - super(settings, OpenJobAction.TASK_NAME, MachineLearning.UTILITY_THREAD_POOL_NAME); + super(settings, MlTasks.JOB_TASK_NAME, MachineLearning.UTILITY_THREAD_POOL_NAME); this.autodetectProcessManager = autodetectProcessManager; this.fallbackMaxNumberOfOpenJobs = AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(settings); this.maxConcurrentJobAllocations = MachineLearning.CONCURRENT_JOB_ALLOCATIONS.get(settings); @@ -716,19 +668,19 @@ public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterS @Override public PersistentTasksCustomMetaData.Assignment getAssignment(OpenJobAction.JobParams params, ClusterState clusterState) { - return selectLeastLoadedMlNode(params.getJobId(), clusterState, maxConcurrentJobAllocations, fallbackMaxNumberOfOpenJobs, - maxMachineMemoryPercent, logger); + return selectLeastLoadedMlNode(params.getJobId(), params.getJob(), clusterState, + maxConcurrentJobAllocations, fallbackMaxNumberOfOpenJobs, maxMachineMemoryPercent, logger); } @Override public void validate(OpenJobAction.JobParams params, ClusterState clusterState) { - TransportOpenJobAction.validate(params.getJobId(), MlMetadata.getMlMetadata(clusterState)); + TransportOpenJobAction.validate(params.getJobId(), params.getJob()); // If we already know that we can't find an ml node because all ml nodes are running at capacity or // simply because there are no ml nodes in the cluster then we fail quickly here: - PersistentTasksCustomMetaData.Assignment assignment = selectLeastLoadedMlNode(params.getJobId(), clusterState, - maxConcurrentJobAllocations, fallbackMaxNumberOfOpenJobs, maxMachineMemoryPercent, logger); + PersistentTasksCustomMetaData.Assignment assignment = selectLeastLoadedMlNode(params.getJobId(), params.getJob(), + clusterState, maxConcurrentJobAllocations, fallbackMaxNumberOfOpenJobs, maxMachineMemoryPercent, logger); if (assignment.getExecutorNode() == null) { throw makeNoSuitableNodesException(logger, params.getJobId(), assignment.getExplanation()); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index f3f4a771443ef..578f4ee5f983b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -173,7 +173,7 @@ private void createDataExtractor(Job job, DatafeedConfig datafeed, StartDatafeed DataExtractorFactory.create(client, datafeed, job, ActionListener.wrap( dataExtractorFactory -> persistentTasksService.sendStartRequest(MlTasks.datafeedTaskId(params.getDatafeedId()), - StartDatafeedAction.TASK_NAME, params, listener) + MlTasks.DATAFEED_TASK_NAME, params, listener) , listener::onFailure)); } @@ -272,7 +272,7 @@ public static class StartDatafeedPersistentTasksExecutor extends PersistentTasks private final IndexNameExpressionResolver resolver; public StartDatafeedPersistentTasksExecutor(Settings settings, DatafeedManager datafeedManager) { - super(settings, StartDatafeedAction.TASK_NAME, MachineLearning.UTILITY_THREAD_POOL_NAME); + super(settings, MlTasks.DATAFEED_TASK_NAME, MachineLearning.UTILITY_THREAD_POOL_NAME); this.datafeedManager = datafeedManager; this.resolver = new IndexNameExpressionResolver(settings); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java index ef593417a19e3..b30338cd52c38 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java @@ -16,6 +16,9 @@ import org.elasticsearch.action.get.GetAction; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.get.MultiGetItemResponse; +import org.elasticsearch.action.get.MultiGetRequest; +import org.elasticsearch.action.get.MultiGetResponse; import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; @@ -145,6 +148,51 @@ public void onFailure(Exception e) { }, client::get); } + /** + * Get the list anomaly detector jobs specified by {@code jobIds}. + * + * WARNING: errors are silently ignored, if a job is not found a + * {@code ResourceNotFoundException} is not thrown. Only found + * jobs are returned, this size of the returned jobs list could + * be different to the size of the requested ids list. + * + * @param jobIds The jobs to get + * @param listener Jobs listener + */ + public void getJobs(List jobIds, ActionListener> listener) { + MultiGetRequest multiGetRequest = new MultiGetRequest(); + jobIds.forEach(jobId -> multiGetRequest.add(AnomalyDetectorsIndex.configIndexName(), + ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId))); + + List jobs = new ArrayList<>(); + executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, multiGetRequest, new ActionListener() { + @Override + public void onResponse(MultiGetResponse multiGetResponse) { + + MultiGetItemResponse[] responses = multiGetResponse.getResponses(); + for (MultiGetItemResponse response : responses) { + GetResponse getResponse = response.getResponse(); + if (getResponse.isExists()) { + BytesReference source = getResponse.getSourceAsBytesRef(); + try { + Job.Builder job = parseJobLenientlyFromSource(source); + jobs.add(job); + } catch (IOException e) { + logger.error("Error parsing job configuration [" + response.getId() + "]"); + } + } + } + + listener.onResponse(jobs); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }, client::multiGet); + } + /** * Delete the anomaly detector job config document * diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java index e16ac2f99700d..5e77afefc378c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java @@ -303,7 +303,7 @@ public void testUpdateDatafeed_failBecauseDatafeedIsNotStopped() { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); StartDatafeedAction.DatafeedParams params = new StartDatafeedAction.DatafeedParams(datafeedConfig1.getId(), 0L); - tasksBuilder.addTask(MlTasks.datafeedTaskId("datafeed1"), StartDatafeedAction.TASK_NAME, params, INITIAL_ASSIGNMENT); + tasksBuilder.addTask(MlTasks.datafeedTaskId("datafeed1"), MlTasks.DATAFEED_TASK_NAME, params, INITIAL_ASSIGNMENT); PersistentTasksCustomMetaData tasksInProgress = tasksBuilder.build(); DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedConfig1.getId()); @@ -385,7 +385,7 @@ public void testRemoveDatafeed_failBecauseDatafeedStarted() { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); StartDatafeedAction.DatafeedParams params = new StartDatafeedAction.DatafeedParams("datafeed1", 0L); - tasksBuilder.addTask(MlTasks.datafeedTaskId("datafeed1"), StartDatafeedAction.TASK_NAME, params, INITIAL_ASSIGNMENT); + tasksBuilder.addTask(MlTasks.datafeedTaskId("datafeed1"), MlTasks.DATAFEED_TASK_NAME, params, INITIAL_ASSIGNMENT); PersistentTasksCustomMetaData tasksInProgress = tasksBuilder.build(); MlMetadata.Builder builder2 = new MlMetadata.Builder(result); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java index c11d1b3779652..e638f3b220c21 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java @@ -312,7 +312,7 @@ public void testBuildWaitForCloseRequest() { public static void addTask(String datafeedId, long startTime, String nodeId, DatafeedState state, PersistentTasksCustomMetaData.Builder tasks) { - tasks.addTask(MlTasks.datafeedTaskId(datafeedId), StartDatafeedAction.TASK_NAME, + tasks.addTask(MlTasks.datafeedTaskId(datafeedId), MlTasks.DATAFEED_TASK_NAME, new StartDatafeedAction.DatafeedParams(datafeedId, startTime), new Assignment(nodeId, "test assignment")); tasks.updateTaskState(MlTasks.datafeedTaskId(datafeedId), state); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java index 0badd34ad62c4..9afdace941022 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java @@ -34,7 +34,6 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.xpack.core.ml.MlMetaIndex; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; @@ -63,7 +62,6 @@ import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; -import java.util.function.Function; import static org.elasticsearch.xpack.core.ml.job.config.JobTests.buildJobBuilder; import static org.hamcrest.Matchers.containsString; @@ -74,36 +72,28 @@ public class TransportOpenJobActionTests extends ESTestCase { public void testValidate_jobMissing() { - MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); - mlBuilder.putJob(buildJobBuilder("job_id1").build(), false); - expectThrows(ResourceNotFoundException.class, () -> TransportOpenJobAction.validate("job_id2", mlBuilder.build())); + expectThrows(ResourceNotFoundException.class, () -> TransportOpenJobAction.validate("job_id2", null)); } public void testValidate_jobMarkedAsDeleted() { - MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); Job.Builder jobBuilder = buildJobBuilder("job_id"); jobBuilder.setDeleted(true); - mlBuilder.putJob(jobBuilder.build(), false); Exception e = expectThrows(ElasticsearchStatusException.class, - () -> TransportOpenJobAction.validate("job_id", mlBuilder.build())); + () -> TransportOpenJobAction.validate("job_id", jobBuilder.build())); assertEquals("Cannot open job [job_id] because it has been marked as deleted", e.getMessage()); } public void testValidate_jobWithoutVersion() { - MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); Job.Builder jobBuilder = buildJobBuilder("job_id"); - mlBuilder.putJob(jobBuilder.build(), false); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> TransportOpenJobAction.validate("job_id", mlBuilder.build())); + () -> TransportOpenJobAction.validate("job_id", jobBuilder.build())); assertEquals("Cannot open job [job_id] because jobs created prior to version 5.5 are not supported", e.getMessage()); assertEquals(RestStatus.BAD_REQUEST, e.status()); } public void testValidate_givenValidJob() { - MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); Job.Builder jobBuilder = buildJobBuilder("job_id"); - mlBuilder.putJob(jobBuilder.build(new Date()), false); - TransportOpenJobAction.validate("job_id", mlBuilder.build()); + TransportOpenJobAction.validate("job_id", jobBuilder.build(new Date())); } public void testSelectLeastLoadedMlNode_byCount() { @@ -126,94 +116,21 @@ public void testSelectLeastLoadedMlNode_byCount() { PersistentTasksCustomMetaData tasks = tasksBuilder.build(); ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); - MetaData.Builder metaData = MetaData.builder(); - RoutingTable.Builder routingTable = RoutingTable.builder(); - addJobAndIndices(metaData, routingTable, "job_id1", "job_id2", "job_id3", "job_id4"); cs.nodes(nodes); - metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks); - cs.metaData(metaData); - cs.routingTable(routingTable.build()); - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id4", cs.build(), 2, 10, 30, logger); - assertEquals("", result.getExplanation()); - assertEquals("_node_id3", result.getExecutorNode()); - } - - public void testSelectLeastLoadedMlNode_byMemory() { - Map nodeAttr = new HashMap<>(); - nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true"); - nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, "16000000000"); - DiscoveryNodes nodes = DiscoveryNodes.builder() - .add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), - nodeAttr, Collections.emptySet(), Version.CURRENT)) - .add(new DiscoveryNode("_node_name2", "_node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), - nodeAttr, Collections.emptySet(), Version.CURRENT)) - .add(new DiscoveryNode("_node_name3", "_node_id3", new TransportAddress(InetAddress.getLoopbackAddress(), 9302), - nodeAttr, Collections.emptySet(), Version.CURRENT)) - .build(); - - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); - addJobTask("job_id1", "_node_id1", JobState.fromString("opened"), tasksBuilder); - addJobTask("job_id2", "_node_id2", JobState.fromString("opened"), tasksBuilder); - addJobTask("job_id3", "_node_id2", JobState.fromString("opened"), tasksBuilder); - addJobTask("job_id4", "_node_id3", JobState.fromString("opened"), tasksBuilder); - PersistentTasksCustomMetaData tasks = tasksBuilder.build(); - - ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); MetaData.Builder metaData = MetaData.builder(); - RoutingTable.Builder routingTable = RoutingTable.builder(); - addJobAndIndices(metaData, routingTable, jobId -> { - // remember we add 100MB for the process overhead, so these model memory - // limits correspond to estimated footprints of 102MB and 205MB - long jobSize = (jobId.equals("job_id2") || jobId.equals("job_id3")) ? 2 : 105; - return BaseMlIntegTestCase.createFareQuoteJob(jobId, new ByteSizeValue(jobSize, ByteSizeUnit.MB)).build(new Date()); - }, "job_id1", "job_id2", "job_id3", "job_id4", "job_id5"); - cs.nodes(nodes); metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks); cs.metaData(metaData); - cs.routingTable(routingTable.build()); - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id5", cs.build(), 2, 10, 30, logger); - assertEquals("", result.getExplanation()); - assertEquals("_node_id2", result.getExecutorNode()); - } - public void testSelectLeastLoadedMlNode_byMemoryWithFailedJobs() { - Map nodeAttr = new HashMap<>(); - nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true"); - // this leaves just under 300MB per node available for ML jobs - nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, "1000000000"); - DiscoveryNodes nodes = DiscoveryNodes.builder() - .add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), - nodeAttr, Collections.emptySet(), Version.CURRENT)) - .add(new DiscoveryNode("_node_name2", "_node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), - nodeAttr, Collections.emptySet(), Version.CURRENT)) - .add(new DiscoveryNode("_node_name3", "_node_id3", new TransportAddress(InetAddress.getLoopbackAddress(), 9302), - nodeAttr, Collections.emptySet(), Version.CURRENT)) - .build(); + Job.Builder jobBuilder = buildJobBuilder("job_id4"); + jobBuilder.setJobVersion(Version.CURRENT); - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); - addJobTask("job_id1", "_node_id1", JobState.fromString("failed"), tasksBuilder); - addJobTask("job_id2", "_node_id2", JobState.fromString("failed"), tasksBuilder); - addJobTask("job_id3", "_node_id3", JobState.fromString("failed"), tasksBuilder); - PersistentTasksCustomMetaData tasks = tasksBuilder.build(); - - ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); - MetaData.Builder metaData = MetaData.builder(); - RoutingTable.Builder routingTable = RoutingTable.builder(); - addJobAndIndices(metaData, routingTable, jobId -> { - // remember we add 100MB for the process overhead, so this model - // memory limit corresponds to a job size of 250MB - return BaseMlIntegTestCase.createFareQuoteJob(jobId, new ByteSizeValue(150, ByteSizeUnit.MB)).build(new Date()); - }, "job_id1", "job_id2", "job_id3", "job_id4"); - cs.nodes(nodes); - metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks); - cs.metaData(metaData); - cs.routingTable(routingTable.build()); - // if the memory of the failed jobs is wrongly included in the calculation then this job will not be allocated - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id4", cs.build(), 2, 10, 30, logger); + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id4", jobBuilder.build(), + cs.build(), 2, 10, 30, logger); assertEquals("", result.getExplanation()); - assertNotNull(result.getExecutorNode()); + assertEquals("_node_id3", result.getExecutorNode()); } + public void testSelectLeastLoadedMlNode_maxCapacity() { int numNodes = randomIntBetween(1, 10); int maxRunningJobsPerNode = randomIntBetween(1, 100); @@ -237,13 +154,14 @@ public void testSelectLeastLoadedMlNode_maxCapacity() { ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); MetaData.Builder metaData = MetaData.builder(); - RoutingTable.Builder routingTable = RoutingTable.builder(); - addJobAndIndices(metaData, routingTable, jobIds); cs.nodes(nodes); metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks); cs.metaData(metaData); - cs.routingTable(routingTable.build()); - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id0", cs.build(), 2, maxRunningJobsPerNode, 30, logger); + + Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id0", new ByteSizeValue(150, ByteSizeUnit.MB)).build(new Date()); + + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id0", job, cs.build(), 2, + maxRunningJobsPerNode, 30, logger); assertNull(result.getExecutorNode()); assertTrue(result.getExplanation().contains("because this node is full. Number of opened jobs [" + maxRunningJobsPerNode + "], xpack.ml.max_open_jobs [" + maxRunningJobsPerNode + "]")); @@ -263,13 +181,13 @@ public void testSelectLeastLoadedMlNode_noMlNodes() { ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); MetaData.Builder metaData = MetaData.builder(); - RoutingTable.Builder routingTable = RoutingTable.builder(); - addJobAndIndices(metaData, routingTable, "job_id1", "job_id2"); cs.nodes(nodes); metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks); cs.metaData(metaData); - cs.routingTable(routingTable.build()); - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, 10, 30, logger); + + Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id2", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date()); + + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id2", job, cs.build(), 2, 10, 30, logger); assertTrue(result.getExplanation().contains("because this node isn't a ml node")); assertNull(result.getExecutorNode()); } @@ -297,14 +215,13 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); csBuilder.nodes(nodes); MetaData.Builder metaData = MetaData.builder(); - RoutingTable.Builder routingTable = RoutingTable.builder(); - addJobAndIndices(metaData, routingTable, "job_id1", "job_id2", "job_id3", "job_id4", "job_id5", "job_id6", "job_id7"); - csBuilder.routingTable(routingTable.build()); metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks); csBuilder.metaData(metaData); + Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id6", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date()); + ClusterState cs = csBuilder.build(); - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id6", cs, 2, 10, 30, logger); + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id6", job, cs, 2, 10, 30, logger); assertEquals("_node_id3", result.getExecutorNode()); tasksBuilder = PersistentTasksCustomMetaData.builder(tasks); @@ -314,7 +231,7 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { csBuilder = ClusterState.builder(cs); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); cs = csBuilder.build(); - result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, 10, 30, logger); + result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, logger); assertNull("no node selected, because OPENING state", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); @@ -325,7 +242,7 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { csBuilder = ClusterState.builder(cs); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); cs = csBuilder.build(); - result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, 10, 30, logger); + result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, logger); assertNull("no node selected, because stale task", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); @@ -336,7 +253,7 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { csBuilder = ClusterState.builder(cs); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); cs = csBuilder.build(); - result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, 10, 30, logger); + result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, logger); assertNull("no node selected, because null state", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); } @@ -367,15 +284,14 @@ public void testSelectLeastLoadedMlNode_concurrentOpeningJobsAndStaleFailedJob() ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); csBuilder.nodes(nodes); MetaData.Builder metaData = MetaData.builder(); - RoutingTable.Builder routingTable = RoutingTable.builder(); - addJobAndIndices(metaData, routingTable, "job_id1", "job_id2", "job_id3", "job_id4", "job_id5", "job_id6", "job_id7", "job_id8"); - csBuilder.routingTable(routingTable.build()); metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks); csBuilder.metaData(metaData); ClusterState cs = csBuilder.build(); + Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id7", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date()); + // Allocation won't be possible if the stale failed job is treated as opening - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, 10, 30, logger); + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, logger); assertEquals("_node_id1", result.getExecutorNode()); tasksBuilder = PersistentTasksCustomMetaData.builder(tasks); @@ -385,7 +301,7 @@ public void testSelectLeastLoadedMlNode_concurrentOpeningJobsAndStaleFailedJob() csBuilder = ClusterState.builder(cs); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); cs = csBuilder.build(); - result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id8", cs, 2, 10, 30, logger); + result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id8", job, cs, 2, 10, 30, logger); assertNull("no node selected, because OPENING state", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); } @@ -406,21 +322,17 @@ public void testSelectLeastLoadedMlNode_noCompatibleJobTypeNodes() { ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); MetaData.Builder metaData = MetaData.builder(); - RoutingTable.Builder routingTable = RoutingTable.builder(); - Function incompatibleJobCreator = jobId -> { - Job job = mock(Job.class); - when(job.getId()).thenReturn(jobId); - when(job.getJobVersion()).thenReturn(Version.CURRENT); - when(job.getJobType()).thenReturn("incompatible_type"); - when(job.getResultsIndexName()).thenReturn("shared"); - return job; - }; - addJobAndIndices(metaData, routingTable, incompatibleJobCreator, "incompatible_type_job"); + + Job job = mock(Job.class); + when(job.getId()).thenReturn("incompatible_type_job"); + when(job.getJobVersion()).thenReturn(Version.CURRENT); + when(job.getJobType()).thenReturn("incompatible_type"); + when(job.getResultsIndexName()).thenReturn("shared"); + cs.nodes(nodes); metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks); cs.metaData(metaData); - cs.routingTable(routingTable.build()); - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("incompatible_type_job", cs.build(), 2, 10, 30, logger); + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("incompatible_type_job", job, cs.build(), 2, 10, 30, logger); assertThat(result.getExplanation(), containsString("because this node does not support jobs of type [incompatible_type]")); assertNull(result.getExecutorNode()); } @@ -441,14 +353,14 @@ public void testSelectLeastLoadedMlNode_noNodesPriorTo_V_5_5() { ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); MetaData.Builder metaData = MetaData.builder(); - RoutingTable.Builder routingTable = RoutingTable.builder(); - addJobAndIndices(metaData, routingTable, "incompatible_type_job"); cs.nodes(nodes); metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks); cs.metaData(metaData); - cs.routingTable(routingTable.build()); - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("incompatible_type_job", cs.build(), 2, 10, 30, logger); - assertThat(result.getExplanation(), containsString("because this node does not support jobs of version [" + Version.CURRENT + "]")); + + Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id7", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date()); + + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("incompatible_type_job", job, cs.build(), 2, 10, 30, logger); + assertThat(result.getExplanation(), containsString("because this node does not support machine learning jobs")); assertNull(result.getExecutorNode()); } @@ -468,14 +380,12 @@ public void testSelectLeastLoadedMlNode_jobWithRulesButNoNodeMeetsRequiredVersio ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); MetaData.Builder metaData = MetaData.builder(); - RoutingTable.Builder routingTable = RoutingTable.builder(); - addJobAndIndices(metaData, routingTable, jobWithRulesCreator(), "job_with_rules"); cs.nodes(nodes); metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks); cs.metaData(metaData); - cs.routingTable(routingTable.build()); - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", cs.build(), - 2, 10, 30, logger); + + Job job = jobWithRules("job_with_rules"); + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 2, 10, 30, logger); assertThat(result.getExplanation(), containsString( "because jobs using custom_rules require a node of version [6.4.0] or higher")); assertNull(result.getExecutorNode()); @@ -497,33 +407,31 @@ public void testSelectLeastLoadedMlNode_jobWithRulesAndNodeMeetsRequiredVersion( ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); MetaData.Builder metaData = MetaData.builder(); - RoutingTable.Builder routingTable = RoutingTable.builder(); - addJobAndIndices(metaData, routingTable, jobWithRulesCreator(), "job_with_rules"); cs.nodes(nodes); metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks); cs.metaData(metaData); - cs.routingTable(routingTable.build()); - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", cs.build(), - 2, 10, 30, logger); + + Job job = jobWithRules("job_with_rules"); + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 2, 10, 30, logger); assertNotNull(result.getExecutorNode()); } public void testVerifyIndicesPrimaryShardsAreActive() { MetaData.Builder metaData = MetaData.builder(); RoutingTable.Builder routingTable = RoutingTable.builder(); - addJobAndIndices(metaData, routingTable, "job_id"); + addIndices(metaData, routingTable); ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); csBuilder.routingTable(routingTable.build()); csBuilder.metaData(metaData); ClusterState cs = csBuilder.build(); - assertEquals(0, TransportOpenJobAction.verifyIndicesPrimaryShardsAreActive("job_id", cs).size()); + assertEquals(0, TransportOpenJobAction.verifyIndicesPrimaryShardsAreActive(".ml-anomalies-shared", cs).size()); metaData = new MetaData.Builder(cs.metaData()); routingTable = new RoutingTable.Builder(cs.routingTable()); - String indexToRemove = randomFrom(TransportOpenJobAction.indicesOfInterest(cs, "job_id")); + String indexToRemove = randomFrom(TransportOpenJobAction.indicesOfInterest(".ml-anomalies-shared")); if (randomBoolean()) { routingTable.remove(indexToRemove); } else { @@ -538,7 +446,7 @@ public void testVerifyIndicesPrimaryShardsAreActive() { csBuilder.routingTable(routingTable.build()); csBuilder.metaData(metaData); - List result = TransportOpenJobAction.verifyIndicesPrimaryShardsAreActive("job_id", csBuilder.build()); + List result = TransportOpenJobAction.verifyIndicesPrimaryShardsAreActive(".ml-anomalies-shared", csBuilder.build()); assertEquals(1, result.size()); assertEquals(indexToRemove, result.get(0)); } @@ -663,20 +571,14 @@ public void testJobTaskMatcherMatch() { } public static void addJobTask(String jobId, String nodeId, JobState jobState, PersistentTasksCustomMetaData.Builder builder) { - builder.addTask(MlTasks.jobTaskId(jobId), OpenJobAction.TASK_NAME, new OpenJobAction.JobParams(jobId), + builder.addTask(MlTasks.jobTaskId(jobId), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams(jobId), new Assignment(nodeId, "test assignment")); if (jobState != null) { builder.updateTaskState(MlTasks.jobTaskId(jobId), new JobTaskState(jobState, builder.getLastAllocationId())); } } - private void addJobAndIndices(MetaData.Builder metaData, RoutingTable.Builder routingTable, String... jobIds) { - addJobAndIndices(metaData, routingTable, jobId -> - BaseMlIntegTestCase.createFareQuoteJob(jobId, new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date()), jobIds); - } - - private void addJobAndIndices(MetaData.Builder metaData, RoutingTable.Builder routingTable, Function jobCreator, - String... jobIds) { + private void addIndices(MetaData.Builder metaData, RoutingTable.Builder routingTable) { List indices = new ArrayList<>(); indices.add(AnomalyDetectorsIndex.jobStateIndexName()); indices.add(MlMetaIndex.INDEX_NAME); @@ -699,13 +601,6 @@ private void addJobAndIndices(MetaData.Builder metaData, RoutingTable.Builder ro routingTable.add(IndexRoutingTable.builder(index) .addIndexShard(new IndexShardRoutingTable.Builder(shardId).addShard(shardRouting).build())); } - - MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); - for (String jobId : jobIds) { - Job job = jobCreator.apply(jobId); - mlMetadata.putJob(job, false); - } - metaData.putCustom(MlMetadata.TYPE, mlMetadata.build()); } private ClusterState getClusterStateWithMappingsWithMetaData(Map namesAndVersions) throws IOException { @@ -744,21 +639,19 @@ private ClusterState getClusterStateWithMappingsWithMetaData(Map return csBuilder.build(); } - private static Function jobWithRulesCreator() { - return jobId -> { - DetectionRule rule = new DetectionRule.Builder(Collections.singletonList( - new RuleCondition(RuleCondition.AppliesTo.TYPICAL, Operator.LT, 100.0) - )).build(); - - Detector.Builder detector = new Detector.Builder("count", null); - detector.setRules(Collections.singletonList(rule)); - AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build())); - DataDescription.Builder dataDescription = new DataDescription.Builder(); - Job.Builder job = new Job.Builder(jobId); - job.setAnalysisConfig(analysisConfig); - job.setDataDescription(dataDescription); - return job.build(new Date()); - }; + private static Job jobWithRules(String jobId) { + DetectionRule rule = new DetectionRule.Builder(Collections.singletonList( + new RuleCondition(RuleCondition.AppliesTo.TYPICAL, Operator.LT, 100.0) + )).build(); + + Detector.Builder detector = new Detector.Builder("count", null); + detector.setRules(Collections.singletonList(rule)); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build())); + DataDescription.Builder dataDescription = new DataDescription.Builder(); + Job.Builder job = new Job.Builder(jobId); + job.setAnalysisConfig(analysisConfig); + job.setDataDescription(dataDescription); + return job.build(new Date()); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedActionTests.java index a15c0e97b97f1..d8b1d28153688 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedActionTests.java @@ -29,7 +29,7 @@ public class TransportStopDatafeedActionTests extends ESTestCase { public void testValidate() { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); - tasksBuilder.addTask(MlTasks.datafeedTaskId("foo"), StartDatafeedAction.TASK_NAME, + tasksBuilder.addTask(MlTasks.datafeedTaskId("foo"), MlTasks.DATAFEED_TASK_NAME, new StartDatafeedAction.DatafeedParams("foo", 0L), new PersistentTasksCustomMetaData.Assignment("node_id", "")); tasksBuilder.updateTaskState(MlTasks.datafeedTaskId("foo"), DatafeedState.STARTED); tasksBuilder.build(); @@ -118,7 +118,7 @@ public void testResolveDataFeedIds_GivenAll() { public static void addTask(String datafeedId, long startTime, String nodeId, DatafeedState state, PersistentTasksCustomMetaData.Builder taskBuilder) { - taskBuilder.addTask(MlTasks.datafeedTaskId(datafeedId), StartDatafeedAction.TASK_NAME, + taskBuilder.addTask(MlTasks.datafeedTaskId(datafeedId), MlTasks.DATAFEED_TASK_NAME, new StartDatafeedAction.DatafeedParams(datafeedId, startTime), new PersistentTasksCustomMetaData.Assignment(nodeId, "test assignment")); taskBuilder.updateTaskState(MlTasks.datafeedTaskId(datafeedId), state); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java index 5a51600ebaad5..96c887458d722 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java @@ -262,7 +262,7 @@ public void testMaxConcurrentJobAllocations() throws Exception { } for (DiscoveryNode node : event.state().nodes()) { - Collection> foundTasks = tasks.findTasks(OpenJobAction.TASK_NAME, task -> { + Collection> foundTasks = tasks.findTasks(MlTasks.JOB_TASK_NAME, task -> { JobTaskState jobTaskState = (JobTaskState) task.getState(); return node.getId().equals(task.getExecutorNode()) && (jobTaskState == null || jobTaskState.isStatusStale(task)); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java index 3456290745a05..ba0f5520e0740 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java @@ -46,6 +46,7 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.core.IsInstanceOf.instanceOf; @@ -130,7 +131,6 @@ public void testCrud() throws InterruptedException { AtomicReference getJobResponseHolder = new AtomicReference<>(); blockingCall(actionListener -> jobConfigProvider.getJob(jobId, actionListener), getJobResponseHolder, exceptionHolder); assertNull(exceptionHolder.get()); - assertEquals(newJob, getJobResponseHolder.get().build()); // Update Job @@ -170,6 +170,23 @@ public void testCrud() throws InterruptedException { assertThat(exceptionHolder.get(), instanceOf(ResourceNotFoundException.class)); } + public void testGetJobs() throws Exception { + putJob(createJob("nginx", null)); + putJob(createJob("tomcat", null)); + putJob(createJob("mysql", null)); + + List jobsToGet = Arrays.asList("nginx", "tomcat", "unknown-job"); + + AtomicReference> jobsHolder = new AtomicReference<>(); + AtomicReference exceptionHolder = new AtomicReference<>(); + blockingCall(actionListener -> jobConfigProvider.getJobs(jobsToGet, actionListener), jobsHolder, exceptionHolder); + assertNull(exceptionHolder.get()); + assertNotNull(jobsHolder.get()); + assertThat(jobsHolder.get(), hasSize(2)); + List foundIds = jobsHolder.get().stream().map(Job.Builder::getId).collect(Collectors.toList()); + assertThat(foundIds, containsInAnyOrder("nginx", "tomcat")); + } + public void testUpdateWithAValidationError() throws Exception { final String jobId = "bad-update-job";