diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java index 76de682e99d35..ed9524e3143aa 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java @@ -173,6 +173,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws private static void mapValuesToXContent(ParseField field, Map map, XContentBuilder builder, Params params) throws IOException { + if (map.isEmpty()) { + return; + } + builder.startArray(field.getPreferredName()); for (Map.Entry entry : map.entrySet()) { entry.getValue().toXContent(builder, params); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java index b81a1f7d7b9c0..c166f64c4e3ef 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java @@ -6,14 +6,16 @@ package org.elasticsearch.xpack.core.ml; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Nullable; +import org.elasticsearch.persistent.PersistentTasksClusterService; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; +import java.util.Collection; import java.util.Collections; -import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -133,6 +135,42 @@ public static Set openJobIds(@Nullable PersistentTasksCustomMetaData tas .collect(Collectors.toSet()); } + /** + * Get the job Ids of anomaly detector job tasks that do + * not have an assignment. + * + * @param tasks Persistent tasks. If null an empty set is returned. + * @param nodes The cluster nodes + * @return The job Ids of tasks to do not have an assignment. + */ + public static Set unallocatedJobIds(@Nullable PersistentTasksCustomMetaData tasks, + DiscoveryNodes nodes) { + return unallocatedJobTasks(tasks, nodes).stream() + .map(task -> task.getId().substring(JOB_TASK_ID_PREFIX.length())) + .collect(Collectors.toSet()); + } + + /** + * The job tasks that do not have an allocation as determined by + * {@link PersistentTasksClusterService#needsReassignment(PersistentTasksCustomMetaData.Assignment, DiscoveryNodes)} + * + * @param tasks Persistent tasks. If null an empty set is returned. + * @param nodes The cluster nodes + * @return Unallocated job tasks + */ + public static Collection unallocatedJobTasks( + @Nullable PersistentTasksCustomMetaData tasks, + DiscoveryNodes nodes) { + if (tasks == null) { + return Collections.emptyList(); + } + + return tasks.findTasks(JOB_TASK_NAME, task -> true) + .stream() + .filter(task -> PersistentTasksClusterService.needsReassignment(task.getAssignment(), nodes)) + .collect(Collectors.toList()); + } + /** * The datafeed Ids of started datafeed tasks * @@ -151,26 +189,39 @@ public static Set startedDatafeedIds(@Nullable PersistentTasksCustomMeta } /** - * Is there an ml anomaly detector job task for the job {@code jobId}? - * @param jobId The job id - * @param tasks Persistent tasks - * @return True if the job has a task + * Get the datafeed Ids of started datafeed tasks + * that do not have an assignment. + * + * @param tasks Persistent tasks. If null an empty set is returned. + * @param nodes The cluster nodes + * @return The job Ids of tasks to do not have an assignment. */ - public static boolean taskExistsForJob(String jobId, PersistentTasksCustomMetaData tasks) { - return openJobIds(tasks).contains(jobId); + public static Set unallocatedDatafeedIds(@Nullable PersistentTasksCustomMetaData tasks, + DiscoveryNodes nodes) { + + return unallocatedDatafeedTasks(tasks, nodes).stream() + .map(task -> task.getId().substring(DATAFEED_TASK_ID_PREFIX.length())) + .collect(Collectors.toSet()); } /** - * Read the active anomaly detector job tasks. - * Active tasks are not {@code JobState.CLOSED} or {@code JobState.FAILED}. + * The datafeed tasks that do not have an allocation as determined by + * {@link PersistentTasksClusterService#needsReassignment(PersistentTasksCustomMetaData.Assignment, DiscoveryNodes)} * - * @param tasks Persistent tasks - * @return The job tasks excluding closed and failed jobs + * @param tasks Persistent tasks. If null an empty set is returned. + * @param nodes The cluster nodes + * @return Unallocated datafeed tasks */ - public static List> activeJobTasks(PersistentTasksCustomMetaData tasks) { - return tasks.findTasks(JOB_TASK_NAME, task -> true) + public static Collection unallocatedDatafeedTasks( + @Nullable PersistentTasksCustomMetaData tasks, + DiscoveryNodes nodes) { + if (tasks == null) { + return Collections.emptyList(); + } + + return tasks.findTasks(DATAFEED_TASK_NAME, task -> true) .stream() - .filter(task -> ((JobTaskState) task.getState()).getState().isAnyOf(JobState.CLOSED, JobState.FAILED) == false) + .filter(task -> PersistentTasksClusterService.needsReassignment(task.getAssignment(), nodes)) .collect(Collectors.toList()); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java index 408520472c4f2..e80b47b057bf7 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java @@ -6,6 +6,10 @@ package org.elasticsearch.xpack.core.ml; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; @@ -14,12 +18,14 @@ import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; +import java.net.InetAddress; + import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.empty; public class MlTasksTests extends ESTestCase { public void testGetJobState() { - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); // A missing task is a closed job assertEquals(JobState.CLOSED, MlTasks.getJobState("foo", tasksBuilder.build())); // A task with no status is opening @@ -52,7 +58,7 @@ public void testGetDatefeedState() { public void testGetJobTask() { assertNull(MlTasks.getJobTask("foo", null)); - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); tasksBuilder.addTask(MlTasks.jobTaskId("foo"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo"), new PersistentTasksCustomMetaData.Assignment("bar", "test assignment")); @@ -73,7 +79,7 @@ public void testGetDatafeedTask() { } public void testOpenJobIds() { - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); assertThat(MlTasks.openJobIds(tasksBuilder.build()), empty()); tasksBuilder.addTask(MlTasks.jobTaskId("foo-1"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo-1"), @@ -92,7 +98,7 @@ public void testOpenJobIds_GivenNull() { } public void testStartedDatafeedIds() { - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); assertThat(MlTasks.openJobIds(tasksBuilder.build()), empty()); tasksBuilder.addTask(MlTasks.jobTaskId("job-1"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo-1"), @@ -111,16 +117,48 @@ public void testStartedDatafeedIds_GivenNull() { assertThat(MlTasks.startedDatafeedIds(null), empty()); } - public void testTaskExistsForJob() { - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); - assertFalse(MlTasks.taskExistsForJob("job-1", tasksBuilder.build())); - - tasksBuilder.addTask(MlTasks.jobTaskId("foo"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo"), - new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")); - tasksBuilder.addTask(MlTasks.jobTaskId("bar"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("bar"), + public void testUnallocatedJobIds() { + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + tasksBuilder.addTask(MlTasks.jobTaskId("job_with_assignment"), MlTasks.JOB_TASK_NAME, + new OpenJobAction.JobParams("job_with_assignment"), new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")); + tasksBuilder.addTask(MlTasks.jobTaskId("job_without_assignment"), MlTasks.JOB_TASK_NAME, + new OpenJobAction.JobParams("job_without_assignment"), + new PersistentTasksCustomMetaData.Assignment(null, "test assignment")); + tasksBuilder.addTask(MlTasks.jobTaskId("job_without_node"), MlTasks.JOB_TASK_NAME, + new OpenJobAction.JobParams("job_without_node"), + new PersistentTasksCustomMetaData.Assignment("dead-node", "expired node")); + + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("node-1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT)) + .localNodeId("node-1") + .masterNodeId("node-1") + .build(); + + assertThat(MlTasks.unallocatedJobIds(tasksBuilder.build(), nodes), + containsInAnyOrder("job_without_assignment", "job_without_node")); + } - assertFalse(MlTasks.taskExistsForJob("job-1", tasksBuilder.build())); - assertTrue(MlTasks.taskExistsForJob("foo", tasksBuilder.build())); + public void testUnallocatedDatafeedIds() { + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + tasksBuilder.addTask(MlTasks.datafeedTaskId("datafeed_with_assignment"), MlTasks.DATAFEED_TASK_NAME, + new StartDatafeedAction.DatafeedParams("datafeed_with_assignment", 0L), + new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")); + tasksBuilder.addTask(MlTasks.datafeedTaskId("datafeed_without_assignment"), MlTasks.DATAFEED_TASK_NAME, + new StartDatafeedAction.DatafeedParams("datafeed_without_assignment", 0L), + new PersistentTasksCustomMetaData.Assignment(null, "test assignment")); + tasksBuilder.addTask(MlTasks.datafeedTaskId("datafeed_without_node"), MlTasks.DATAFEED_TASK_NAME, + new StartDatafeedAction.DatafeedParams("datafeed_without_node", 0L), + new PersistentTasksCustomMetaData.Assignment("dead_node", "expired node")); + + + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("node-1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT)) + .localNodeId("node-1") + .masterNodeId("node-1") + .build(); + + assertThat(MlTasks.unallocatedDatafeedIds(tasksBuilder.build(), nodes), + containsInAnyOrder("datafeed_without_assignment", "datafeed_without_node")); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java index 4dc3873c5859d..a2c9f6eeaaf30 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java @@ -58,7 +58,7 @@ public void clusterChanged(ClusterChangedEvent event) { return; } - mlConfigMigrator.migrateConfigsWithoutTasks(event.state(), ActionListener.wrap( + mlConfigMigrator.migrateConfigs(event.state(), ActionListener.wrap( response -> threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(event)), e -> { logger.error("error migrating ml configurations", e); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheck.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheck.java index 72cb52424c3b1..daa143ec01977 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheck.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheck.java @@ -79,9 +79,10 @@ static boolean mlConfigIndexIsAllocated(ClusterState clusterState) { * False if {@link #canStartMigration(ClusterState)} returns {@code false} * False if the job is not in the cluster state * False if the {@link Job#isDeleting()} - * False if the job has a persistent task + * False if the job has an allocated persistent task * True otherwise i.e. the job is present, not deleting - * and does not have a persistent task. + * and does not have a persistent task or its persistent + * task is un-allocated * * @param jobId The job Id * @param clusterState The cluster state @@ -100,15 +101,17 @@ public boolean jobIsEligibleForMigration(String jobId, ClusterState clusterState } PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE); - return MlTasks.openJobIds(persistentTasks).contains(jobId) == false; + return MlTasks.openJobIds(persistentTasks).contains(jobId) == false || + MlTasks.unallocatedJobIds(persistentTasks, clusterState.nodes()).contains(jobId); } /** * Is the datafeed a eligible for migration? Returns: * False if {@link #canStartMigration(ClusterState)} returns {@code false} * False if the datafeed is not in the cluster state - * False if the datafeed has a persistent task - * True otherwise i.e. the datafeed is present and does not have a persistent task. + * False if the datafeed has an allocated persistent task + * True otherwise i.e. the datafeed is present and does not have a persistent + * task or its persistent task is un-allocated * * @param datafeedId The datafeed Id * @param clusterState The cluster state @@ -125,6 +128,7 @@ public boolean datafeedIsEligibleForMigration(String datafeedId, ClusterState cl } PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE); - return MlTasks.startedDatafeedIds(persistentTasks).contains(datafeedId) == false; + return MlTasks.startedDatafeedIds(persistentTasks).contains(datafeedId) == false + || MlTasks.unallocatedDatafeedIds(persistentTasks, clusterState.nodes()).contains(datafeedId); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java index fbf8c3c804eef..bb03b1170ceca 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; @@ -36,6 +37,8 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; +import org.elasticsearch.xpack.core.ml.action.OpenJobAction; +import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits; import org.elasticsearch.xpack.core.ml.job.config.Job; @@ -67,25 +70,28 @@ /** * Migrates job and datafeed configurations from the clusterstate to - * index documents. + * index documents for closed or unallocated tasks. * * There are 3 steps to the migration process * 1. Read config from the clusterstate + * - Find all job and datafeed configs that do not have an associated persistent + * task or the persistent task is unallocated * - If a job or datafeed is added after this call it will be added to the index * - If deleted then it's possible the config will be copied before it is deleted. * Mitigate against this by filtering out jobs marked as deleting * 2. Copy the config to the index * - The index operation could fail, don't delete from clusterstate in this case - * 3. Remove config from the clusterstate + * 3. Remove config from the clusterstate and update persistent task parameters * - Before this happens config is duplicated in index and clusterstate, all ops - * must prefer to use the index config at this stage + * must prefer to use the clusterstate config at this stage * - If the clusterstate update fails then the config will remain duplicated * and the migration process should try again + * - Job and datafeed tasks opened prior to v6.6.0 need to be updated with new + * parameters * * If there was an error in step 3 and the config is in both the clusterstate and - * index then when the migrator retries it must not overwrite an existing job config - * document as once the index document is present all update operations will function - * on that rather than the clusterstate. + * index. At this point the clusterstate config is preferred and all update + * operations will function on that rather than the index. * * The number of configs indexed in each bulk operation is limited by {@link #MAX_BULK_WRITE_SIZE} * pairs of datafeeds and jobs are migrated together. @@ -130,7 +136,7 @@ public MlConfigMigrator(Settings settings, Client client, ClusterService cluster * @param clusterState The current clusterstate * @param listener The success listener */ - public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener listener) { + public void migrateConfigs(ClusterState clusterState, ActionListener listener) { if (migrationInProgress.compareAndSet(false, true) == false) { listener.onResponse(Boolean.FALSE); return; @@ -183,8 +189,8 @@ private void migrateBatches(List batches, ActionListener writeConfigToIndex(batch.datafeedConfigs, batch.jobs, ActionListener.wrap( failedDocumentIds -> { - List successfulJobWrites = filterFailedJobConfigWrites(failedDocumentIds, batch.jobs); - List successfulDatafeedWrites = + List successfulJobWrites = filterFailedJobConfigWrites(failedDocumentIds, batch.jobs); + List successfulDatafeedWrites = filterFailedDatafeedConfigWrites(failedDocumentIds, batch.datafeedConfigs); removeFromClusterState(successfulJobWrites, successfulDatafeedWrites, chainedListener); }, @@ -215,24 +221,33 @@ public void writeConfigToIndex(Collection datafeedsToMigrate, ); } - private void removeFromClusterState(List jobsToRemoveIds, List datafeedsToRemoveIds, + private void removeFromClusterState(List jobsToRemove, List datafeedsToRemove, ActionListener listener) { - if (jobsToRemoveIds.isEmpty() && datafeedsToRemoveIds.isEmpty()) { + if (jobsToRemove.isEmpty() && datafeedsToRemove.isEmpty()) { listener.onResponse(null); return; } + Map jobsMap = jobsToRemove.stream().collect(Collectors.toMap(Job::getId, Function.identity())); + Map datafeedMap = + datafeedsToRemove.stream().collect(Collectors.toMap(DatafeedConfig::getId, Function.identity())); + AtomicReference removedConfigs = new AtomicReference<>(); clusterService.submitStateUpdateTask("remove-migrated-ml-configs", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - RemovalResult removed = removeJobsAndDatafeeds(jobsToRemoveIds, datafeedsToRemoveIds, + RemovalResult removed = removeJobsAndDatafeeds(jobsToRemove, datafeedsToRemove, MlMetadata.getMlMetadata(currentState)); removedConfigs.set(removed); + + PersistentTasksCustomMetaData updatedTasks = rewritePersistentTaskParams(jobsMap, datafeedMap, + currentState.metaData().custom(PersistentTasksCustomMetaData.TYPE), currentState.nodes()); + ClusterState.Builder newState = ClusterState.builder(currentState); newState.metaData(MetaData.builder(currentState.getMetaData()) .putCustom(MlMetadata.TYPE, removed.mlMetadata) + .putCustom(PersistentTasksCustomMetaData.TYPE, updatedTasks) .build()); return newState.build(); } @@ -257,6 +272,82 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS }); } + /** + * Find any unallocated datafeed and job tasks and update their persistent + * task parameters if they have missing fields that were added in v6.6. If + * a task exists with a missing field it must have been created in an earlier + * version and survived an elasticsearch upgrade. + * + * If there are no unallocated tasks the {@code currentTasks} argument is returned. + * + * @param jobs Job configs + * @param datafeeds Datafeed configs + * @param currentTasks The persistent tasks + * @param nodes The nodes in the cluster + * @return The updated tasks + */ + public static PersistentTasksCustomMetaData rewritePersistentTaskParams(Map jobs, Map datafeeds, + PersistentTasksCustomMetaData currentTasks, + DiscoveryNodes nodes) { + + Collection unallocatedJobTasks = MlTasks.unallocatedJobTasks(currentTasks, nodes); + Collection unallocatedDatafeedsTasks = + MlTasks.unallocatedDatafeedTasks(currentTasks, nodes); + + if (unallocatedJobTasks.isEmpty() && unallocatedDatafeedsTasks.isEmpty()) { + return currentTasks; + } + + PersistentTasksCustomMetaData.Builder taskBuilder = PersistentTasksCustomMetaData.builder(currentTasks); + + for (PersistentTasksCustomMetaData.PersistentTask jobTask : unallocatedJobTasks) { + OpenJobAction.JobParams originalParams = (OpenJobAction.JobParams) jobTask.getParams(); + if (originalParams.getJob() == null) { + Job job = jobs.get(originalParams.getJobId()); + if (job != null) { + logger.debug("updating persistent task params for job [{}]", originalParams.getJobId()); + + // copy and update the job parameters + OpenJobAction.JobParams updatedParams = new OpenJobAction.JobParams(originalParams.getJobId()); + updatedParams.setTimeout(originalParams.getTimeout()); + updatedParams.setJob(job); + + // replace with the updated params + taskBuilder.removeTask(jobTask.getId()); + taskBuilder.addTask(jobTask.getId(), jobTask.getTaskName(), updatedParams, jobTask.getAssignment()); + } else { + logger.error("cannot find job for task [{}]", jobTask.getId()); + } + } + } + + for (PersistentTasksCustomMetaData.PersistentTask datafeedTask : unallocatedDatafeedsTasks) { + StartDatafeedAction.DatafeedParams originalParams = (StartDatafeedAction.DatafeedParams) datafeedTask.getParams(); + + if (originalParams.getJobId() == null) { + DatafeedConfig datafeedConfig = datafeeds.get(originalParams.getDatafeedId()); + if (datafeedConfig != null) { + logger.debug("Updating persistent task params for datafeed [{}]", originalParams.getDatafeedId()); + + StartDatafeedAction.DatafeedParams updatedParams = + new StartDatafeedAction.DatafeedParams(originalParams.getDatafeedId(), originalParams.getStartTime()); + updatedParams.setTimeout(originalParams.getTimeout()); + updatedParams.setEndTime(originalParams.getEndTime()); + updatedParams.setJobId(datafeedConfig.getJobId()); + updatedParams.setDatafeedIndices(datafeedConfig.getIndices()); + + // replace with the updated params + taskBuilder.removeTask(datafeedTask.getId()); + taskBuilder.addTask(datafeedTask.getId(), datafeedTask.getTaskName(), updatedParams, datafeedTask.getAssignment()); + } else { + logger.error("cannot find datafeed for task [{}]", datafeedTask.getId()); + } + } + } + + return taskBuilder.build(); + } + static class RemovalResult { MlMetadata mlMetadata; List removedJobIds; @@ -281,20 +372,20 @@ static class RemovalResult { * @return Structure tracking which jobs and datafeeds were actually removed * and the new MlMetadata */ - static RemovalResult removeJobsAndDatafeeds(List jobsToRemove, List datafeedsToRemove, MlMetadata mlMetadata) { + static RemovalResult removeJobsAndDatafeeds(List jobsToRemove, List datafeedsToRemove, MlMetadata mlMetadata) { Map currentJobs = new HashMap<>(mlMetadata.getJobs()); List removedJobIds = new ArrayList<>(); - for (String jobId : jobsToRemove) { - if (currentJobs.remove(jobId) != null) { - removedJobIds.add(jobId); + for (Job job : jobsToRemove) { + if (currentJobs.remove(job.getId()) != null) { + removedJobIds.add(job.getId()); } } Map currentDatafeeds = new HashMap<>(mlMetadata.getDatafeeds()); List removedDatafeedIds = new ArrayList<>(); - for (String datafeedId : datafeedsToRemove) { - if (currentDatafeeds.remove(datafeedId) != null) { - removedDatafeedIds.add(datafeedId); + for (DatafeedConfig datafeed : datafeedsToRemove) { + if (currentDatafeeds.remove(datafeed.getId()) != null) { + removedDatafeedIds.add(datafeed.getId()); } } @@ -441,15 +532,18 @@ public static List nonDeletingJobs(List jobs) { } /** - * Find the configurations for all closed jobs in the cluster state. - * Closed jobs are those that do not have an associated persistent task. + * Find the configurations for all closed jobs and the jobs that + * do not have an allocation in the cluster state. + * Closed jobs are those that do not have an associated persistent task, + * unallocated jobs have a task but no executing node * * @param clusterState The cluster state * @return The closed job configurations */ - public static List closedJobConfigs(ClusterState clusterState) { + public static List closedOrUnallocatedJobs(ClusterState clusterState) { PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE); Set openJobIds = MlTasks.openJobIds(persistentTasks); + openJobIds.removeAll(MlTasks.unallocatedJobIds(persistentTasks, clusterState.nodes())); MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); return mlMetadata.getJobs().values().stream() @@ -458,15 +552,18 @@ public static List closedJobConfigs(ClusterState clusterState) { } /** - * Find the configurations for stopped datafeeds in the cluster state. - * Stopped datafeeds are those that do not have an associated persistent task. + * Find the configurations for stopped datafeeds and datafeeds that do + * not have an allocation in the cluster state. + * Stopped datafeeds are those that do not have an associated persistent task, + * unallocated datafeeds have a task but no executing node. * * @param clusterState The cluster state * @return The closed job configurations */ - public static List stoppedDatafeedConfigs(ClusterState clusterState) { + public static List stopppedOrUnallocatedDatafeeds(ClusterState clusterState) { PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE); Set startedDatafeedIds = MlTasks.startedDatafeedIds(persistentTasks); + startedDatafeedIds.removeAll(MlTasks.unallocatedDatafeedIds(persistentTasks, clusterState.nodes())); MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); return mlMetadata.getDatafeeds().values().stream() @@ -489,8 +586,8 @@ public int totalCount() { } public static List splitInBatches(ClusterState clusterState) { - Collection stoppedDatafeeds = stoppedDatafeedConfigs(clusterState); - Map eligibleJobs = nonDeletingJobs(closedJobConfigs(clusterState)).stream() + Collection stoppedDatafeeds = stopppedOrUnallocatedDatafeeds(clusterState); + Map eligibleJobs = nonDeletingJobs(closedOrUnallocatedJobs(clusterState)).stream() .map(MlConfigMigrator::updateJobForMigration) .collect(Collectors.toMap(Job::getId, Function.identity(), (a, b) -> a)); @@ -572,17 +669,15 @@ static Set documentsNotWritten(BulkResponse response) { return failedDocumentIds; } - static List filterFailedJobConfigWrites(Set failedDocumentIds, List jobs) { + static List filterFailedJobConfigWrites(Set failedDocumentIds, List jobs) { return jobs.stream() - .map(Job::getId) - .filter(id -> failedDocumentIds.contains(Job.documentId(id)) == false) + .filter(job -> failedDocumentIds.contains(Job.documentId(job.getId())) == false) .collect(Collectors.toList()); } - static List filterFailedDatafeedConfigWrites(Set failedDocumentIds, Collection datafeeds) { + static List filterFailedDatafeedConfigWrites(Set failedDocumentIds, Collection datafeeds) { return datafeeds.stream() - .map(DatafeedConfig::getId) - .filter(id -> failedDocumentIds.contains(DatafeedConfig.documentId(id)) == false) + .filter(datafeed -> failedDocumentIds.contains(DatafeedConfig.documentId(datafeed.getId())) == false) .collect(Collectors.toList()); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index b7b4fb3aad4c9..4b0d9ad63c6e5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -31,7 +31,6 @@ import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedSupplier; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.inject.Inject; @@ -98,6 +97,9 @@ public class TransportOpenJobAction extends TransportMasterNodeAction unavailableIndices = verifyIndicesPrimaryShardsAreActive(resultsIndexName, clusterState); if (unavailableIndices.size() != 0) { String reason = "Not opening job [" + jobId + "], because not all primary shards are active for the following indices [" + @@ -199,23 +201,21 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j continue; } - if (job != null) { - Set compatibleJobTypes = Job.getCompatibleJobTypes(node.getVersion()); - if (compatibleJobTypes.contains(job.getJobType()) == false) { - String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndVersion(node) + - "], because this node does not support jobs of type [" + job.getJobType() + "]"; - logger.trace(reason); - reasons.add(reason); - continue; - } + Set compatibleJobTypes = Job.getCompatibleJobTypes(node.getVersion()); + if (compatibleJobTypes.contains(job.getJobType()) == false) { + String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndVersion(node) + + "], because this node does not support jobs of type [" + job.getJobType() + "]"; + logger.trace(reason); + reasons.add(reason); + continue; + } - if (jobHasRules(job) && node.getVersion().before(DetectionRule.VERSION_INTRODUCED)) { - String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndVersion(node) + "], because jobs using " + - "custom_rules require a node of version [" + DetectionRule.VERSION_INTRODUCED + "] or higher"; - logger.trace(reason); - reasons.add(reason); - continue; - } + if (jobHasRules(job) && node.getVersion().before(DetectionRule.VERSION_INTRODUCED)) { + String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndVersion(node) + "], because jobs using " + + "custom_rules require a node of version [" + DetectionRule.VERSION_INTRODUCED + "] or higher"; + logger.trace(reason); + reasons.add(reason); + continue; } long numberOfAssignedJobs = 0; @@ -693,6 +693,13 @@ public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterS @Override public PersistentTasksCustomMetaData.Assignment getAssignment(OpenJobAction.JobParams params, ClusterState clusterState) { + + // If the task parameters do not have a job field then the job + // was first opened on a pre v6.6 node and has not been migrated + if (params.getJob() == null) { + return AWAITING_MIGRATION; + } + PersistentTasksCustomMetaData.Assignment assignment = selectLeastLoadedMlNode(params.getJobId(), params.getJob(), clusterState, diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java index 5c8c253794794..3e31c8d564b62 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java @@ -63,7 +63,7 @@ private void setupMocks() { ActionListener listener = (ActionListener) invocation.getArguments()[1]; listener.onResponse(Boolean.TRUE); return null; - }).when(configMigrator).migrateConfigsWithoutTasks(any(ClusterState.class), any(ActionListener.class)); + }).when(configMigrator).migrateConfigs(any(ClusterState.class), any(ActionListener.class)); } public void testClusterChanged_info() { @@ -87,7 +87,7 @@ public void testClusterChanged_info() { .build(); notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous)); verify(auditor, times(1)).info(eq("job_id"), any()); - verify(configMigrator, times(1)).migrateConfigsWithoutTasks(eq(newState), any()); + verify(configMigrator, times(1)).migrateConfigs(eq(newState), any()); // no longer master newState = ClusterState.builder(new ClusterName("_name")) @@ -120,7 +120,7 @@ public void testClusterChanged_warning() { .build(); notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous)); verify(auditor, times(1)).warning(eq("job_id"), any()); - verify(configMigrator, times(1)).migrateConfigsWithoutTasks(eq(newState), any()); + verify(configMigrator, times(1)).migrateConfigs(eq(newState), any()); // no longer master newState = ClusterState.builder(new ClusterName("_name")) @@ -153,7 +153,7 @@ public void testClusterChanged_noPersistentTaskChanges() { .build(); notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous)); - verify(configMigrator, times(1)).migrateConfigsWithoutTasks(any(), any()); + verify(configMigrator, times(1)).migrateConfigs(any(), any()); verifyNoMoreInteractions(auditor); // no longer master diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheckTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheckTests.java index 4785f9f75a5c3..4a70bcf02d3a5 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheckTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheckTests.java @@ -270,6 +270,34 @@ public void testJobIsEligibleForMigration_givenClosedJob() { assertTrue(check.jobIsEligibleForMigration(closedJob.getId(), clusterState)); } + public void testJobIsEligibleForMigration_givenOpenAndUnallocatedJob() { + Job openJob = JobTests.buildJobBuilder("open-job").build(); + MlMetadata.Builder mlMetadata = new MlMetadata.Builder().putJob(openJob, false); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + tasksBuilder.addTask(MlTasks.jobTaskId(openJob.getId()), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams(openJob.getId()), + new PersistentTasksCustomMetaData.Assignment(null, "no assignment")); + + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + addMlConfigIndex(metaData, routingTable); + + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) + .metaData(metaData + .putCustom(MlMetadata.TYPE, mlMetadata.build()) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()) + ) + .routingTable(routingTable.build()) + .build(); + + Settings settings = newSettings(true); + givenClusterSettings(settings); + + MlConfigMigrationEligibilityCheck check = new MlConfigMigrationEligibilityCheck(settings, clusterService); + + assertTrue(check.jobIsEligibleForMigration(openJob.getId(), clusterState)); + } + public void testDatafeedIsEligibleForMigration_givenNodesNotUpToVersion() { // mixed 6.5 and 6.6 nodes ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) @@ -371,6 +399,36 @@ public void testDatafeedIsEligibleForMigration_givenStoppedDatafeed() { assertTrue(check.datafeedIsEligibleForMigration(datafeedId, clusterState)); } + public void testDatafeedIsEligibleForMigration_givenUnallocatedDatafeed() { + Job job = JobTests.buildJobBuilder("closed-job").build(); + MlMetadata.Builder mlMetadata = new MlMetadata.Builder().putJob(job, false); + mlMetadata.putDatafeed(createCompatibleDatafeed(job.getId()), Collections.emptyMap()); + String datafeedId = "df-" + job.getId(); + + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + addMlConfigIndex(metaData, routingTable); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + tasksBuilder.addTask(MlTasks.datafeedTaskId(datafeedId), MlTasks.DATAFEED_TASK_NAME, + new StartDatafeedAction.DatafeedParams(datafeedId, 0L), + new PersistentTasksCustomMetaData.Assignment(null, "no assignment")); + + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) + .metaData(metaData + .putCustom(MlMetadata.TYPE, mlMetadata.build()) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())) + .routingTable(routingTable.build()) + .build(); + + Settings settings = newSettings(true); + givenClusterSettings(settings); + + MlConfigMigrationEligibilityCheck check = new MlConfigMigrationEligibilityCheck(settings, clusterService); + + assertTrue(check.datafeedIsEligibleForMigration(datafeedId, clusterState)); + } + private void givenClusterSettings(Settings settings) { ClusterSettings clusterSettings = new ClusterSettings(settings, new HashSet<>(Collections.singletonList( MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION))); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigratorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigratorTests.java index cff299b9fa1aa..62c29efdff968 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigratorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigratorTests.java @@ -11,7 +11,10 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.MlMetadata; @@ -23,6 +26,7 @@ import org.elasticsearch.xpack.core.ml.job.config.JobTests; import java.io.IOException; +import java.net.InetAddress; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -37,6 +41,7 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -50,74 +55,82 @@ public void testNonDeletingJobs() { assertThat(MlConfigMigrator.nonDeletingJobs(Arrays.asList(job1, job2, deletingJob)), containsInAnyOrder(job1, job2)); } - public void testClosedJobConfigs() { - Job openJob1 = JobTests.buildJobBuilder("openjob1").build(); - Job openJob2 = JobTests.buildJobBuilder("openjob2").build(); + public void testClosedOrUnallocatedJobs() { + Job closedJob = JobTests.buildJobBuilder("closedjob").build(); + Job jobWithoutAllocation = JobTests.buildJobBuilder("jobwithoutallocation").build(); + Job openJob = JobTests.buildJobBuilder("openjob").build(); MlMetadata.Builder mlMetadata = new MlMetadata.Builder() - .putJob(openJob1, false) - .putJob(openJob2, false) - .putDatafeed(createCompatibleDatafeed(openJob1.getId()), Collections.emptyMap()); - - ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) - .metaData(MetaData.builder() - .putCustom(MlMetadata.TYPE, mlMetadata.build()) - .putCustom(PersistentTasksCustomMetaData.TYPE, PersistentTasksCustomMetaData.builder().build()) - ) - .build(); - - assertThat(MlConfigMigrator.closedJobConfigs(clusterState), containsInAnyOrder(openJob1, openJob2)); + .putJob(closedJob, false) + .putJob(jobWithoutAllocation, false) + .putJob(openJob, false) + .putDatafeed(createCompatibleDatafeed(closedJob.getId()), Collections.emptyMap()); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); - tasksBuilder.addTask(MlTasks.jobTaskId("openjob1"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo-1"), - new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")); + tasksBuilder.addTask(MlTasks.jobTaskId("jobwithoutallocation"), MlTasks.JOB_TASK_NAME, + new OpenJobAction.JobParams("jobwithoutallocation"), + new PersistentTasksCustomMetaData.Assignment(null, "test assignment")); + tasksBuilder.addTask(MlTasks.jobTaskId("openjob"), MlTasks.JOB_TASK_NAME, + new OpenJobAction.JobParams("openjob"), + new PersistentTasksCustomMetaData.Assignment("node1", "test assignment")); + + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("node1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT)) + .localNodeId("node1") + .masterNodeId("node1") + .build(); - clusterState = ClusterState.builder(new ClusterName("migratortests")) + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) .metaData(MetaData.builder() .putCustom(MlMetadata.TYPE, mlMetadata.build()) .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()) ) + .nodes(nodes) .build(); - assertThat(MlConfigMigrator.closedJobConfigs(clusterState), containsInAnyOrder(openJob2)); + assertThat(MlConfigMigrator.closedOrUnallocatedJobs(clusterState), containsInAnyOrder(closedJob, jobWithoutAllocation)); } public void testStoppedDatafeedConfigs() { - Job openJob1 = JobTests.buildJobBuilder("openjob1").build(); - Job openJob2 = JobTests.buildJobBuilder("openjob2").build(); - DatafeedConfig datafeedConfig1 = createCompatibleDatafeed(openJob1.getId()); - DatafeedConfig datafeedConfig2 = createCompatibleDatafeed(openJob2.getId()); - MlMetadata.Builder mlMetadata = new MlMetadata.Builder() - .putJob(openJob1, false) - .putJob(openJob2, false) - .putDatafeed(datafeedConfig1, Collections.emptyMap()) - .putDatafeed(datafeedConfig2, Collections.emptyMap()); - - ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) - .metaData(MetaData.builder() - .putCustom(MlMetadata.TYPE, mlMetadata.build()) - .putCustom(PersistentTasksCustomMetaData.TYPE, PersistentTasksCustomMetaData.builder().build()) - ) - .build(); - - assertThat(MlConfigMigrator.stoppedDatafeedConfigs(clusterState), containsInAnyOrder(datafeedConfig1, datafeedConfig2)); + Job job1 = JobTests.buildJobBuilder("job1").build(); + Job job2 = JobTests.buildJobBuilder("job2").build(); + Job job3 = JobTests.buildJobBuilder("job3").build(); + DatafeedConfig stopppedDatafeed = createCompatibleDatafeed(job1.getId()); + DatafeedConfig datafeedWithoutAllocation = createCompatibleDatafeed(job2.getId()); + DatafeedConfig startedDatafeed = createCompatibleDatafeed(job3.getId()); + MlMetadata.Builder mlMetadata = new MlMetadata.Builder() + .putJob(job1, false) + .putJob(job2, false) + .putJob(job3, false) + .putDatafeed(stopppedDatafeed, Collections.emptyMap()) + .putDatafeed(datafeedWithoutAllocation, Collections.emptyMap()) + .putDatafeed(startedDatafeed, Collections.emptyMap()); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); - tasksBuilder.addTask(MlTasks.jobTaskId("openjob1"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo-1"), - new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")); - tasksBuilder.addTask(MlTasks.datafeedTaskId(datafeedConfig1.getId()), MlTasks.DATAFEED_TASK_NAME, - new StartDatafeedAction.DatafeedParams(datafeedConfig1.getId(), 0L), - new PersistentTasksCustomMetaData.Assignment("node-2", "test assignment")); + tasksBuilder.addTask(MlTasks.datafeedTaskId(stopppedDatafeed.getId()), MlTasks.DATAFEED_TASK_NAME, + new StartDatafeedAction.DatafeedParams(stopppedDatafeed.getId(), 0L), + new PersistentTasksCustomMetaData.Assignment(null, "test assignment")); + tasksBuilder.addTask(MlTasks.datafeedTaskId(startedDatafeed.getId()), MlTasks.DATAFEED_TASK_NAME, + new StartDatafeedAction.DatafeedParams(stopppedDatafeed.getId(), 0L), + new PersistentTasksCustomMetaData.Assignment("node1", "test assignment")); + + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("node1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT)) + .localNodeId("node1") + .masterNodeId("node1") + .build(); - clusterState = ClusterState.builder(new ClusterName("migratortests")) + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) .metaData(MetaData.builder() .putCustom(MlMetadata.TYPE, mlMetadata.build()) .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()) ) + .nodes(nodes) .build(); - assertThat(MlConfigMigrator.stoppedDatafeedConfigs(clusterState), containsInAnyOrder(datafeedConfig2)); + assertThat(MlConfigMigrator.stopppedOrUnallocatedDatafeeds(clusterState), + containsInAnyOrder(stopppedDatafeed, datafeedWithoutAllocation)); } public void testUpdateJobForMigration() { @@ -155,7 +168,7 @@ public void testFilterFailedJobConfigWrites() { assertThat(MlConfigMigrator.filterFailedJobConfigWrites(Collections.emptySet(), jobs), hasSize(3)); assertThat(MlConfigMigrator.filterFailedJobConfigWrites(Collections.singleton(Job.documentId("bar")), jobs), - contains("foo", "baz")); + contains(jobs.get(0), jobs.get(2))); } public void testFilterFailedDatafeedConfigWrites() { @@ -166,7 +179,7 @@ public void testFilterFailedDatafeedConfigWrites() { assertThat(MlConfigMigrator.filterFailedDatafeedConfigWrites(Collections.emptySet(), datafeeds), hasSize(3)); assertThat(MlConfigMigrator.filterFailedDatafeedConfigWrites(Collections.singleton(DatafeedConfig.documentId("df-foo")), datafeeds), - contains("df-bar", "df-baz")); + contains(datafeeds.get(1), datafeeds.get(2))); } public void testDocumentsNotWritten() { @@ -197,7 +210,7 @@ public void testRemoveJobsAndDatafeeds_removeAll() { .putDatafeed(datafeedConfig2, Collections.emptyMap()); MlConfigMigrator.RemovalResult removalResult = MlConfigMigrator.removeJobsAndDatafeeds( - Arrays.asList("job1", "job2"), Arrays.asList("df-job1", "df-job2"), mlMetadata.build()); + Arrays.asList(job1, job2), Arrays.asList(datafeedConfig1, datafeedConfig2), mlMetadata.build()); assertThat(removalResult.mlMetadata.getJobs().keySet(), empty()); assertThat(removalResult.mlMetadata.getDatafeeds().keySet(), empty()); @@ -215,7 +228,8 @@ public void testRemoveJobsAndDatafeeds_removeSome() { .putDatafeed(datafeedConfig1, Collections.emptyMap()); MlConfigMigrator.RemovalResult removalResult = MlConfigMigrator.removeJobsAndDatafeeds( - Arrays.asList("job1", "job-none"), Collections.singletonList("df-none"), mlMetadata.build()); + Arrays.asList(job1, JobTests.buildJobBuilder("job-none").build()), + Collections.singletonList(createCompatibleDatafeed("job-none")), mlMetadata.build()); assertThat(removalResult.mlMetadata.getJobs().keySet(), contains("job2")); assertThat(removalResult.mlMetadata.getDatafeeds().keySet(), contains("df-job1")); @@ -300,6 +314,115 @@ public void testLimitWrites_GivenNullJob() { assertThat(jobsAndDatafeeds.jobs, empty()); } + public void testRewritePersistentTaskParams() { + Map jobs = new HashMap<>(); + Job closedJob = JobTests.buildJobBuilder("closed-job").build(); + Job unallocatedJob = JobTests.buildJobBuilder("job-to-update").build(); + Job allocatedJob = JobTests.buildJobBuilder("allocated-job").build(); + jobs.put(closedJob.getId(), closedJob); + jobs.put(unallocatedJob.getId(), unallocatedJob); + jobs.put(allocatedJob.getId(), allocatedJob); + + Map datafeeds = new HashMap<>(); + DatafeedConfig stoppedDatafeed = createCompatibleDatafeed(closedJob.getId()); + DatafeedConfig unallocatedDatafeed = createCompatibleDatafeed(unallocatedJob.getId()); + DatafeedConfig allocatedDatafeed = createCompatibleDatafeed(allocatedJob.getId()); + datafeeds.put(stoppedDatafeed.getId(), stoppedDatafeed); + datafeeds.put(unallocatedDatafeed.getId(), unallocatedDatafeed); + datafeeds.put(allocatedDatafeed.getId(), allocatedDatafeed); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + // job tasks + tasksBuilder.addTask(MlTasks.jobTaskId(unallocatedJob.getId()), MlTasks.JOB_TASK_NAME, + new OpenJobAction.JobParams(unallocatedJob.getId()), + new PersistentTasksCustomMetaData.Assignment(null, "no assignment")); + tasksBuilder.addTask(MlTasks.jobTaskId(allocatedJob.getId()), MlTasks.JOB_TASK_NAME, + new OpenJobAction.JobParams(allocatedJob.getId()), + new PersistentTasksCustomMetaData.Assignment("node1", "test assignment")); + // datafeed tasks + tasksBuilder.addTask(MlTasks.datafeedTaskId(unallocatedDatafeed.getId()), MlTasks.DATAFEED_TASK_NAME, + new StartDatafeedAction.DatafeedParams(unallocatedDatafeed.getId(), 0L), + new PersistentTasksCustomMetaData.Assignment(null, "no assignment")); + tasksBuilder.addTask(MlTasks.datafeedTaskId(allocatedDatafeed.getId()), MlTasks.DATAFEED_TASK_NAME, + new StartDatafeedAction.DatafeedParams(allocatedDatafeed.getId(), 0L), + new PersistentTasksCustomMetaData.Assignment("node1", "test assignment")); + + PersistentTasksCustomMetaData originalTasks = tasksBuilder.build(); + OpenJobAction.JobParams originalUnallocatedTaskParams = (OpenJobAction.JobParams) originalTasks.getTask( + MlTasks.jobTaskId(unallocatedJob.getId())).getParams(); + assertNull(originalUnallocatedTaskParams.getJob()); + StartDatafeedAction.DatafeedParams originalUnallocatedDatafeedParams = (StartDatafeedAction.DatafeedParams) originalTasks.getTask( + MlTasks.datafeedTaskId(unallocatedDatafeed.getId())).getParams(); + assertNull(originalUnallocatedDatafeedParams.getJobId()); + + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("node1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT)) + .localNodeId("node1") + .masterNodeId("node1") + .build(); + + PersistentTasksCustomMetaData modifedTasks = MlConfigMigrator.rewritePersistentTaskParams(jobs, datafeeds, originalTasks, nodes); + + // The unallocated task should be modifed + OpenJobAction.JobParams modifedUnallocatedTaskParams = + (OpenJobAction.JobParams) modifedTasks.getTask(MlTasks.jobTaskId(unallocatedJob.getId())).getParams(); + assertNotEquals(originalUnallocatedTaskParams, modifedUnallocatedTaskParams); + assertEquals(unallocatedJob, modifedUnallocatedTaskParams.getJob()); + + // the allocated task should not be modified + OpenJobAction.JobParams allocatedJobParams = + (OpenJobAction.JobParams) modifedTasks.getTask(MlTasks.jobTaskId(allocatedJob.getId())).getParams(); + assertEquals(null, allocatedJobParams.getJob()); + OpenJobAction.JobParams originalAllocatedJobParams = + (OpenJobAction.JobParams) originalTasks.getTask(MlTasks.jobTaskId(allocatedJob.getId())).getParams(); + assertEquals(originalAllocatedJobParams, allocatedJobParams); + + + // unallocated datafeed should be updated + StartDatafeedAction.DatafeedParams modifiedUnallocatedDatafeedParams = (StartDatafeedAction.DatafeedParams) modifedTasks.getTask( + MlTasks.datafeedTaskId(unallocatedDatafeed.getId())).getParams(); + assertNotEquals(originalUnallocatedDatafeedParams, modifiedUnallocatedDatafeedParams); + assertEquals(unallocatedDatafeed.getJobId(), modifiedUnallocatedDatafeedParams.getJobId()); + assertEquals(unallocatedDatafeed.getIndices(), modifiedUnallocatedDatafeedParams.getDatafeedIndices()); + + // allocated datafeed will not be updated + StartDatafeedAction.DatafeedParams allocatedDatafeedParams = (StartDatafeedAction.DatafeedParams) modifedTasks.getTask( + MlTasks.datafeedTaskId(allocatedDatafeed.getId())).getParams(); + assertNull(allocatedDatafeedParams.getJobId()); + assertThat(allocatedDatafeedParams.getDatafeedIndices(), empty()); + StartDatafeedAction.DatafeedParams originalAllocatedDatafeedParams = (StartDatafeedAction.DatafeedParams) originalTasks.getTask( + MlTasks.datafeedTaskId(allocatedDatafeed.getId())).getParams(); + assertEquals(originalAllocatedDatafeedParams, allocatedDatafeedParams); + } + + public void testRewritePersistentTaskParams_GivenNoUnallocatedTasks() { + Map jobs = new HashMap<>(); + Job allocatedJob = JobTests.buildJobBuilder("allocated-job").build(); + jobs.put(allocatedJob.getId(), allocatedJob); + + Map datafeeds = new HashMap<>(); + DatafeedConfig allocatedDatafeed = createCompatibleDatafeed(allocatedJob.getId()); + datafeeds.put(allocatedDatafeed.getId(), allocatedDatafeed); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + tasksBuilder.addTask(MlTasks.jobTaskId(allocatedJob.getId()), MlTasks.JOB_TASK_NAME, + new OpenJobAction.JobParams(allocatedJob.getId()), + new PersistentTasksCustomMetaData.Assignment("node1", "test assignment")); + tasksBuilder.addTask(MlTasks.datafeedTaskId(allocatedDatafeed.getId()), MlTasks.DATAFEED_TASK_NAME, + new StartDatafeedAction.DatafeedParams(allocatedDatafeed.getId(), 0L), + new PersistentTasksCustomMetaData.Assignment("node1", "test assignment")); + + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("node1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT)) + .localNodeId("node1") + .masterNodeId("node1") + .build(); + + PersistentTasksCustomMetaData originalTasks = tasksBuilder.build(); + PersistentTasksCustomMetaData modifedTasks = MlConfigMigrator.rewritePersistentTaskParams(jobs, datafeeds, originalTasks, nodes); + assertThat(originalTasks, sameInstance(modifedTasks)); + } + private DatafeedConfig createCompatibleDatafeed(String jobId) { // create a datafeed without aggregations or anything // else that may cause validation errors diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java index 7d72ef7f633e1..9349c56ef75e9 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java @@ -9,6 +9,7 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.Version; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -23,6 +24,8 @@ import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -52,6 +55,7 @@ import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.notifications.AuditorField; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.process.MlMemoryTracker; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import org.junit.Before; @@ -59,9 +63,11 @@ import java.io.IOException; import java.net.InetAddress; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.SortedMap; @@ -577,6 +583,21 @@ public void testJobTaskMatcherMatch() { assertThat(OpenJobAction.JobTaskMatcher.match(jobTask2, "ml-2"), is(true)); } + public void testGetAssignment_GivenJobThatRequiresMigration() { + ClusterService clusterService = mock(ClusterService.class); + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, new HashSet<>( + Arrays.asList(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, MachineLearning.MAX_MACHINE_MEMORY_PERCENT, + MachineLearning.MAX_LAZY_ML_NODES) + )); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + + TransportOpenJobAction.OpenJobPersistentTasksExecutor executor = new TransportOpenJobAction.OpenJobPersistentTasksExecutor( + Settings.EMPTY, clusterService, mock(AutodetectProcessManager.class), mock(MlMemoryTracker.class), mock(Client.class)); + + OpenJobAction.JobParams params = new OpenJobAction.JobParams("missing_job_field"); + assertEquals(TransportOpenJobAction.AWAITING_MIGRATION, executor.getAssignment(params, mock(ClusterState.class))); + } + public static void addJobTask(String jobId, String nodeId, JobState jobState, PersistentTasksCustomMetaData.Builder builder) { addJobTask(jobId, nodeId, jobState, builder, false); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java index 4993da215afbc..b6a0c5346ba37 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java @@ -121,10 +121,10 @@ public void testWriteConfigToIndex() throws InterruptedException { } public void testMigrateConfigs() throws InterruptedException, IOException { - // and jobs and datafeeds clusterstate MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); mlMetadata.putJob(buildJobBuilder("job-foo").build(), false); mlMetadata.putJob(buildJobBuilder("job-bar").build(), false); + DatafeedConfig.Builder builder = new DatafeedConfig.Builder("df-1", "job-foo"); builder.setIndices(Collections.singletonList("beats*")); mlMetadata.putDatafeed(builder.build(), Collections.emptyMap()); @@ -149,7 +149,7 @@ public void testMigrateConfigs() throws InterruptedException, IOException { // do the migration MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService); // the first time this is called mlmetadata will be snap-shotted - blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener), + blockingCall(actionListener -> mlConfigMigrator.migrateConfigs(clusterState, actionListener), responseHolder, exceptionHolder); assertNull(exceptionHolder.get()); @@ -214,7 +214,7 @@ public void testMigrateConfigs_GivenLargeNumberOfJobsAndDatafeeds() throws Inter // do the migration MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService); - blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener), + blockingCall(actionListener -> mlConfigMigrator.migrateConfigs(clusterState, actionListener), responseHolder, exceptionHolder); assertNull(exceptionHolder.get()); @@ -252,7 +252,7 @@ public void testMigrateConfigs_GivenNoJobsOrDatafeeds() throws InterruptedExcept // do the migration MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService); - blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener), + blockingCall(actionListener -> mlConfigMigrator.migrateConfigs(clusterState, actionListener), responseHolder, exceptionHolder); assertNull(exceptionHolder.get()); @@ -285,7 +285,7 @@ public void testMigrateConfigsWithoutTasks_GivenMigrationIsDisabled() throws Int // do the migration MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(settings, client(), clusterService); - blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener), + blockingCall(actionListener -> mlConfigMigrator.migrateConfigs(clusterState, actionListener), responseHolder, exceptionHolder); assertNull(exceptionHolder.get()); @@ -361,7 +361,7 @@ public void testConfigIndexIsCreated() throws Exception { // if the cluster state has a job config and the index does not // exist it should be created - blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener), + blockingCall(actionListener -> mlConfigMigrator.migrateConfigs(clusterState, actionListener), responseHolder, exceptionHolder); assertBusy(() -> assertTrue(configIndexExists())); diff --git a/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/MlMigrationFullClusterRestartIT.java b/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/MlMigrationFullClusterRestartIT.java index e71e116ca6695..0e5ee72546487 100644 --- a/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/MlMigrationFullClusterRestartIT.java +++ b/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/MlMigrationFullClusterRestartIT.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.upgrades.AbstractFullClusterRestartTestCase; +import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; @@ -24,6 +25,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Base64; import java.util.Collections; import java.util.List; @@ -35,6 +37,8 @@ public class MlMigrationFullClusterRestartIT extends AbstractFullClusterRestartTestCase { + private static final String OLD_CLUSTER_OPEN_JOB_ID = "migration-old-cluster-open-job"; + private static final String OLD_CLUSTER_STARTED_DATAFEED_ID = "migration-old-cluster-started-datafeed"; private static final String OLD_CLUSTER_CLOSED_JOB_ID = "migration-old-cluster-closed-job"; private static final String OLD_CLUSTER_STOPPED_DATAFEED_ID = "migration-old-cluster-stopped-datafeed"; @@ -102,13 +106,42 @@ private void oldClusterTests() throws IOException { Request putStoppedDatafeed = new Request("PUT", "/_xpack/ml/datafeeds/" + OLD_CLUSTER_STOPPED_DATAFEED_ID); putStoppedDatafeed.setJsonEntity(Strings.toString(stoppedDfBuilder.build())); client().performRequest(putStoppedDatafeed); + + // open job and started datafeed + Job.Builder openJob = new Job.Builder(OLD_CLUSTER_OPEN_JOB_ID); + openJob.setAnalysisConfig(analysisConfig); + openJob.setDataDescription(new DataDescription.Builder()); + Request putOpenJob = new Request("PUT", "_xpack/ml/anomaly_detectors/" + OLD_CLUSTER_OPEN_JOB_ID); + putOpenJob.setJsonEntity(Strings.toString(openJob)); + client().performRequest(putOpenJob); + + Request openOpenJob = new Request("POST", "_xpack/ml/anomaly_detectors/" + OLD_CLUSTER_OPEN_JOB_ID + "/_open"); + client().performRequest(openOpenJob); + + DatafeedConfig.Builder dfBuilder = new DatafeedConfig.Builder(OLD_CLUSTER_STARTED_DATAFEED_ID, OLD_CLUSTER_OPEN_JOB_ID); + if (getOldClusterVersion().before(Version.V_6_6_0)) { + dfBuilder.setDelayedDataCheckConfig(null); + } + dfBuilder.setIndices(Collections.singletonList("airline-data")); + + Request putDatafeed = new Request("PUT", "_xpack/ml/datafeeds/" + OLD_CLUSTER_STARTED_DATAFEED_ID); + putDatafeed.setJsonEntity(Strings.toString(dfBuilder.build())); + client().performRequest(putDatafeed); + + Request startDatafeed = new Request("POST", "_xpack/ml/datafeeds/" + OLD_CLUSTER_STARTED_DATAFEED_ID + "/_start"); + client().performRequest(startDatafeed); } private void upgradedClusterTests() throws Exception { - // wait for the closed job and datafeed to be migrated - waitForMigration(Collections.singletonList(OLD_CLUSTER_CLOSED_JOB_ID), - Collections.singletonList(OLD_CLUSTER_STOPPED_DATAFEED_ID), - Collections.emptyList(), Collections.emptyList()); + // wait for the closed and open jobs and datafeed to be migrated + waitForMigration(Arrays.asList(OLD_CLUSTER_CLOSED_JOB_ID, OLD_CLUSTER_OPEN_JOB_ID), + Arrays.asList(OLD_CLUSTER_STOPPED_DATAFEED_ID, OLD_CLUSTER_STARTED_DATAFEED_ID)); + + waitForJobToBeAssigned(OLD_CLUSTER_OPEN_JOB_ID); + waitForDatafeedToBeAssigned(OLD_CLUSTER_STARTED_DATAFEED_ID); + // The persistent task params for the job & datafeed left open + // during upgrade should be updated with new fields + checkTaskParamsAreUpdated(OLD_CLUSTER_OPEN_JOB_ID, OLD_CLUSTER_STARTED_DATAFEED_ID); // open the migrated job and datafeed Request openJob = new Request("POST", "_ml/anomaly_detectors/" + OLD_CLUSTER_CLOSED_JOB_ID + "/_open"); @@ -154,8 +187,7 @@ private void waitForDatafeedToBeAssigned(String datafeedId) throws Exception { } @SuppressWarnings("unchecked") - private void waitForMigration(List expectedMigratedJobs, List expectedMigratedDatafeeds, - List unMigratedJobs, List unMigratedDatafeeds) throws Exception { + private void waitForMigration(List expectedMigratedJobs, List expectedMigratedDatafeeds) throws Exception { // After v6.6.0 jobs are created in the index so no migration will take place if (getOldClusterVersion().onOrAfter(Version.V_6_6_0)) { @@ -170,48 +202,58 @@ private void waitForMigration(List expectedMigratedJobs, List ex List> jobs = (List>) XContentMapValues.extractValue("metadata.ml.jobs", responseMap); - assertNotNull(jobs); - - for (String jobId : expectedMigratedJobs) { - assertJob(jobId, jobs, false); - } - for (String jobId : unMigratedJobs) { - assertJob(jobId, jobs, true); + if (jobs != null) { + for (String jobId : expectedMigratedJobs) { + assertJobNotPresent(jobId, jobs); + } } List> datafeeds = (List>) XContentMapValues.extractValue("metadata.ml.datafeeds", responseMap); - assertNotNull(datafeeds); - for (String datafeedId : expectedMigratedDatafeeds) { - assertDatafeed(datafeedId, datafeeds, false); + if (datafeeds != null) { + for (String datafeedId : expectedMigratedDatafeeds) { + assertDatafeedNotPresent(datafeedId, datafeeds); + } } + }, 30, TimeUnit.SECONDS); + } - for (String datafeedId : unMigratedDatafeeds) { - assertDatafeed(datafeedId, datafeeds, true); + @SuppressWarnings("unchecked") + private void checkTaskParamsAreUpdated(String jobId, String datafeedId) throws Exception { + Request getClusterState = new Request("GET", "/_cluster/state/metadata"); + Response response = client().performRequest(getClusterState); + Map responseMap = entityAsMap(response); + + List> tasks = + (List>) XContentMapValues.extractValue("metadata.persistent_tasks.tasks", responseMap); + assertNotNull(tasks); + for (Map task : tasks) { + String id = (String) task.get("id"); + assertNotNull(id); + if (id.equals(MlTasks.jobTaskId(jobId))) { + Object jobParam = XContentMapValues.extractValue("task.xpack/ml/job.params.job", task); + assertNotNull(jobParam); } - - }, 30, TimeUnit.SECONDS); + else if (id.equals(MlTasks.datafeedTaskId(datafeedId))) { + Object jobIdParam = XContentMapValues.extractValue("task.xpack/ml/datafeed.params.job_id", task); + assertNotNull(jobIdParam); + Object indices = XContentMapValues.extractValue("task.xpack/ml/datafeed.params.indices", task); + assertNotNull(indices); + } + } } - private void assertDatafeed(String datafeedId, List> datafeeds, boolean expectedToBePresent) { + private void assertDatafeedNotPresent(String datafeedId, List> datafeeds) { Optional config = datafeeds.stream().map(map -> map.get("datafeed_id")) .filter(id -> id.equals(datafeedId)).findFirst(); - if (expectedToBePresent) { - assertTrue(config.isPresent()); - } else { - assertFalse(config.isPresent()); - } + assertFalse(config.isPresent()); } - private void assertJob(String jobId, List> jobs, boolean expectedToBePresent) { + private void assertJobNotPresent(String jobId, List> jobs) { Optional config = jobs.stream().map(map -> map.get("job_id")) .filter(id -> id.equals(jobId)).findFirst(); - if (expectedToBePresent) { - assertTrue(config.isPresent()); - } else { - assertFalse(config.isPresent()); - } + assertFalse(config.isPresent()); } }