From ef91aca9e1144af1520d081dfda8903ee7e3b020 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Tue, 15 Jan 2019 18:21:39 +0000 Subject: [PATCH 1/8] [ML] Migrate unallocated jobs and datafeeds (#37430) Migrate ml job and datafeed config of open jobs and update the parameters of the persistent tasks as they become unallocated during a rolling upgrade. Block allocation of ml persistent tasks until the configs are migrated. --- .../xpack/core/ml/MlMetadata.java | 4 + .../elasticsearch/xpack/core/ml/MlTasks.java | 79 +++++-- .../xpack/core/ml/MlTasksTests.java | 64 +++-- .../xpack/ml/MlAssignmentNotifier.java | 2 +- .../ml/MlConfigMigrationEligibilityCheck.java | 16 +- .../xpack/ml/MlConfigMigrator.java | 161 ++++++++++--- .../ml/action/TransportOpenJobAction.java | 39 ++-- .../xpack/ml/MlAssignmentNotifierTests.java | 8 +- ...lConfigMigrationEligibilityCheckTests.java | 58 +++++ .../xpack/ml/MlConfigMigratorTests.java | 219 ++++++++++++++---- .../action/TransportOpenJobActionTests.java | 21 ++ .../ml/integration/MlConfigMigratorIT.java | 12 +- .../MlMigrationFullClusterRestartIT.java | 85 +++---- 13 files changed, 586 insertions(+), 182 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java index 5ca281bd2e6a..17222f6f4f04 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java @@ -178,6 +178,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws private static void mapValuesToXContent(ParseField field, Map map, XContentBuilder builder, Params params) throws IOException { + if (map.isEmpty()) { + return; + } + builder.startArray(field.getPreferredName()); for (Map.Entry entry : map.entrySet()) { entry.getValue().toXContent(builder, params); diff --git 
a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java index b81a1f7d7b9c..c166f64c4e3e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java @@ -6,14 +6,16 @@ package org.elasticsearch.xpack.core.ml; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Nullable; +import org.elasticsearch.persistent.PersistentTasksClusterService; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; +import java.util.Collection; import java.util.Collections; -import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -133,6 +135,42 @@ public static Set openJobIds(@Nullable PersistentTasksCustomMetaData tas .collect(Collectors.toSet()); } + /** + * Get the job Ids of anomaly detector job tasks that do + * not have an assignment. + * + * @param tasks Persistent tasks. If null an empty set is returned. + * @param nodes The cluster nodes + * @return The job Ids of tasks that do not have an assignment. + */ + public static Set unallocatedJobIds(@Nullable PersistentTasksCustomMetaData tasks, + DiscoveryNodes nodes) { + return unallocatedJobTasks(tasks, nodes).stream() + .map(task -> task.getId().substring(JOB_TASK_ID_PREFIX.length())) + .collect(Collectors.toSet()); + } + + /** + * The job tasks that do not have an allocation as determined by + * {@link PersistentTasksClusterService#needsReassignment(PersistentTasksCustomMetaData.Assignment, DiscoveryNodes)} + * + * @param tasks Persistent tasks. If null an empty collection is returned. 
+ * @param nodes The cluster nodes + * @return Unallocated job tasks + */ + public static Collection unallocatedJobTasks( + @Nullable PersistentTasksCustomMetaData tasks, + DiscoveryNodes nodes) { + if (tasks == null) { + return Collections.emptyList(); + } + + return tasks.findTasks(JOB_TASK_NAME, task -> true) + .stream() + .filter(task -> PersistentTasksClusterService.needsReassignment(task.getAssignment(), nodes)) + .collect(Collectors.toList()); + } + /** * The datafeed Ids of started datafeed tasks * @@ -151,26 +189,39 @@ public static Set startedDatafeedIds(@Nullable PersistentTasksCustomMeta } /** - * Is there an ml anomaly detector job task for the job {@code jobId}? - * @param jobId The job id - * @param tasks Persistent tasks - * @return True if the job has a task + * Get the datafeed Ids of started datafeed tasks + * that do not have an assignment. + * + * @param tasks Persistent tasks. If null an empty set is returned. + * @param nodes The cluster nodes + * @return The datafeed Ids of tasks that do not have an assignment. */ - public static boolean taskExistsForJob(String jobId, PersistentTasksCustomMetaData tasks) { - return openJobIds(tasks).contains(jobId); + public static Set unallocatedDatafeedIds(@Nullable PersistentTasksCustomMetaData tasks, + DiscoveryNodes nodes) { + + return unallocatedDatafeedTasks(tasks, nodes).stream() + .map(task -> task.getId().substring(DATAFEED_TASK_ID_PREFIX.length())) + .collect(Collectors.toSet()); } /** - * Read the active anomaly detector job tasks. - * Active tasks are not {@code JobState.CLOSED} or {@code JobState.FAILED}. + * The datafeed tasks that do not have an allocation as determined by + * {@link PersistentTasksClusterService#needsReassignment(PersistentTasksCustomMetaData.Assignment, DiscoveryNodes)} * - * @param tasks Persistent tasks - * @return The job tasks excluding closed and failed jobs + * @param tasks Persistent tasks. If null an empty collection is returned. 
+ * @param nodes The cluster nodes + * @return Unallocated datafeed tasks */ - public static List> activeJobTasks(PersistentTasksCustomMetaData tasks) { - return tasks.findTasks(JOB_TASK_NAME, task -> true) + public static Collection unallocatedDatafeedTasks( + @Nullable PersistentTasksCustomMetaData tasks, + DiscoveryNodes nodes) { + if (tasks == null) { + return Collections.emptyList(); + } + + return tasks.findTasks(DATAFEED_TASK_NAME, task -> true) .stream() - .filter(task -> ((JobTaskState) task.getState()).getState().isAnyOf(JobState.CLOSED, JobState.FAILED) == false) + .filter(task -> PersistentTasksClusterService.needsReassignment(task.getAssignment(), nodes)) .collect(Collectors.toList()); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java index 408520472c4f..e80b47b057bf 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java @@ -6,6 +6,10 @@ package org.elasticsearch.xpack.core.ml; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; @@ -14,12 +18,14 @@ import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; +import java.net.InetAddress; + import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.empty; public class MlTasksTests extends ESTestCase { public void testGetJobState() { - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + 
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); // A missing task is a closed job assertEquals(JobState.CLOSED, MlTasks.getJobState("foo", tasksBuilder.build())); // A task with no status is opening @@ -52,7 +58,7 @@ public void testGetDatefeedState() { public void testGetJobTask() { assertNull(MlTasks.getJobTask("foo", null)); - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); tasksBuilder.addTask(MlTasks.jobTaskId("foo"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo"), new PersistentTasksCustomMetaData.Assignment("bar", "test assignment")); @@ -73,7 +79,7 @@ public void testGetDatafeedTask() { } public void testOpenJobIds() { - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); assertThat(MlTasks.openJobIds(tasksBuilder.build()), empty()); tasksBuilder.addTask(MlTasks.jobTaskId("foo-1"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo-1"), @@ -92,7 +98,7 @@ public void testOpenJobIds_GivenNull() { } public void testStartedDatafeedIds() { - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); assertThat(MlTasks.openJobIds(tasksBuilder.build()), empty()); tasksBuilder.addTask(MlTasks.jobTaskId("job-1"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo-1"), @@ -111,16 +117,48 @@ public void testStartedDatafeedIds_GivenNull() { assertThat(MlTasks.startedDatafeedIds(null), empty()); } - public void testTaskExistsForJob() { - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); - assertFalse(MlTasks.taskExistsForJob("job-1", tasksBuilder.build())); 
- - tasksBuilder.addTask(MlTasks.jobTaskId("foo"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo"), - new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")); - tasksBuilder.addTask(MlTasks.jobTaskId("bar"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("bar"), + public void testUnallocatedJobIds() { + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + tasksBuilder.addTask(MlTasks.jobTaskId("job_with_assignment"), MlTasks.JOB_TASK_NAME, + new OpenJobAction.JobParams("job_with_assignment"), new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")); + tasksBuilder.addTask(MlTasks.jobTaskId("job_without_assignment"), MlTasks.JOB_TASK_NAME, + new OpenJobAction.JobParams("job_without_assignment"), + new PersistentTasksCustomMetaData.Assignment(null, "test assignment")); + tasksBuilder.addTask(MlTasks.jobTaskId("job_without_node"), MlTasks.JOB_TASK_NAME, + new OpenJobAction.JobParams("job_without_node"), + new PersistentTasksCustomMetaData.Assignment("dead-node", "expired node")); + + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("node-1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT)) + .localNodeId("node-1") + .masterNodeId("node-1") + .build(); + + assertThat(MlTasks.unallocatedJobIds(tasksBuilder.build(), nodes), + containsInAnyOrder("job_without_assignment", "job_without_node")); + } - assertFalse(MlTasks.taskExistsForJob("job-1", tasksBuilder.build())); - assertTrue(MlTasks.taskExistsForJob("foo", tasksBuilder.build())); + public void testUnallocatedDatafeedIds() { + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + tasksBuilder.addTask(MlTasks.datafeedTaskId("datafeed_with_assignment"), MlTasks.DATAFEED_TASK_NAME, + new StartDatafeedAction.DatafeedParams("datafeed_with_assignment", 0L), + new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")); + 
tasksBuilder.addTask(MlTasks.datafeedTaskId("datafeed_without_assignment"), MlTasks.DATAFEED_TASK_NAME, + new StartDatafeedAction.DatafeedParams("datafeed_without_assignment", 0L), + new PersistentTasksCustomMetaData.Assignment(null, "test assignment")); + tasksBuilder.addTask(MlTasks.datafeedTaskId("datafeed_without_node"), MlTasks.DATAFEED_TASK_NAME, + new StartDatafeedAction.DatafeedParams("datafeed_without_node", 0L), + new PersistentTasksCustomMetaData.Assignment("dead_node", "expired node")); + + + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("node-1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT)) + .localNodeId("node-1") + .masterNodeId("node-1") + .build(); + + assertThat(MlTasks.unallocatedDatafeedIds(tasksBuilder.build(), nodes), + containsInAnyOrder("datafeed_without_assignment", "datafeed_without_node")); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java index 9db17ed44843..45ace94614a1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java @@ -57,7 +57,7 @@ public void clusterChanged(ClusterChangedEvent event) { return; } - mlConfigMigrator.migrateConfigsWithoutTasks(event.state(), ActionListener.wrap( + mlConfigMigrator.migrateConfigs(event.state(), ActionListener.wrap( response -> threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(event)), e -> { logger.error("error migrating ml configurations", e); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheck.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheck.java index 72cb52424c3b..daa143ec0197 100644 --- 
a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheck.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheck.java @@ -79,9 +79,10 @@ static boolean mlConfigIndexIsAllocated(ClusterState clusterState) { * False if {@link #canStartMigration(ClusterState)} returns {@code false} * False if the job is not in the cluster state * False if the {@link Job#isDeleting()} - * False if the job has a persistent task + * False if the job has an allocated persistent task * True otherwise i.e. the job is present, not deleting - * and does not have a persistent task. + * and does not have a persistent task or its persistent + * task is un-allocated * * @param jobId The job Id * @param clusterState The cluster state @@ -100,15 +101,17 @@ public boolean jobIsEligibleForMigration(String jobId, ClusterState clusterState } PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE); - return MlTasks.openJobIds(persistentTasks).contains(jobId) == false; + return MlTasks.openJobIds(persistentTasks).contains(jobId) == false || + MlTasks.unallocatedJobIds(persistentTasks, clusterState.nodes()).contains(jobId); } /** * Is the datafeed a eligible for migration? Returns: * False if {@link #canStartMigration(ClusterState)} returns {@code false} * False if the datafeed is not in the cluster state - * False if the datafeed has a persistent task - * True otherwise i.e. the datafeed is present and does not have a persistent task. + * False if the datafeed has an allocated persistent task + * True otherwise i.e. 
the datafeed is present and does not have a persistent + * task or its persistent task is un-allocated * * @param datafeedId The datafeed Id * @param clusterState The cluster state @@ -125,6 +128,7 @@ public boolean datafeedIsEligibleForMigration(String datafeedId, ClusterState cl } PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE); - return MlTasks.startedDatafeedIds(persistentTasks).contains(datafeedId) == false; + return MlTasks.startedDatafeedIds(persistentTasks).contains(datafeedId) == false + || MlTasks.unallocatedDatafeedIds(persistentTasks, clusterState.nodes()).contains(datafeedId); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java index 4c39f78feeb6..97e8320f612b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; @@ -37,6 +38,8 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; +import org.elasticsearch.xpack.core.ml.action.OpenJobAction; +import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits; import org.elasticsearch.xpack.core.ml.job.config.Job; @@ -68,25 +71,28 @@ /** * Migrates 
job and datafeed configurations from the clusterstate to - * index documents. + * index documents for closed or unallocated tasks. * * There are 3 steps to the migration process * 1. Read config from the clusterstate + * - Find all job and datafeed configs that do not have an associated persistent + * task or whose persistent task is unallocated * - If a job or datafeed is added after this call it will be added to the index * - If deleted then it's possible the config will be copied before it is deleted. * Mitigate against this by filtering out jobs marked as deleting * 2. Copy the config to the index * - The index operation could fail, don't delete from clusterstate in this case - * 3. Remove config from the clusterstate + * 3. Remove config from the clusterstate and update persistent task parameters * - Before this happens config is duplicated in index and clusterstate, all ops - * must prefer to use the index config at this stage + * must prefer to use the clusterstate config at this stage * - If the clusterstate update fails then the config will remain duplicated * and the migration process should try again + * - Job and datafeed tasks opened prior to v6.6.0 need to be updated with new + * parameters * * If there was an error in step 3 and the config is in both the clusterstate and - * index then when the migrator retries it must not overwrite an existing job config - * document as once the index document is present all update operations will function - * on that rather than the clusterstate. + * index, the clusterstate config is preferred and all update + * operations will function on that rather than the index. * * The number of configs indexed in each bulk operation is limited by {@link #MAX_BULK_WRITE_SIZE} * pairs of datafeeds and jobs are migrated together. 
@@ -131,7 +137,7 @@ public MlConfigMigrator(Settings settings, Client client, ClusterService cluster * @param clusterState The current clusterstate * @param listener The success listener */ - public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener listener) { + public void migrateConfigs(ClusterState clusterState, ActionListener listener) { if (migrationInProgress.compareAndSet(false, true) == false) { listener.onResponse(Boolean.FALSE); return; @@ -184,8 +190,8 @@ private void migrateBatches(List batches, ActionListener writeConfigToIndex(batch.datafeedConfigs, batch.jobs, ActionListener.wrap( failedDocumentIds -> { - List successfulJobWrites = filterFailedJobConfigWrites(failedDocumentIds, batch.jobs); - List successfulDatafeedWrites = + List successfulJobWrites = filterFailedJobConfigWrites(failedDocumentIds, batch.jobs); + List successfulDatafeedWrites = filterFailedDatafeedConfigWrites(failedDocumentIds, batch.datafeedConfigs); removeFromClusterState(successfulJobWrites, successfulDatafeedWrites, chainedListener); }, @@ -217,24 +223,33 @@ public void writeConfigToIndex(Collection datafeedsToMigrate, ); } - private void removeFromClusterState(List jobsToRemoveIds, List datafeedsToRemoveIds, + private void removeFromClusterState(List jobsToRemove, List datafeedsToRemove, ActionListener listener) { - if (jobsToRemoveIds.isEmpty() && datafeedsToRemoveIds.isEmpty()) { + if (jobsToRemove.isEmpty() && datafeedsToRemove.isEmpty()) { listener.onResponse(null); return; } + Map jobsMap = jobsToRemove.stream().collect(Collectors.toMap(Job::getId, Function.identity())); + Map datafeedMap = + datafeedsToRemove.stream().collect(Collectors.toMap(DatafeedConfig::getId, Function.identity())); + AtomicReference removedConfigs = new AtomicReference<>(); clusterService.submitStateUpdateTask("remove-migrated-ml-configs", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - RemovalResult removed = 
removeJobsAndDatafeeds(jobsToRemoveIds, datafeedsToRemoveIds, + RemovalResult removed = removeJobsAndDatafeeds(jobsToRemove, datafeedsToRemove, MlMetadata.getMlMetadata(currentState)); removedConfigs.set(removed); + + PersistentTasksCustomMetaData updatedTasks = rewritePersistentTaskParams(jobsMap, datafeedMap, + currentState.metaData().custom(PersistentTasksCustomMetaData.TYPE), currentState.nodes()); + ClusterState.Builder newState = ClusterState.builder(currentState); newState.metaData(MetaData.builder(currentState.getMetaData()) .putCustom(MlMetadata.TYPE, removed.mlMetadata) + .putCustom(PersistentTasksCustomMetaData.TYPE, updatedTasks) .build()); return newState.build(); } @@ -259,6 +274,82 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS }); } + /** + * Find any unallocated datafeed and job tasks and update their persistent + * task parameters if they have missing fields that were added in v6.6. If + * a task exists with a missing field it must have been created in an earlier + * version and survived an elasticsearch upgrade. + * + * If there are no unallocated tasks the {@code currentTasks} argument is returned. 
+ * + * @param jobs Job configs + * @param datafeeds Datafeed configs + * @param currentTasks The persistent tasks + * @param nodes The nodes in the cluster + * @return The updated tasks + */ + public static PersistentTasksCustomMetaData rewritePersistentTaskParams(Map jobs, Map datafeeds, + PersistentTasksCustomMetaData currentTasks, + DiscoveryNodes nodes) { + + Collection unallocatedJobTasks = MlTasks.unallocatedJobTasks(currentTasks, nodes); + Collection unallocatedDatafeedsTasks = + MlTasks.unallocatedDatafeedTasks(currentTasks, nodes); + + if (unallocatedJobTasks.isEmpty() && unallocatedDatafeedsTasks.isEmpty()) { + return currentTasks; + } + + PersistentTasksCustomMetaData.Builder taskBuilder = PersistentTasksCustomMetaData.builder(currentTasks); + + for (PersistentTasksCustomMetaData.PersistentTask jobTask : unallocatedJobTasks) { + OpenJobAction.JobParams originalParams = (OpenJobAction.JobParams) jobTask.getParams(); + if (originalParams.getJob() == null) { + Job job = jobs.get(originalParams.getJobId()); + if (job != null) { + logger.debug("updating persistent task params for job [{}]", originalParams.getJobId()); + + // copy and update the job parameters + OpenJobAction.JobParams updatedParams = new OpenJobAction.JobParams(originalParams.getJobId()); + updatedParams.setTimeout(originalParams.getTimeout()); + updatedParams.setJob(job); + + // replace with the updated params + taskBuilder.removeTask(jobTask.getId()); + taskBuilder.addTask(jobTask.getId(), jobTask.getTaskName(), updatedParams, jobTask.getAssignment()); + } else { + logger.error("cannot find job for task [{}]", jobTask.getId()); + } + } + } + + for (PersistentTasksCustomMetaData.PersistentTask datafeedTask : unallocatedDatafeedsTasks) { + StartDatafeedAction.DatafeedParams originalParams = (StartDatafeedAction.DatafeedParams) datafeedTask.getParams(); + + if (originalParams.getJobId() == null) { + DatafeedConfig datafeedConfig = datafeeds.get(originalParams.getDatafeedId()); + if 
(datafeedConfig != null) { + logger.debug("Updating persistent task params for datafeed [{}]", originalParams.getDatafeedId()); + + StartDatafeedAction.DatafeedParams updatedParams = + new StartDatafeedAction.DatafeedParams(originalParams.getDatafeedId(), originalParams.getStartTime()); + updatedParams.setTimeout(originalParams.getTimeout()); + updatedParams.setEndTime(originalParams.getEndTime()); + updatedParams.setJobId(datafeedConfig.getJobId()); + updatedParams.setDatafeedIndices(datafeedConfig.getIndices()); + + // replace with the updated params + taskBuilder.removeTask(datafeedTask.getId()); + taskBuilder.addTask(datafeedTask.getId(), datafeedTask.getTaskName(), updatedParams, datafeedTask.getAssignment()); + } else { + logger.error("cannot find datafeed for task [{}]", datafeedTask.getId()); + } + } + } + + return taskBuilder.build(); + } + static class RemovalResult { MlMetadata mlMetadata; List removedJobIds; @@ -283,20 +374,20 @@ static class RemovalResult { * @return Structure tracking which jobs and datafeeds were actually removed * and the new MlMetadata */ - static RemovalResult removeJobsAndDatafeeds(List jobsToRemove, List datafeedsToRemove, MlMetadata mlMetadata) { + static RemovalResult removeJobsAndDatafeeds(List jobsToRemove, List datafeedsToRemove, MlMetadata mlMetadata) { Map currentJobs = new HashMap<>(mlMetadata.getJobs()); List removedJobIds = new ArrayList<>(); - for (String jobId : jobsToRemove) { - if (currentJobs.remove(jobId) != null) { - removedJobIds.add(jobId); + for (Job job : jobsToRemove) { + if (currentJobs.remove(job.getId()) != null) { + removedJobIds.add(job.getId()); } } Map currentDatafeeds = new HashMap<>(mlMetadata.getDatafeeds()); List removedDatafeedIds = new ArrayList<>(); - for (String datafeedId : datafeedsToRemove) { - if (currentDatafeeds.remove(datafeedId) != null) { - removedDatafeedIds.add(datafeedId); + for (DatafeedConfig datafeed : datafeedsToRemove) { + if (currentDatafeeds.remove(datafeed.getId()) != 
null) { + removedDatafeedIds.add(datafeed.getId()); } } @@ -455,15 +546,18 @@ public static List nonDeletingJobs(List jobs) { } /** - * Find the configurations for all closed jobs in the cluster state. - * Closed jobs are those that do not have an associated persistent task. + * Find the configurations for all closed jobs and the jobs that + * do not have an allocation in the cluster state. + * Closed jobs are those that do not have an associated persistent task, + * unallocated jobs have a task but no executing node * * @param clusterState The cluster state * @return The closed job configurations */ - public static List closedJobConfigs(ClusterState clusterState) { + public static List closedOrUnallocatedJobs(ClusterState clusterState) { PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE); Set openJobIds = MlTasks.openJobIds(persistentTasks); + openJobIds.removeAll(MlTasks.unallocatedJobIds(persistentTasks, clusterState.nodes())); MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); return mlMetadata.getJobs().values().stream() @@ -472,15 +566,18 @@ public static List closedJobConfigs(ClusterState clusterState) { } /** - * Find the configurations for stopped datafeeds in the cluster state. - * Stopped datafeeds are those that do not have an associated persistent task. + * Find the configurations for stopped datafeeds and datafeeds that do + * not have an allocation in the cluster state. + * Stopped datafeeds are those that do not have an associated persistent task, + * unallocated datafeeds have a task but no executing node. 
* * @param clusterState The cluster state * @return The closed job configurations */ - public static List stoppedDatafeedConfigs(ClusterState clusterState) { + public static List stopppedOrUnallocatedDatafeeds(ClusterState clusterState) { PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE); Set startedDatafeedIds = MlTasks.startedDatafeedIds(persistentTasks); + startedDatafeedIds.removeAll(MlTasks.unallocatedDatafeedIds(persistentTasks, clusterState.nodes())); MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); return mlMetadata.getDatafeeds().values().stream() @@ -503,8 +600,8 @@ public int totalCount() { } public static List splitInBatches(ClusterState clusterState) { - Collection stoppedDatafeeds = stoppedDatafeedConfigs(clusterState); - Map eligibleJobs = nonDeletingJobs(closedJobConfigs(clusterState)).stream() + Collection stoppedDatafeeds = stopppedOrUnallocatedDatafeeds(clusterState); + Map eligibleJobs = nonDeletingJobs(closedOrUnallocatedJobs(clusterState)).stream() .map(MlConfigMigrator::updateJobForMigration) .collect(Collectors.toMap(Job::getId, Function.identity(), (a, b) -> a)); @@ -586,17 +683,15 @@ static Set documentsNotWritten(BulkResponse response) { return failedDocumentIds; } - static List filterFailedJobConfigWrites(Set failedDocumentIds, List jobs) { + static List filterFailedJobConfigWrites(Set failedDocumentIds, List jobs) { return jobs.stream() - .map(Job::getId) - .filter(id -> failedDocumentIds.contains(Job.documentId(id)) == false) + .filter(job -> failedDocumentIds.contains(Job.documentId(job.getId())) == false) .collect(Collectors.toList()); } - static List filterFailedDatafeedConfigWrites(Set failedDocumentIds, Collection datafeeds) { + static List filterFailedDatafeedConfigWrites(Set failedDocumentIds, Collection datafeeds) { return datafeeds.stream() - .map(DatafeedConfig::getId) - .filter(id -> failedDocumentIds.contains(DatafeedConfig.documentId(id)) == 
false) + .filter(datafeed -> failedDocumentIds.contains(DatafeedConfig.documentId(datafeed.getId())) == false) .collect(Collectors.toList()); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 2a6ca2cf5c93..09c1e448aada 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -33,7 +33,6 @@ import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedSupplier; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.inject.Inject; @@ -107,6 +106,9 @@ public class TransportOpenJobAction extends TransportMasterNodeAction unavailableIndices = verifyIndicesPrimaryShardsAreActive(resultsIndexName, clusterState); if (unavailableIndices.size() != 0) { String reason = "Not opening job [" + jobId + "], because not all primary shards are active for the following indices [" + @@ -219,16 +222,16 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j continue; } - if (job != null) { - Set compatibleJobTypes = Job.getCompatibleJobTypes(node.getVersion()); - if (compatibleJobTypes.contains(job.getJobType()) == false) { - String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndVersion(node) + - "], because this node does not support jobs of type [" + job.getJobType() + "]"; - logger.trace(reason); - reasons.add(reason); - continue; - } + Set compatibleJobTypes = Job.getCompatibleJobTypes(node.getVersion()); + if (compatibleJobTypes.contains(job.getJobType()) == false) { + String reason = "Not opening job [" + jobId + "] on 
node [" + nodeNameAndVersion(node) + + "], because this node does not support jobs of type [" + job.getJobType() + "]"; + logger.trace(reason); + reasons.add(reason); + continue; + } + if (job != null) { if (jobHasRules(job) && node.getVersion().before(DetectionRule.VERSION_INTRODUCED)) { String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndVersion(node) + "], because jobs using " + "custom_rules require a node of version [" + DetectionRule.VERSION_INTRODUCED + "] or higher"; @@ -823,16 +826,16 @@ public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterS @Override public PersistentTasksCustomMetaData.Assignment getAssignment(OpenJobAction.JobParams params, ClusterState clusterState) { - Job foundJob = params.getJob(); - if (foundJob == null) { - // The job was added to the persistent task parameters in 6.6.0 - // if the field is not present the task was created before 6.6.0. - // In which case the job should still be in the clusterstate - foundJob = MlMetadata.getMlMetadata(clusterState).getJobs().get(params.getJobId()); + // If the task parameters do not have a job field then the job + // was first opened on a pre v6.6 node and has not been migrated + + // TODO is eligible for migration check (all nodes are at the same version) + if (params.getJob() == null) { + return AWAITING_MIGRATION; } PersistentTasksCustomMetaData.Assignment assignment = selectLeastLoadedMlNode(params.getJobId(), - foundJob, + params.getJob(), clusterState, maxConcurrentJobAllocations, fallbackMaxNumberOfOpenJobs, diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java index 5c8c25379479..3e31c8d564b6 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java @@ -63,7 +63,7 @@ 
private void setupMocks() { ActionListener listener = (ActionListener) invocation.getArguments()[1]; listener.onResponse(Boolean.TRUE); return null; - }).when(configMigrator).migrateConfigsWithoutTasks(any(ClusterState.class), any(ActionListener.class)); + }).when(configMigrator).migrateConfigs(any(ClusterState.class), any(ActionListener.class)); } public void testClusterChanged_info() { @@ -87,7 +87,7 @@ public void testClusterChanged_info() { .build(); notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous)); verify(auditor, times(1)).info(eq("job_id"), any()); - verify(configMigrator, times(1)).migrateConfigsWithoutTasks(eq(newState), any()); + verify(configMigrator, times(1)).migrateConfigs(eq(newState), any()); // no longer master newState = ClusterState.builder(new ClusterName("_name")) @@ -120,7 +120,7 @@ public void testClusterChanged_warning() { .build(); notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous)); verify(auditor, times(1)).warning(eq("job_id"), any()); - verify(configMigrator, times(1)).migrateConfigsWithoutTasks(eq(newState), any()); + verify(configMigrator, times(1)).migrateConfigs(eq(newState), any()); // no longer master newState = ClusterState.builder(new ClusterName("_name")) @@ -153,7 +153,7 @@ public void testClusterChanged_noPersistentTaskChanges() { .build(); notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous)); - verify(configMigrator, times(1)).migrateConfigsWithoutTasks(any(), any()); + verify(configMigrator, times(1)).migrateConfigs(any(), any()); verifyNoMoreInteractions(auditor); // no longer master diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheckTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheckTests.java index 4785f9f75a5c..4a70bcf02d3a 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheckTests.java 
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheckTests.java @@ -270,6 +270,34 @@ public void testJobIsEligibleForMigration_givenClosedJob() { assertTrue(check.jobIsEligibleForMigration(closedJob.getId(), clusterState)); } + public void testJobIsEligibleForMigration_givenOpenAndUnallocatedJob() { + Job openJob = JobTests.buildJobBuilder("open-job").build(); + MlMetadata.Builder mlMetadata = new MlMetadata.Builder().putJob(openJob, false); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + tasksBuilder.addTask(MlTasks.jobTaskId(openJob.getId()), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams(openJob.getId()), + new PersistentTasksCustomMetaData.Assignment(null, "no assignment")); + + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + addMlConfigIndex(metaData, routingTable); + + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) + .metaData(metaData + .putCustom(MlMetadata.TYPE, mlMetadata.build()) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()) + ) + .routingTable(routingTable.build()) + .build(); + + Settings settings = newSettings(true); + givenClusterSettings(settings); + + MlConfigMigrationEligibilityCheck check = new MlConfigMigrationEligibilityCheck(settings, clusterService); + + assertTrue(check.jobIsEligibleForMigration(openJob.getId(), clusterState)); + } + public void testDatafeedIsEligibleForMigration_givenNodesNotUpToVersion() { // mixed 6.5 and 6.6 nodes ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) @@ -371,6 +399,36 @@ public void testDatafeedIsEligibleForMigration_givenStoppedDatafeed() { assertTrue(check.datafeedIsEligibleForMigration(datafeedId, clusterState)); } + public void testDatafeedIsEligibleForMigration_givenUnallocatedDatafeed() { + Job job = JobTests.buildJobBuilder("closed-job").build(); + 
MlMetadata.Builder mlMetadata = new MlMetadata.Builder().putJob(job, false); + mlMetadata.putDatafeed(createCompatibleDatafeed(job.getId()), Collections.emptyMap()); + String datafeedId = "df-" + job.getId(); + + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + addMlConfigIndex(metaData, routingTable); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + tasksBuilder.addTask(MlTasks.datafeedTaskId(datafeedId), MlTasks.DATAFEED_TASK_NAME, + new StartDatafeedAction.DatafeedParams(datafeedId, 0L), + new PersistentTasksCustomMetaData.Assignment(null, "no assignment")); + + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) + .metaData(metaData + .putCustom(MlMetadata.TYPE, mlMetadata.build()) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())) + .routingTable(routingTable.build()) + .build(); + + Settings settings = newSettings(true); + givenClusterSettings(settings); + + MlConfigMigrationEligibilityCheck check = new MlConfigMigrationEligibilityCheck(settings, clusterService); + + assertTrue(check.datafeedIsEligibleForMigration(datafeedId, clusterState)); + } + private void givenClusterSettings(Settings settings) { ClusterSettings clusterSettings = new ClusterSettings(settings, new HashSet<>(Collections.singletonList( MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION))); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigratorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigratorTests.java index cff299b9fa1a..62c29efdff96 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigratorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigratorTests.java @@ -11,7 +11,10 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import 
org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.MlMetadata; @@ -23,6 +26,7 @@ import org.elasticsearch.xpack.core.ml.job.config.JobTests; import java.io.IOException; +import java.net.InetAddress; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -37,6 +41,7 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -50,74 +55,82 @@ public void testNonDeletingJobs() { assertThat(MlConfigMigrator.nonDeletingJobs(Arrays.asList(job1, job2, deletingJob)), containsInAnyOrder(job1, job2)); } - public void testClosedJobConfigs() { - Job openJob1 = JobTests.buildJobBuilder("openjob1").build(); - Job openJob2 = JobTests.buildJobBuilder("openjob2").build(); + public void testClosedOrUnallocatedJobs() { + Job closedJob = JobTests.buildJobBuilder("closedjob").build(); + Job jobWithoutAllocation = JobTests.buildJobBuilder("jobwithoutallocation").build(); + Job openJob = JobTests.buildJobBuilder("openjob").build(); MlMetadata.Builder mlMetadata = new MlMetadata.Builder() - .putJob(openJob1, false) - .putJob(openJob2, false) - .putDatafeed(createCompatibleDatafeed(openJob1.getId()), Collections.emptyMap()); - - ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) - .metaData(MetaData.builder() - .putCustom(MlMetadata.TYPE, mlMetadata.build()) - .putCustom(PersistentTasksCustomMetaData.TYPE, 
PersistentTasksCustomMetaData.builder().build()) - ) - .build(); - - assertThat(MlConfigMigrator.closedJobConfigs(clusterState), containsInAnyOrder(openJob1, openJob2)); + .putJob(closedJob, false) + .putJob(jobWithoutAllocation, false) + .putJob(openJob, false) + .putDatafeed(createCompatibleDatafeed(closedJob.getId()), Collections.emptyMap()); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); - tasksBuilder.addTask(MlTasks.jobTaskId("openjob1"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo-1"), - new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")); + tasksBuilder.addTask(MlTasks.jobTaskId("jobwithoutallocation"), MlTasks.JOB_TASK_NAME, + new OpenJobAction.JobParams("jobwithoutallocation"), + new PersistentTasksCustomMetaData.Assignment(null, "test assignment")); + tasksBuilder.addTask(MlTasks.jobTaskId("openjob"), MlTasks.JOB_TASK_NAME, + new OpenJobAction.JobParams("openjob"), + new PersistentTasksCustomMetaData.Assignment("node1", "test assignment")); + + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("node1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT)) + .localNodeId("node1") + .masterNodeId("node1") + .build(); - clusterState = ClusterState.builder(new ClusterName("migratortests")) + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) .metaData(MetaData.builder() .putCustom(MlMetadata.TYPE, mlMetadata.build()) .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()) ) + .nodes(nodes) .build(); - assertThat(MlConfigMigrator.closedJobConfigs(clusterState), containsInAnyOrder(openJob2)); + assertThat(MlConfigMigrator.closedOrUnallocatedJobs(clusterState), containsInAnyOrder(closedJob, jobWithoutAllocation)); } public void testStoppedDatafeedConfigs() { - Job openJob1 = JobTests.buildJobBuilder("openjob1").build(); - Job openJob2 = JobTests.buildJobBuilder("openjob2").build(); - 
DatafeedConfig datafeedConfig1 = createCompatibleDatafeed(openJob1.getId()); - DatafeedConfig datafeedConfig2 = createCompatibleDatafeed(openJob2.getId()); - MlMetadata.Builder mlMetadata = new MlMetadata.Builder() - .putJob(openJob1, false) - .putJob(openJob2, false) - .putDatafeed(datafeedConfig1, Collections.emptyMap()) - .putDatafeed(datafeedConfig2, Collections.emptyMap()); - - ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) - .metaData(MetaData.builder() - .putCustom(MlMetadata.TYPE, mlMetadata.build()) - .putCustom(PersistentTasksCustomMetaData.TYPE, PersistentTasksCustomMetaData.builder().build()) - ) - .build(); - - assertThat(MlConfigMigrator.stoppedDatafeedConfigs(clusterState), containsInAnyOrder(datafeedConfig1, datafeedConfig2)); + Job job1 = JobTests.buildJobBuilder("job1").build(); + Job job2 = JobTests.buildJobBuilder("job2").build(); + Job job3 = JobTests.buildJobBuilder("job3").build(); + DatafeedConfig stopppedDatafeed = createCompatibleDatafeed(job1.getId()); + DatafeedConfig datafeedWithoutAllocation = createCompatibleDatafeed(job2.getId()); + DatafeedConfig startedDatafeed = createCompatibleDatafeed(job3.getId()); + MlMetadata.Builder mlMetadata = new MlMetadata.Builder() + .putJob(job1, false) + .putJob(job2, false) + .putJob(job3, false) + .putDatafeed(stopppedDatafeed, Collections.emptyMap()) + .putDatafeed(datafeedWithoutAllocation, Collections.emptyMap()) + .putDatafeed(startedDatafeed, Collections.emptyMap()); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); - tasksBuilder.addTask(MlTasks.jobTaskId("openjob1"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo-1"), - new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")); - tasksBuilder.addTask(MlTasks.datafeedTaskId(datafeedConfig1.getId()), MlTasks.DATAFEED_TASK_NAME, - new StartDatafeedAction.DatafeedParams(datafeedConfig1.getId(), 0L), - new 
PersistentTasksCustomMetaData.Assignment("node-2", "test assignment")); + tasksBuilder.addTask(MlTasks.datafeedTaskId(stopppedDatafeed.getId()), MlTasks.DATAFEED_TASK_NAME, + new StartDatafeedAction.DatafeedParams(stopppedDatafeed.getId(), 0L), + new PersistentTasksCustomMetaData.Assignment(null, "test assignment")); + tasksBuilder.addTask(MlTasks.datafeedTaskId(startedDatafeed.getId()), MlTasks.DATAFEED_TASK_NAME, + new StartDatafeedAction.DatafeedParams(stopppedDatafeed.getId(), 0L), + new PersistentTasksCustomMetaData.Assignment("node1", "test assignment")); + + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("node1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT)) + .localNodeId("node1") + .masterNodeId("node1") + .build(); - clusterState = ClusterState.builder(new ClusterName("migratortests")) + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) .metaData(MetaData.builder() .putCustom(MlMetadata.TYPE, mlMetadata.build()) .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()) ) + .nodes(nodes) .build(); - assertThat(MlConfigMigrator.stoppedDatafeedConfigs(clusterState), containsInAnyOrder(datafeedConfig2)); + assertThat(MlConfigMigrator.stopppedOrUnallocatedDatafeeds(clusterState), + containsInAnyOrder(stopppedDatafeed, datafeedWithoutAllocation)); } public void testUpdateJobForMigration() { @@ -155,7 +168,7 @@ public void testFilterFailedJobConfigWrites() { assertThat(MlConfigMigrator.filterFailedJobConfigWrites(Collections.emptySet(), jobs), hasSize(3)); assertThat(MlConfigMigrator.filterFailedJobConfigWrites(Collections.singleton(Job.documentId("bar")), jobs), - contains("foo", "baz")); + contains(jobs.get(0), jobs.get(2))); } public void testFilterFailedDatafeedConfigWrites() { @@ -166,7 +179,7 @@ public void testFilterFailedDatafeedConfigWrites() { assertThat(MlConfigMigrator.filterFailedDatafeedConfigWrites(Collections.emptySet(), datafeeds), 
hasSize(3)); assertThat(MlConfigMigrator.filterFailedDatafeedConfigWrites(Collections.singleton(DatafeedConfig.documentId("df-foo")), datafeeds), - contains("df-bar", "df-baz")); + contains(datafeeds.get(1), datafeeds.get(2))); } public void testDocumentsNotWritten() { @@ -197,7 +210,7 @@ public void testRemoveJobsAndDatafeeds_removeAll() { .putDatafeed(datafeedConfig2, Collections.emptyMap()); MlConfigMigrator.RemovalResult removalResult = MlConfigMigrator.removeJobsAndDatafeeds( - Arrays.asList("job1", "job2"), Arrays.asList("df-job1", "df-job2"), mlMetadata.build()); + Arrays.asList(job1, job2), Arrays.asList(datafeedConfig1, datafeedConfig2), mlMetadata.build()); assertThat(removalResult.mlMetadata.getJobs().keySet(), empty()); assertThat(removalResult.mlMetadata.getDatafeeds().keySet(), empty()); @@ -215,7 +228,8 @@ public void testRemoveJobsAndDatafeeds_removeSome() { .putDatafeed(datafeedConfig1, Collections.emptyMap()); MlConfigMigrator.RemovalResult removalResult = MlConfigMigrator.removeJobsAndDatafeeds( - Arrays.asList("job1", "job-none"), Collections.singletonList("df-none"), mlMetadata.build()); + Arrays.asList(job1, JobTests.buildJobBuilder("job-none").build()), + Collections.singletonList(createCompatibleDatafeed("job-none")), mlMetadata.build()); assertThat(removalResult.mlMetadata.getJobs().keySet(), contains("job2")); assertThat(removalResult.mlMetadata.getDatafeeds().keySet(), contains("df-job1")); @@ -300,6 +314,115 @@ public void testLimitWrites_GivenNullJob() { assertThat(jobsAndDatafeeds.jobs, empty()); } + public void testRewritePersistentTaskParams() { + Map jobs = new HashMap<>(); + Job closedJob = JobTests.buildJobBuilder("closed-job").build(); + Job unallocatedJob = JobTests.buildJobBuilder("job-to-update").build(); + Job allocatedJob = JobTests.buildJobBuilder("allocated-job").build(); + jobs.put(closedJob.getId(), closedJob); + jobs.put(unallocatedJob.getId(), unallocatedJob); + jobs.put(allocatedJob.getId(), allocatedJob); + + Map 
datafeeds = new HashMap<>(); + DatafeedConfig stoppedDatafeed = createCompatibleDatafeed(closedJob.getId()); + DatafeedConfig unallocatedDatafeed = createCompatibleDatafeed(unallocatedJob.getId()); + DatafeedConfig allocatedDatafeed = createCompatibleDatafeed(allocatedJob.getId()); + datafeeds.put(stoppedDatafeed.getId(), stoppedDatafeed); + datafeeds.put(unallocatedDatafeed.getId(), unallocatedDatafeed); + datafeeds.put(allocatedDatafeed.getId(), allocatedDatafeed); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + // job tasks + tasksBuilder.addTask(MlTasks.jobTaskId(unallocatedJob.getId()), MlTasks.JOB_TASK_NAME, + new OpenJobAction.JobParams(unallocatedJob.getId()), + new PersistentTasksCustomMetaData.Assignment(null, "no assignment")); + tasksBuilder.addTask(MlTasks.jobTaskId(allocatedJob.getId()), MlTasks.JOB_TASK_NAME, + new OpenJobAction.JobParams(allocatedJob.getId()), + new PersistentTasksCustomMetaData.Assignment("node1", "test assignment")); + // datafeed tasks + tasksBuilder.addTask(MlTasks.datafeedTaskId(unallocatedDatafeed.getId()), MlTasks.DATAFEED_TASK_NAME, + new StartDatafeedAction.DatafeedParams(unallocatedDatafeed.getId(), 0L), + new PersistentTasksCustomMetaData.Assignment(null, "no assignment")); + tasksBuilder.addTask(MlTasks.datafeedTaskId(allocatedDatafeed.getId()), MlTasks.DATAFEED_TASK_NAME, + new StartDatafeedAction.DatafeedParams(allocatedDatafeed.getId(), 0L), + new PersistentTasksCustomMetaData.Assignment("node1", "test assignment")); + + PersistentTasksCustomMetaData originalTasks = tasksBuilder.build(); + OpenJobAction.JobParams originalUnallocatedTaskParams = (OpenJobAction.JobParams) originalTasks.getTask( + MlTasks.jobTaskId(unallocatedJob.getId())).getParams(); + assertNull(originalUnallocatedTaskParams.getJob()); + StartDatafeedAction.DatafeedParams originalUnallocatedDatafeedParams = (StartDatafeedAction.DatafeedParams) originalTasks.getTask( + 
MlTasks.datafeedTaskId(unallocatedDatafeed.getId())).getParams(); + assertNull(originalUnallocatedDatafeedParams.getJobId()); + + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("node1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT)) + .localNodeId("node1") + .masterNodeId("node1") + .build(); + + PersistentTasksCustomMetaData modifedTasks = MlConfigMigrator.rewritePersistentTaskParams(jobs, datafeeds, originalTasks, nodes); + + // The unallocated task should be modifed + OpenJobAction.JobParams modifedUnallocatedTaskParams = + (OpenJobAction.JobParams) modifedTasks.getTask(MlTasks.jobTaskId(unallocatedJob.getId())).getParams(); + assertNotEquals(originalUnallocatedTaskParams, modifedUnallocatedTaskParams); + assertEquals(unallocatedJob, modifedUnallocatedTaskParams.getJob()); + + // the allocated task should not be modified + OpenJobAction.JobParams allocatedJobParams = + (OpenJobAction.JobParams) modifedTasks.getTask(MlTasks.jobTaskId(allocatedJob.getId())).getParams(); + assertEquals(null, allocatedJobParams.getJob()); + OpenJobAction.JobParams originalAllocatedJobParams = + (OpenJobAction.JobParams) originalTasks.getTask(MlTasks.jobTaskId(allocatedJob.getId())).getParams(); + assertEquals(originalAllocatedJobParams, allocatedJobParams); + + + // unallocated datafeed should be updated + StartDatafeedAction.DatafeedParams modifiedUnallocatedDatafeedParams = (StartDatafeedAction.DatafeedParams) modifedTasks.getTask( + MlTasks.datafeedTaskId(unallocatedDatafeed.getId())).getParams(); + assertNotEquals(originalUnallocatedDatafeedParams, modifiedUnallocatedDatafeedParams); + assertEquals(unallocatedDatafeed.getJobId(), modifiedUnallocatedDatafeedParams.getJobId()); + assertEquals(unallocatedDatafeed.getIndices(), modifiedUnallocatedDatafeedParams.getDatafeedIndices()); + + // allocated datafeed will not be updated + StartDatafeedAction.DatafeedParams allocatedDatafeedParams = 
(StartDatafeedAction.DatafeedParams) modifedTasks.getTask( + MlTasks.datafeedTaskId(allocatedDatafeed.getId())).getParams(); + assertNull(allocatedDatafeedParams.getJobId()); + assertThat(allocatedDatafeedParams.getDatafeedIndices(), empty()); + StartDatafeedAction.DatafeedParams originalAllocatedDatafeedParams = (StartDatafeedAction.DatafeedParams) originalTasks.getTask( + MlTasks.datafeedTaskId(allocatedDatafeed.getId())).getParams(); + assertEquals(originalAllocatedDatafeedParams, allocatedDatafeedParams); + } + + public void testRewritePersistentTaskParams_GivenNoUnallocatedTasks() { + Map jobs = new HashMap<>(); + Job allocatedJob = JobTests.buildJobBuilder("allocated-job").build(); + jobs.put(allocatedJob.getId(), allocatedJob); + + Map datafeeds = new HashMap<>(); + DatafeedConfig allocatedDatafeed = createCompatibleDatafeed(allocatedJob.getId()); + datafeeds.put(allocatedDatafeed.getId(), allocatedDatafeed); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + tasksBuilder.addTask(MlTasks.jobTaskId(allocatedJob.getId()), MlTasks.JOB_TASK_NAME, + new OpenJobAction.JobParams(allocatedJob.getId()), + new PersistentTasksCustomMetaData.Assignment("node1", "test assignment")); + tasksBuilder.addTask(MlTasks.datafeedTaskId(allocatedDatafeed.getId()), MlTasks.DATAFEED_TASK_NAME, + new StartDatafeedAction.DatafeedParams(allocatedDatafeed.getId(), 0L), + new PersistentTasksCustomMetaData.Assignment("node1", "test assignment")); + + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("node1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT)) + .localNodeId("node1") + .masterNodeId("node1") + .build(); + + PersistentTasksCustomMetaData originalTasks = tasksBuilder.build(); + PersistentTasksCustomMetaData modifedTasks = MlConfigMigrator.rewritePersistentTaskParams(jobs, datafeeds, originalTasks, nodes); + assertThat(originalTasks, sameInstance(modifedTasks)); + } + private 
DatafeedConfig createCompatibleDatafeed(String jobId) { // create a datafeed without aggregations or anything // else that may cause validation errors diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java index d29d815fdf39..f74c2fe30efa 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java @@ -9,6 +9,7 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.Version; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.AliasMetaData; @@ -24,6 +25,8 @@ import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -53,6 +56,7 @@ import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.notifications.AuditorField; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.process.MlMemoryTracker; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import org.junit.Before; @@ -60,9 +64,11 @@ import java.io.IOException; import java.net.InetAddress; import java.util.ArrayList; +import java.util.Arrays; 
import java.util.Collections; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.SortedMap; @@ -620,6 +626,21 @@ public void testJobTaskMatcherMatch() { assertThat(OpenJobAction.JobTaskMatcher.match(jobTask2, "ml-2"), is(true)); } + public void testGetAssignment_GivenJobThatRequiresMigration() { + ClusterService clusterService = mock(ClusterService.class); + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, new HashSet<>( + Arrays.asList(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, MachineLearning.MAX_MACHINE_MEMORY_PERCENT, + MachineLearning.MAX_LAZY_ML_NODES) + )); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + + TransportOpenJobAction.OpenJobPersistentTasksExecutor executor = new TransportOpenJobAction.OpenJobPersistentTasksExecutor( + Settings.EMPTY, clusterService, mock(AutodetectProcessManager.class), mock(MlMemoryTracker.class), mock(Client.class)); + + OpenJobAction.JobParams params = new OpenJobAction.JobParams("missing_job_field"); + assertEquals(TransportOpenJobAction.AWAITING_MIGRATION, executor.getAssignment(params, mock(ClusterState.class))); + } + public static void addJobTask(String jobId, String nodeId, JobState jobState, PersistentTasksCustomMetaData.Builder builder) { addJobTask(jobId, nodeId, jobState, builder, false); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java index 228c9cdaf2c2..08ade55a4682 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java @@ -134,10 +134,10 @@ public void testWriteConfigToIndex() throws InterruptedException { } public void testMigrateConfigs() throws InterruptedException, 
IOException { - // and jobs and datafeeds clusterstate MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); mlMetadata.putJob(buildJobBuilder("job-foo").build(), false); mlMetadata.putJob(buildJobBuilder("job-bar").build(), false); + DatafeedConfig.Builder builder = new DatafeedConfig.Builder("df-1", "job-foo"); builder.setIndices(Collections.singletonList("beats*")); mlMetadata.putDatafeed(builder.build(), Collections.emptyMap()); @@ -163,7 +163,7 @@ public void testMigrateConfigs() throws InterruptedException, IOException { // do the migration MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService); // the first time this is called mlmetadata will be snap-shotted - blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener), + blockingCall(actionListener -> mlConfigMigrator.migrateConfigs(clusterState, actionListener), responseHolder, exceptionHolder); assertNull(exceptionHolder.get()); @@ -287,7 +287,7 @@ public void testMigrateConfigs_GivenLargeNumberOfJobsAndDatafeeds() throws Inter // do the migration MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService); - blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener), + blockingCall(actionListener -> mlConfigMigrator.migrateConfigs(clusterState, actionListener), responseHolder, exceptionHolder); assertNull(exceptionHolder.get()); @@ -325,7 +325,7 @@ public void testMigrateConfigs_GivenNoJobsOrDatafeeds() throws InterruptedExcept // do the migration MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService); - blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener), + blockingCall(actionListener -> mlConfigMigrator.migrateConfigs(clusterState, actionListener), responseHolder, exceptionHolder); assertNull(exceptionHolder.get()); @@ -358,7 +358,7 
@@ public void testMigrateConfigsWithoutTasks_GivenMigrationIsDisabled() throws Int // do the migration MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(settings, client(), clusterService); - blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener), + blockingCall(actionListener -> mlConfigMigrator.migrateConfigs(clusterState, actionListener), responseHolder, exceptionHolder); assertNull(exceptionHolder.get()); @@ -434,7 +434,7 @@ public void testConfigIndexIsCreated() throws Exception { // if the cluster state has a job config and the index does not // exist it should be created - blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener), + blockingCall(actionListener -> mlConfigMigrator.migrateConfigs(clusterState, actionListener), responseHolder, exceptionHolder); assertBusy(() -> assertTrue(configIndexExists())); diff --git a/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/MlMigrationFullClusterRestartIT.java b/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/MlMigrationFullClusterRestartIT.java index 59c634981d19..a7b3aead7a1c 100644 --- a/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/MlMigrationFullClusterRestartIT.java +++ b/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/MlMigrationFullClusterRestartIT.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.upgrades.AbstractFullClusterRestartTestCase; +import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; @@ -138,16 +139,15 @@ private void oldClusterTests() throws IOException { } 
private void upgradedClusterTests() throws Exception { - // wait for the closed job and datafeed to be migrated - waitForMigration(Collections.singletonList(OLD_CLUSTER_CLOSED_JOB_ID), - Collections.singletonList(OLD_CLUSTER_STOPPED_DATAFEED_ID), - Collections.singletonList(OLD_CLUSTER_OPEN_JOB_ID), - Collections.singletonList(OLD_CLUSTER_STARTED_DATAFEED_ID)); - - // the job and datafeed left open during upgrade should - // be assigned to a node + // wait for the closed and open jobs and datafeed to be migrated + waitForMigration(Arrays.asList(OLD_CLUSTER_CLOSED_JOB_ID, OLD_CLUSTER_OPEN_JOB_ID), + Arrays.asList(OLD_CLUSTER_STOPPED_DATAFEED_ID, OLD_CLUSTER_STARTED_DATAFEED_ID)); + waitForJobToBeAssigned(OLD_CLUSTER_OPEN_JOB_ID); waitForDatafeedToBeAssigned(OLD_CLUSTER_STARTED_DATAFEED_ID); + // The persistent task params for the job & datafeed left open + // during upgrade should be updated with new fields + checkTaskParamsAreUpdated(OLD_CLUSTER_OPEN_JOB_ID, OLD_CLUSTER_STARTED_DATAFEED_ID); // open the migrated job and datafeed Request openJob = new Request("POST", "_xpack/ml/anomaly_detectors/" + OLD_CLUSTER_CLOSED_JOB_ID + "/_open"); @@ -164,9 +164,7 @@ private void upgradedClusterTests() throws Exception { // now all jobs should be migrated waitForMigration(Arrays.asList(OLD_CLUSTER_CLOSED_JOB_ID, OLD_CLUSTER_OPEN_JOB_ID), - Arrays.asList(OLD_CLUSTER_STOPPED_DATAFEED_ID, OLD_CLUSTER_STARTED_DATAFEED_ID), - Collections.emptyList(), - Collections.emptyList()); + Arrays.asList(OLD_CLUSTER_STOPPED_DATAFEED_ID, OLD_CLUSTER_STARTED_DATAFEED_ID)); } @SuppressWarnings("unchecked") @@ -203,8 +201,7 @@ private void waitForDatafeedToBeAssigned(String datafeedId) throws Exception { } @SuppressWarnings("unchecked") - private void waitForMigration(List expectedMigratedJobs, List expectedMigratedDatafeeds, - List unMigratedJobs, List unMigratedDatafeeds) throws Exception { + private void waitForMigration(List expectedMigratedJobs, List expectedMigratedDatafeeds) throws 
Exception { // After v6.6.0 jobs are created in the index so no migration will take place if (getOldClusterVersion().onOrAfter(Version.V_6_6_0)) { @@ -219,48 +216,58 @@ private void waitForMigration(List expectedMigratedJobs, List ex List> jobs = (List>) XContentMapValues.extractValue("metadata.ml.jobs", responseMap); - assertNotNull(jobs); - - for (String jobId : expectedMigratedJobs) { - assertJob(jobId, jobs, false); - } - for (String jobId : unMigratedJobs) { - assertJob(jobId, jobs, true); + if (jobs != null) { + for (String jobId : expectedMigratedJobs) { + assertJobNotPresent(jobId, jobs); + } } List> datafeeds = (List>) XContentMapValues.extractValue("metadata.ml.datafeeds", responseMap); - assertNotNull(datafeeds); - for (String datafeedId : expectedMigratedDatafeeds) { - assertDatafeed(datafeedId, datafeeds, false); + if (datafeeds != null) { + for (String datafeedId : expectedMigratedDatafeeds) { + assertDatafeedNotPresent(datafeedId, datafeeds); + } } + }, 30, TimeUnit.SECONDS); + } - for (String datafeedId : unMigratedDatafeeds) { - assertDatafeed(datafeedId, datafeeds, true); + @SuppressWarnings("unchecked") + private void checkTaskParamsAreUpdated(String jobId, String datafeedId) throws Exception { + Request getClusterState = new Request("GET", "/_cluster/state/metadata"); + Response response = client().performRequest(getClusterState); + Map responseMap = entityAsMap(response); + + List> tasks = + (List>) XContentMapValues.extractValue("metadata.persistent_tasks.tasks", responseMap); + assertNotNull(tasks); + for (Map task : tasks) { + String id = (String) task.get("id"); + assertNotNull(id); + if (id.equals(MlTasks.jobTaskId(jobId))) { + Object jobParam = XContentMapValues.extractValue("task.xpack/ml/job.params.job", task); + assertNotNull(jobParam); } - - }, 30, TimeUnit.SECONDS); + else if (id.equals(MlTasks.datafeedTaskId(datafeedId))) { + Object jobIdParam = XContentMapValues.extractValue("task.xpack/ml/datafeed.params.job_id", task); + 
assertNotNull(jobIdParam); + Object indices = XContentMapValues.extractValue("task.xpack/ml/datafeed.params.indices", task); + assertNotNull(indices); + } + } } - private void assertDatafeed(String datafeedId, List> datafeeds, boolean expectedToBePresent) { + private void assertDatafeedNotPresent(String datafeedId, List> datafeeds) { Optional config = datafeeds.stream().map(map -> map.get("datafeed_id")) .filter(id -> id.equals(datafeedId)).findFirst(); - if (expectedToBePresent) { - assertTrue(config.isPresent()); - } else { - assertFalse(config.isPresent()); - } + assertFalse(config.isPresent()); } - private void assertJob(String jobId, List> jobs, boolean expectedToBePresent) { + private void assertJobNotPresent(String jobId, List> jobs) { Optional config = jobs.stream().map(map -> map.get("job_id")) .filter(id -> id.equals(jobId)).findFirst(); - if (expectedToBePresent) { - assertTrue(config.isPresent()); - } else { - assertFalse(config.isPresent()); - } + assertFalse(config.isPresent()); } } From ae30c429c4967a5bc84eea5325e8fa46f705e163 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 16 Jan 2019 13:39:03 +0000 Subject: [PATCH 2/8] Update rolling upgrade test --- .../elasticsearch/upgrades/MlMigrationIT.java | 132 +++++++----------- .../mixed_cluster/60_ml_config_migration.yml | 3 - .../60_ml_config_migration.yml | 5 +- 3 files changed, 55 insertions(+), 85 deletions(-) diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMigrationIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMigrationIT.java index add428611431..2cbebd1cc3a3 100644 --- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMigrationIT.java +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMigrationIT.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.support.XContentMapValues; +import 
org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; @@ -190,11 +191,6 @@ private void mixedClusterTests() throws Exception { assertConfigInClusterState(); checkJobs(); checkDatafeeds(); - - // the job and datafeed left open during upgrade should - // be assigned to a node - waitForJobToBeAssigned(OLD_CLUSTER_OPEN_JOB_ID); - waitForDatafeedToBeAssigned(OLD_CLUSTER_STARTED_DATAFEED_ID); } private void upgradedClusterTests() throws Exception { @@ -208,10 +204,12 @@ private void upgradedClusterTests() throws Exception { datafeedStarted = startMigratedDatafeed(OLD_CLUSTER_STOPPED_DATAFEED_ID); } - waitForMigration(Collections.singletonList(OLD_CLUSTER_CLOSED_JOB_ID), - Collections.singletonList(OLD_CLUSTER_STOPPED_DATAFEED_ID), - Collections.singletonList(OLD_CLUSTER_OPEN_JOB_ID), - Collections.singletonList(OLD_CLUSTER_STARTED_DATAFEED_ID)); + // wait for the closed and open jobs and datafeed to be migrated + waitForMigration(Arrays.asList(OLD_CLUSTER_CLOSED_JOB_ID, OLD_CLUSTER_OPEN_JOB_ID), + Arrays.asList(OLD_CLUSTER_STOPPED_DATAFEED_ID, OLD_CLUSTER_STARTED_DATAFEED_ID)); + + checkTaskParamsAreUpdated(OLD_CLUSTER_OPEN_JOB_ID, OLD_CLUSTER_STARTED_DATAFEED_ID); + checkJobsMarkedAsMigrated(Arrays.asList(OLD_CLUSTER_CLOSED_JOB_ID, OLD_CLUSTER_OPEN_JOB_ID)); // the job and datafeed left open during upgrade should // be assigned to a node @@ -236,14 +234,6 @@ private void upgradedClusterTests() throws Exception { Request closeJob = new Request("POST", "_xpack/ml/anomaly_detectors/" + OLD_CLUSTER_OPEN_JOB_ID + "/_close"); client().performRequest(closeJob); - // now all jobs should be migrated - waitForMigration(Arrays.asList(OLD_CLUSTER_CLOSED_JOB_ID, OLD_CLUSTER_OPEN_JOB_ID), - Arrays.asList(OLD_CLUSTER_STOPPED_DATAFEED_ID, OLD_CLUSTER_STARTED_DATAFEED_ID), - Collections.emptyList(), - 
Collections.emptyList()); - - checkJobsMarkedAsMigrated(Arrays.asList(OLD_CLUSTER_CLOSED_JOB_ID, OLD_CLUSTER_OPEN_JOB_ID)); - Request deleteDatafeed = new Request("DELETE", "_xpack/ml/datafeeds/" + OLD_CLUSTER_STARTED_DATAFEED_ID); client().performRequest(deleteDatafeed); Request deleteJob = new Request("DELETE", "_xpack/ml/anomaly_detectors/" + OLD_CLUSTER_OPEN_JOB_ID); @@ -289,10 +279,6 @@ private void checkJobs() throws IOException { assertEquals(OLD_CLUSTER_CLOSED_JOB_ID, XContentMapValues.extractValue("job_id", jobStats.get(0))); assertEquals("closed", XContentMapValues.extractValue("state", jobStats.get(0))); assertThat((String)XContentMapValues.extractValue("assignment_explanation", jobStats.get(0)), isEmptyOrNullString()); - - assertEquals(OLD_CLUSTER_OPEN_JOB_ID, XContentMapValues.extractValue("job_id", jobStats.get(2))); - assertEquals("opened", XContentMapValues.extractValue("state", jobStats.get(2))); - assertThat((String)XContentMapValues.extractValue("assignment_explanation", jobStats.get(2)), isEmptyOrNullString()); } @SuppressWarnings("unchecked") @@ -336,6 +322,30 @@ private void checkJobsMarkedAsMigrated(List jobIds) throws IOException { } } + @SuppressWarnings("unchecked") + private void checkTaskParamsAreUpdated(String jobId, String datafeedId) throws Exception { + Request getClusterState = new Request("GET", "/_cluster/state/metadata"); + Response response = client().performRequest(getClusterState); + Map responseMap = entityAsMap(response); + + List> tasks = + (List>) XContentMapValues.extractValue("metadata.persistent_tasks.tasks", responseMap); + assertNotNull(tasks); + for (Map task : tasks) { + String id = (String) task.get("id"); + assertNotNull(id); + if (id.equals(MlTasks.jobTaskId(jobId))) { + Object jobParam = XContentMapValues.extractValue("task.xpack/ml/job.params.job", task); + assertNotNull(jobParam); + } else if (id.equals(MlTasks.datafeedTaskId(datafeedId))) { + Object jobIdParam = 
XContentMapValues.extractValue("task.xpack/ml/datafeed.params.job_id", task); + assertNotNull(jobIdParam); + Object indices = XContentMapValues.extractValue("task.xpack/ml/datafeed.params.indices", task); + assertNotNull(indices); + } + } + } + @SuppressWarnings("unchecked") private void assertConfigInClusterState() throws IOException { Request getClusterState = new Request("GET", "/_cluster/state/metadata"); @@ -363,8 +373,7 @@ private void assertConfigInClusterState() throws IOException { } @SuppressWarnings("unchecked") - private void waitForMigration(List expectedMigratedJobs, List expectedMigratedDatafeeds, - List unMigratedJobs, List unMigratedDatafeeds) throws Exception { + private void waitForMigration(List expectedMigratedJobs, List expectedMigratedDatafeeds) throws Exception { assertBusy(() -> { // wait for the eligible configs to be moved from the clusterstate Request getClusterState = new Request("GET", "/_cluster/state/metadata"); @@ -373,26 +382,20 @@ private void waitForMigration(List expectedMigratedJobs, List ex List> jobs = (List>) XContentMapValues.extractValue("metadata.ml.jobs", responseMap); - assertNotNull(jobs); - for (String jobId : expectedMigratedJobs) { - assertJobMigrated(jobId, jobs); - } - - for (String jobId : unMigratedJobs) { - assertJobNotMigrated(jobId, jobs); + if (jobs != null) { + for (String jobId : expectedMigratedJobs) { + assertJobMigrated(jobId, jobs); + } } List> datafeeds = (List>) XContentMapValues.extractValue("metadata.ml.datafeeds", responseMap); - assertNotNull(datafeeds); - - for (String datafeedId : expectedMigratedDatafeeds) { - assertDatafeedMigrated(datafeedId, datafeeds); - } - for (String datafeedId : unMigratedDatafeeds) { - assertDatafeedNotMigrated(datafeedId, datafeeds); + if (datafeeds != null) { + for (String datafeedId : expectedMigratedDatafeeds) { + assertDatafeedMigrated(datafeedId, datafeeds); + } } }, 30, TimeUnit.SECONDS); @@ -401,21 +404,17 @@ private void waitForMigration(List 
expectedMigratedJobs, List ex @SuppressWarnings("unchecked") private void waitForJobToBeAssigned(String jobId) throws Exception { assertBusy(() -> { - try { - Request getJobStats = new Request("GET", "_xpack/ml/anomaly_detectors/" + jobId + "/_stats"); - Response response = client().performRequest(getJobStats); - - Map stats = entityAsMap(response); - List> jobStats = - (List>) XContentMapValues.extractValue("jobs", stats); + Request getJobStats = new Request("GET", "_xpack/ml/anomaly_detectors/" + jobId + "/_stats"); + Response response = client().performRequest(getJobStats); - assertEquals(jobId, XContentMapValues.extractValue("job_id", jobStats.get(0))); - assertEquals("opened", XContentMapValues.extractValue("state", jobStats.get(0))); - assertThat((String)XContentMapValues.extractValue("assignment_explanation", jobStats.get(0)), isEmptyOrNullString()); - assertNotNull(XContentMapValues.extractValue("node", jobStats.get(0))); - } catch (IOException e) { + Map stats = entityAsMap(response); + List> jobStats = + (List>) XContentMapValues.extractValue("jobs", stats); - } + assertEquals(jobId, XContentMapValues.extractValue("job_id", jobStats.get(0))); + assertEquals("opened", XContentMapValues.extractValue("state", jobStats.get(0))); + assertThat((String) XContentMapValues.extractValue("assignment_explanation", jobStats.get(0)), isEmptyOrNullString()); + assertNotNull(XContentMapValues.extractValue("node", jobStats.get(0))); }, 30, TimeUnit.SECONDS); } @@ -521,7 +520,9 @@ private boolean updateDatafeedExpectingSuccessOr503(String datafeedId, Request r Map clusterStateMap = entityAsMap(response); List> datafeeds = (List>) XContentMapValues.extractValue("metadata.ml.datafeeds", clusterStateMap); - assertDatafeedMigrated(datafeedId, datafeeds); + if (datafeeds != null) { + assertDatafeedMigrated(datafeedId, datafeeds); + } return true; } catch (ResponseException e) { // a fail request is ok if the error was that the config has not been migrated @@ -539,39 +540,14 @@ 
private void assertJobIsMarkedAsMigrated(Map job) { } private void assertDatafeedMigrated(String datafeedId, List> datafeeds) { - assertDatafeed(datafeedId, datafeeds, false); - } - - private void assertDatafeedNotMigrated(String datafeedId, List> datafeeds) { - assertDatafeed(datafeedId, datafeeds, true); - } - - private void assertDatafeed(String datafeedId, List> datafeeds, boolean expectedToBePresent) { Optional config = datafeeds.stream().map(map -> map.get("datafeed_id")) .filter(id -> id.equals(datafeedId)).findFirst(); - if (expectedToBePresent) { - assertTrue(config.isPresent()); - } else { - assertFalse(config.isPresent()); - } + assertFalse(config.isPresent()); } private void assertJobMigrated(String jobId, List> jobs) { - assertJob(jobId, jobs, false); - } - - private void assertJobNotMigrated(String jobId, List> jobs) { - assertJob(jobId, jobs, true); - } - - private void assertJob(String jobId, List> jobs, boolean expectedToBePresent) { Optional config = jobs.stream().map(map -> map.get("job_id")) .filter(id -> id.equals(jobId)).findFirst(); - if (expectedToBePresent) { - assertTrue(config.isPresent()); - } else { - assertFalse(config.isPresent()); - } + assertFalse(config.isPresent()); } - } diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/60_ml_config_migration.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/60_ml_config_migration.yml index b076828fc856..da80ad9cefe0 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/60_ml_config_migration.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/60_ml_config_migration.yml @@ -20,8 +20,6 @@ - match: { jobs.0.state: closed } - is_false: jobs.0.node - match: { jobs.1.job_id: migration-old-cluster-open-job} - - match: { jobs.1.state: opened } - - is_false: jobs.1.assignment_explanation - do: xpack.ml.get_datafeeds: @@ -36,7 +34,6 @@ 
xpack.ml.get_datafeed_stats: datafeed_id: migration* - match: { datafeeds.0.datafeed_id: migration-old-cluster-started-datafeed} - - match: { datafeeds.0.state: started } - match: { datafeeds.1.datafeed_id: migration-old-cluster-stopped-datafeed} - match: { datafeeds.1.state: stopped } - is_false: datafeeds.1.node diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/60_ml_config_migration.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/60_ml_config_migration.yml index be36d7358e79..f1aee175ead7 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/60_ml_config_migration.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/60_ml_config_migration.yml @@ -7,7 +7,7 @@ setup: timeout: 70s --- -"Test old cluster jobs and datafeeds and delete them": +"Test old cluster jobs and datafeeds": - do: xpack.ml.get_jobs: @@ -24,8 +24,6 @@ setup: - match: { jobs.0.state: closed } - is_false: jobs.0.node - match: { jobs.1.job_id: migration-old-cluster-open-job } - - match: { jobs.1.state: opened } - - is_false: jobs.1.assignment_explanation - do: xpack.ml.get_datafeeds: @@ -40,7 +38,6 @@ setup: xpack.ml.get_datafeed_stats: datafeed_id: migration* - match: { datafeeds.0.datafeed_id: migration-old-cluster-started-datafeed } - - match: { datafeeds.0.state: started } - match: { datafeeds.1.datafeed_id: migration-old-cluster-stopped-datafeed } - match: { datafeeds.1.state: stopped } - is_false: datafeeds.1.node From 2215dc88db4c7fe404e94a7699e8caa056f9bb24 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Thu, 17 Jan 2019 10:24:34 +0000 Subject: [PATCH 3/8] Improve assertion message --- .../test/java/org/elasticsearch/upgrades/MlMigrationIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMigrationIT.java 
b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMigrationIT.java index 2cbebd1cc3a3..8ffbb7184b4b 100644 --- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMigrationIT.java +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMigrationIT.java @@ -542,12 +542,12 @@ private void assertJobIsMarkedAsMigrated(Map job) { private void assertDatafeedMigrated(String datafeedId, List> datafeeds) { Optional config = datafeeds.stream().map(map -> map.get("datafeed_id")) .filter(id -> id.equals(datafeedId)).findFirst(); - assertFalse(config.isPresent()); + assertFalse("datafeed [" + datafeedId + "] has not been migrated", config.isPresent()); } private void assertJobMigrated(String jobId, List> jobs) { Optional config = jobs.stream().map(map -> map.get("job_id")) .filter(id -> id.equals(jobId)).findFirst(); - assertFalse(config.isPresent()); + assertFalse("job [" + jobId + "] has not been migrated", config.isPresent()); } } From 2b8b3172b25fd36e7acaec2287dc466d8425e75b Mon Sep 17 00:00:00 2001 From: David Kyle Date: Mon, 21 Jan 2019 15:17:23 +0000 Subject: [PATCH 4/8] Change log level for spammy message --- .../main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java index 97e8320f612b..e48cdc999ce8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java @@ -677,7 +677,7 @@ static Set documentsNotWritten(BulkResponse response) { logger.info("failed to index ml configuration [" + itemResponse.getFailure().getId() + "], " + itemResponse.getFailure().getMessage()); } else { - logger.info("ml configuration [" + itemResponse.getId() + "] indexed"); + logger.debug("ml 
configuration [" + itemResponse.getId() + "] indexed"); } } return failedDocumentIds; From ccd29f3cea71b1270ba13ecf6bc34644100910c2 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Mon, 21 Jan 2019 15:23:49 +0000 Subject: [PATCH 5/8] change tests to catch case where an open job is not migrated --- .../elasticsearch/upgrades/MlMigrationIT.java | 81 +++++++++++++++---- 1 file changed, 65 insertions(+), 16 deletions(-) diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMigrationIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMigrationIT.java index 8ffbb7184b4b..6d4014a1d7e4 100644 --- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMigrationIT.java +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMigrationIT.java @@ -204,12 +204,15 @@ private void upgradedClusterTests() throws Exception { datafeedStarted = startMigratedDatafeed(OLD_CLUSTER_STOPPED_DATAFEED_ID); } - // wait for the closed and open jobs and datafeed to be migrated - waitForMigration(Arrays.asList(OLD_CLUSTER_CLOSED_JOB_ID, OLD_CLUSTER_OPEN_JOB_ID), - Arrays.asList(OLD_CLUSTER_STOPPED_DATAFEED_ID, OLD_CLUSTER_STARTED_DATAFEED_ID)); + // wait for the closed job and datafeed to be migrated + waitForMigration(OLD_CLUSTER_CLOSED_JOB_ID, OLD_CLUSTER_STOPPED_DATAFEED_ID); - checkTaskParamsAreUpdated(OLD_CLUSTER_OPEN_JOB_ID, OLD_CLUSTER_STARTED_DATAFEED_ID); - checkJobsMarkedAsMigrated(Arrays.asList(OLD_CLUSTER_CLOSED_JOB_ID, OLD_CLUSTER_OPEN_JOB_ID)); + // The open job and datafeed may or may not be migrated depending on how they were allocated. + // Migration will only occur once all nodes in the cluster are v6.6.0 or higher + // open jobs will only be migrated once they become unallocated. 
The open job + // will only meet these conditions if it is running on the last node to be + // upgraded + waitForPossibleMigration(OLD_CLUSTER_OPEN_JOB_ID, OLD_CLUSTER_STARTED_DATAFEED_ID); // the job and datafeed left open during upgrade should // be assigned to a node @@ -234,6 +237,11 @@ private void upgradedClusterTests() throws Exception { Request closeJob = new Request("POST", "_xpack/ml/anomaly_detectors/" + OLD_CLUSTER_OPEN_JOB_ID + "/_close"); client().performRequest(closeJob); + // if the open job wasn't migrated previously it should be now after it has been closed + waitForMigration(OLD_CLUSTER_OPEN_JOB_ID, OLD_CLUSTER_STARTED_DATAFEED_ID); + checkJobsMarkedAsMigrated(Arrays.asList(OLD_CLUSTER_CLOSED_JOB_ID, OLD_CLUSTER_OPEN_JOB_ID)); + + // and the job left open can be deleted Request deleteDatafeed = new Request("DELETE", "_xpack/ml/datafeeds/" + OLD_CLUSTER_STARTED_DATAFEED_ID); client().performRequest(deleteDatafeed); Request deleteJob = new Request("DELETE", "_xpack/ml/anomaly_detectors/" + OLD_CLUSTER_OPEN_JOB_ID); @@ -323,7 +331,7 @@ private void checkJobsMarkedAsMigrated(List jobIds) throws IOException { } @SuppressWarnings("unchecked") - private void checkTaskParamsAreUpdated(String jobId, String datafeedId) throws Exception { + private void checkTaskParams(String jobId, String datafeedId, boolean expectedUpdated) throws Exception { Request getClusterState = new Request("GET", "/_cluster/state/metadata"); Response response = client().performRequest(getClusterState); Map responseMap = entityAsMap(response); @@ -336,12 +344,21 @@ private void checkTaskParamsAreUpdated(String jobId, String datafeedId) throws E assertNotNull(id); if (id.equals(MlTasks.jobTaskId(jobId))) { Object jobParam = XContentMapValues.extractValue("task.xpack/ml/job.params.job", task); - assertNotNull(jobParam); + if (expectedUpdated) { + assertNotNull(jobParam); + } else { + assertNull(jobParam); + } } else if (id.equals(MlTasks.datafeedTaskId(datafeedId))) { Object 
jobIdParam = XContentMapValues.extractValue("task.xpack/ml/datafeed.params.job_id", task); - assertNotNull(jobIdParam); Object indices = XContentMapValues.extractValue("task.xpack/ml/datafeed.params.indices", task); - assertNotNull(indices); + if (expectedUpdated) { + assertNotNull(jobIdParam); + assertNotNull(indices); + } else { + assertNull(jobIdParam); + assertNull(indices); + } } } } @@ -373,7 +390,7 @@ private void assertConfigInClusterState() throws IOException { } @SuppressWarnings("unchecked") - private void waitForMigration(List expectedMigratedJobs, List expectedMigratedDatafeeds) throws Exception { + private void waitForMigration(String expectedMigratedJobId, String expectedMigratedDatafeedId) throws Exception { assertBusy(() -> { // wait for the eligible configs to be moved from the clusterstate Request getClusterState = new Request("GET", "/_cluster/state/metadata"); @@ -384,23 +401,55 @@ private void waitForMigration(List expectedMigratedJobs, List ex (List>) XContentMapValues.extractValue("metadata.ml.jobs", responseMap); if (jobs != null) { - for (String jobId : expectedMigratedJobs) { - assertJobMigrated(jobId, jobs); - } + assertJobMigrated(expectedMigratedJobId, jobs); } List> datafeeds = (List>) XContentMapValues.extractValue("metadata.ml.datafeeds", responseMap); if (datafeeds != null) { - for (String datafeedId : expectedMigratedDatafeeds) { - assertDatafeedMigrated(datafeedId, datafeeds); - } + assertDatafeedMigrated(expectedMigratedDatafeedId, datafeeds); } }, 30, TimeUnit.SECONDS); } + @SuppressWarnings("unchecked") + private void waitForPossibleMigration(String perhapsMigratedJobId, String perhapsMigratedDatafeedId) throws Exception { + assertBusy(() -> { + Request getClusterState = new Request("GET", "/_cluster/state/metadata"); + Response response = client().performRequest(getClusterState); + Map responseMap = entityAsMap(response); + + List> jobs = + (List>) XContentMapValues.extractValue("metadata.ml.jobs", responseMap); + + boolean 
jobMigrated = true; + if (jobs != null) { + jobMigrated = jobs.stream().map(map -> map.get("job_id")) + .noneMatch(id -> id.equals(perhapsMigratedJobId)); + } + + List> datafeeds = + (List>) XContentMapValues.extractValue("metadata.ml.datafeeds", responseMap); + + boolean datafeedMigrated = true; + if (datafeeds != null) { + datafeedMigrated = datafeeds.stream().map(map -> map.get("datafeed_id")) + .noneMatch(id -> id.equals(perhapsMigratedDatafeedId)); + } + + if (jobMigrated) { + // if the job is migrated the datafeed should also be + assertTrue(datafeedMigrated); + checkJobsMarkedAsMigrated(Collections.singletonList(perhapsMigratedJobId)); + } + + // if migrated the persistent task params should have been updated + checkTaskParams(perhapsMigratedJobId, perhapsMigratedDatafeedId, jobMigrated); + }); + } + @SuppressWarnings("unchecked") private void waitForJobToBeAssigned(String jobId) throws Exception { assertBusy(() -> { From 5b631526b67c39422c8a056dcbf016e6a48f2dff Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 23 Jan 2019 11:56:04 +0000 Subject: [PATCH 6/8] Fix compilation after rebase --- .../elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java index 08ade55a4682..4ee76a4b1ab2 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java @@ -234,7 +234,7 @@ public void testExistingSnapshotDoesNotBlockMigration() throws InterruptedExcept MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService); // writing the snapshot should fail because the doc already exists // in which case the migration should continue - 
blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener), + blockingCall(actionListener -> mlConfigMigrator.migrateConfigs(clusterState, actionListener), responseHolder, exceptionHolder); assertNull(exceptionHolder.get()); From 32c7b26d3ea8385b683880dc62f9a7b1f711f22d Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 23 Jan 2019 15:36:36 +0000 Subject: [PATCH 7/8] Remove TODO --- .../elasticsearch/xpack/ml/action/TransportOpenJobAction.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 09c1e448aada..864e6eb2647f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -828,8 +828,6 @@ public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterS public PersistentTasksCustomMetaData.Assignment getAssignment(OpenJobAction.JobParams params, ClusterState clusterState) { // If the task parameters do not have a job field then the job // was first opened on a pre v6.6 node and has not been migrated - - // TODO is eligible for migration check (all nodes are at the same version) if (params.getJob() == null) { return AWAITING_MIGRATION; } From c3e4ee73612d7871867cbda958caf8017131cf95 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 23 Jan 2019 15:43:54 +0000 Subject: [PATCH 8/8] Remove unnecessary null checks --- .../ml/action/TransportOpenJobAction.java | 38 ++++++++----------- 1 file changed, 16 insertions(+), 22 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 864e6eb2647f..4d3e6657dcab 
100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -165,11 +165,7 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j int maxMachineMemoryPercent, MlMemoryTracker memoryTracker, Logger logger) { - if (job == null) { - logger.debug("[{}] select node job is null", jobId); - } - - String resultsIndexName = job != null ? job.getResultsIndexName() : null; + String resultsIndexName = job.getResultsIndexName(); List unavailableIndices = verifyIndicesPrimaryShardsAreActive(resultsIndexName, clusterState); if (unavailableIndices.size() != 0) { @@ -231,24 +227,22 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j continue; } - if (job != null) { - if (jobHasRules(job) && node.getVersion().before(DetectionRule.VERSION_INTRODUCED)) { - String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndVersion(node) + "], because jobs using " + - "custom_rules require a node of version [" + DetectionRule.VERSION_INTRODUCED + "] or higher"; - logger.trace(reason); - reasons.add(reason); - continue; - } + if (jobHasRules(job) && node.getVersion().before(DetectionRule.VERSION_INTRODUCED)) { + String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndVersion(node) + "], because jobs using " + + "custom_rules require a node of version [" + DetectionRule.VERSION_INTRODUCED + "] or higher"; + logger.trace(reason); + reasons.add(reason); + continue; + } - boolean jobConfigIsStoredInIndex = job.getJobVersion().onOrAfter(Version.V_6_6_0); - if (jobConfigIsStoredInIndex && node.getVersion().before(Version.V_6_6_0)) { - String reason = "Not opening job [" + jobId + "] on node [" + nodeNameOrId(node) - + "] version [" + node.getVersion() + "], because this node does not support " + - "jobs of version [" + job.getJobVersion() + "]"; - 
logger.trace(reason); - reasons.add(reason); - continue; - } + boolean jobConfigIsStoredInIndex = job.getJobVersion().onOrAfter(Version.V_6_6_0); + if (jobConfigIsStoredInIndex && node.getVersion().before(Version.V_6_6_0)) { + String reason = "Not opening job [" + jobId + "] on node [" + nodeNameOrId(node) + + "] version [" + node.getVersion() + "], because this node does not support " + + "jobs of version [" + job.getJobVersion() + "]"; + logger.trace(reason); + reasons.add(reason); + continue; } long numberOfAssignedJobs = 0;