diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index a7e45887a6fc8..ba5a40bfbe44b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -268,7 +268,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu public static final Setting MAX_MACHINE_MEMORY_PERCENT = Setting.intSetting("xpack.ml.max_machine_memory_percent", 30, 5, 90, Property.Dynamic, Property.NodeScope); public static final Setting MAX_LAZY_ML_NODES = - Setting.intSetting("xpack.ml.max_lazy_ml_nodes", 0, 0, 3, Property.Dynamic, Property.NodeScope); + Setting.intSetting("xpack.ml.max_lazy_ml_nodes", 0, 0, 3, Property.Dynamic, Property.NodeScope); private static final Logger logger = LogManager.getLogger(XPackPlugin.class); @@ -308,7 +308,8 @@ public List> getSettings() { AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC, AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE, AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE, - AutodetectProcessManager.MIN_DISK_SPACE_OFF_HEAP)); + AutodetectProcessManager.MIN_DISK_SPACE_OFF_HEAP, + MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION)); } public Settings additionalSettings() { @@ -444,7 +445,7 @@ public Collection createComponents(Client client, ClusterService cluster jobDataCountsPersister, datafeedManager, auditor, - new MlAssignmentNotifier(auditor, threadPool, client, clusterService), + new MlAssignmentNotifier(settings, auditor, threadPool, client, clusterService), memoryTracker ); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java index 355ad19c8fc98..4f88dd38bb439 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java @@ -15,6 +15,7 @@ import org.elasticsearch.cluster.LocalNodeMasterListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; @@ -36,10 +37,10 @@ public class MlAssignmentNotifier implements ClusterStateListener, LocalNodeMast private final ThreadPool threadPool; private final AtomicBoolean enabled = new AtomicBoolean(false); - MlAssignmentNotifier(Auditor auditor, ThreadPool threadPool, Client client, ClusterService clusterService) { + MlAssignmentNotifier(Settings settings, Auditor auditor, ThreadPool threadPool, Client client, ClusterService clusterService) { this.auditor = auditor; this.clusterService = clusterService; - this.mlConfigMigrator = new MlConfigMigrator(client, clusterService); + this.mlConfigMigrator = new MlConfigMigrator(settings, client, clusterService); this.threadPool = threadPool; clusterService.addLocalNodeMasterListener(this); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheck.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheck.java new file mode 100644 index 
0000000000000..0f127919ac3d0 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheck.java @@ -0,0 +1,112 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.MlTasks; +import org.elasticsearch.xpack.core.ml.job.config.Job; + +/** + * Checks whether migration can start and whether ML resources (e.g. jobs, datafeeds) + * are eligible to be migrated from the cluster state into the config index + */ +public class MlConfigMigrationEligibilityCheck { + + private static final Version MIN_NODE_VERSION = Version.V_6_6_0; + + public static final Setting ENABLE_CONFIG_MIGRATION = Setting.boolSetting( + "xpack.ml.enable_config_migration", true, Setting.Property.Dynamic, Setting.Property.NodeScope); + + private volatile boolean isConfigMigrationEnabled; + + public MlConfigMigrationEligibilityCheck(Settings settings, ClusterService clusterService) { + isConfigMigrationEnabled = ENABLE_CONFIG_MIGRATION.get(settings); + clusterService.getClusterSettings().addSettingsUpdateConsumer(ENABLE_CONFIG_MIGRATION, this::setConfigMigrationEnabled); + } + + private void setConfigMigrationEnabled(boolean configMigrationEnabled) { + this.isConfigMigrationEnabled = configMigrationEnabled; + } + + /** + * Can migration start? Returns: + * False if config migration is disabled via the setting {@link #ENABLE_CONFIG_MIGRATION} + * False if the min node version of the cluster is before {@link #MIN_NODE_VERSION} + * True otherwise + * @param clusterState The cluster state + * @return A boolean that dictates if config migration can start + */ + public boolean canStartMigration(ClusterState clusterState) { + if (isConfigMigrationEnabled == false) { + return false; + } + + Version minNodeVersion = clusterState.nodes().getMinNodeVersion(); + if (minNodeVersion.before(MIN_NODE_VERSION)) { + return false; + } + return true; + } + + /** + * Is the job eligible for migration? Returns: + * False if {@link #canStartMigration(ClusterState)} returns {@code false} + * False if the job is not in the cluster state or is being deleted ({@link Job#isDeleting()}) + * False if the job has a persistent task + * True otherwise, i.e. the job is present, not deleting + * and does not have a persistent task. + * + * @param jobId The job Id + * @param clusterState The cluster state + * @return A boolean depending on the conditions listed above + */ + public boolean jobIsEligibleForMigration(String jobId, ClusterState clusterState) { + if (canStartMigration(clusterState) == false) { + return false; + } + + MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); + Job job = mlMetadata.getJobs().get(jobId); + + if (job == null || job.isDeleting()) { + return false; + } + + PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE); + return MlTasks.openJobIds(persistentTasks).contains(jobId) == false; + } + + /** + * Is the datafeed eligible for migration?
Returns: + * False if {@link #canStartMigration(ClusterState)} returns {@code false} + * False if the datafeed is not in the cluster state + * False if the datafeed has a persistent task + * True otherwise i.e. the datafeed is present and does not have a persistent task. + * + * @param datafeedId The datafeed Id + * @param clusterState The cluster state + * @return A boolean depending on the conditions listed above + */ + public boolean datafeedIsEligibleForMigration(String datafeedId, ClusterState clusterState) { + if (canStartMigration(clusterState) == false) { + return false; + } + + MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); + if (mlMetadata.getDatafeeds().containsKey(datafeedId) == false) { + return false; + } + + PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE); + return MlTasks.startedDatafeedIds(persistentTasks).contains(datafeedId) == false; + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java index 7ec1650c1a0ed..9b3b6a118659e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java @@ -19,6 +19,7 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -41,6 +42,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -80,18 +82,19 @@ public class MlConfigMigrator { private static final Logger logger = LogManager.getLogger(MlConfigMigrator.class); public static final String MIGRATED_FROM_VERSION = "migrated from version"; - public static final Version MIN_NODE_VERSION = Version.V_6_6_0; static final int MAX_BULK_WRITE_SIZE = 100; private final Client client; private final ClusterService clusterService; + private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck; private final AtomicBoolean migrationInProgress; - public MlConfigMigrator(Client client, ClusterService clusterService) { - this.client = client; - this.clusterService = clusterService; + public MlConfigMigrator(Settings settings, Client client, ClusterService clusterService) { + this.client = Objects.requireNonNull(client); + this.clusterService = Objects.requireNonNull(clusterService); + this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService); this.migrationInProgress = new AtomicBoolean(false); } @@ -114,9 +117,8 @@ public MlConfigMigrator(Client client, ClusterService clusterService) { */ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener listener) { - Version minNodeVersion = clusterState.nodes().getMinNodeVersion(); - if (minNodeVersion.before(MIN_NODE_VERSION)) { - listener.onResponse(Boolean.FALSE); + if (migrationEligibilityCheck.canStartMigration(clusterState) == false) { + listener.onResponse(false); return; } @@ -455,60 +457,4 @@ static List filterFailedDatafeedConfigWrites(Set 
failedDocumentI .filter(id -> failedDocumentIds.contains(DatafeedConfig.documentId(id)) == false) .collect(Collectors.toList()); } - - /** - * Is the job a eligible for migration? Returns: - * False if the min node version of the cluster is before {@link #MIN_NODE_VERSION} - * False if the job is not in the cluster state - * False if the {@link Job#isDeleting()} - * False if the job has a persistent task - * True otherwise i.e. the job is present, not deleting - * and does not have a persistent task. - * - * @param jobId The job Id - * @param clusterState clusterstate - * @return A boolean depending on the conditions listed above - */ - public static boolean jobIsEligibleForMigration(String jobId, ClusterState clusterState) { - Version minNodeVersion = clusterState.nodes().getMinNodeVersion(); - if (minNodeVersion.before(MIN_NODE_VERSION)) { - return false; - } - - MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); - Job job = mlMetadata.getJobs().get(jobId); - - if (job == null || job.isDeleting()) { - return false; - } - - PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE); - return MlTasks.openJobIds(persistentTasks).contains(jobId) == false; - } - - /** - * Is the datafeed a eligible for migration? Returns: - * False if the min node version of the cluster is before {@link #MIN_NODE_VERSION} - * False if the datafeed is not in the cluster state - * False if the datafeed has a persistent task - * True otherwise i.e. the datafeed is present and does not have a persistent task. - * - * @param datafeedId The datafeed Id - * @param clusterState clusterstate - * @return A boolean depending on the conditions listed above - */ - public static boolean datafeedIsEligibleForMigration(String datafeedId, ClusterState clusterState) { - Version minNodeVersion = clusterState.nodes().getMinNodeVersion(); - if (minNodeVersion.before(MIN_NODE_VERSION)) { - return false; - } - - MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); - if (mlMetadata.getDatafeeds().containsKey(datafeedId) == false) { - return false; - } - - PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE); - return MlTasks.startedDatafeedIds(persistentTasks).contains(datafeedId) == false; - } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java index 58e79c1ab11dc..13ee04e01f4db 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java @@ -20,9 +20,9 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksService; -import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.XPackPlugin; @@ -33,7 +33,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import 
org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; -import org.elasticsearch.xpack.ml.MlConfigMigrator; +import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; @@ -45,6 +45,7 @@ public class TransportDeleteDatafeedAction extends TransportMasterNodeAction listener) { - if (MlConfigMigrator.datafeedIsEligibleForMigration(request.getDatafeedId(), state)) { + if (migrationEligibilityCheck.datafeedIsEligibleForMigration(request.getDatafeedId(), state)) { listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("delete datafeed", request.getDatafeedId())); return; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java index 5dc9d04d6853b..7751480db3513 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java @@ -64,7 +64,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; -import org.elasticsearch.xpack.ml.MlConfigMigrator; +import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter; @@ -97,6 +97,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction(); } @@ -149,7 +151,7 @@ protected void masterOperation(DeleteJobAction.Request request, ClusterState sta protected void masterOperation(Task task, DeleteJobAction.Request request, ClusterState state, ActionListener listener) { - if (MlConfigMigrator.jobIsEligibleForMigration(request.getJobId(), state)) { + if (migrationEligibilityCheck.jobIsEligibleForMigration(request.getJobId(), state)) { listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("delete job", request.getJobId())); return; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 1709992d551dc..c1386c8aee047 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -70,7 +70,7 @@ import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; -import org.elasticsearch.xpack.ml.MlConfigMigrator; +import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck; import org.elasticsearch.xpack.ml.job.ClusterStateJobUpdate; import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; @@ -113,6 +113,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction listener) { - if (MlConfigMigrator.jobIsEligibleForMigration(request.getJobParams().getJobId(), state)) { + if 
(migrationEligibilityCheck.jobIsEligibleForMigration(request.getJobParams().getJobId(), state)) { listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("open job", request.getJobParams().getJobId())); return; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java index 1522c04d8f977..b7776a1b7f150 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java @@ -24,12 +24,12 @@ import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.messages.Messages; -import org.elasticsearch.xpack.ml.MlConfigMigrator; -import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck; import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; +import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import java.util.Date; @@ -42,6 +42,7 @@ public class TransportRevertModelSnapshotAction extends TransportMasterNodeActio private final JobManager jobManager; private final JobResultsProvider jobResultsProvider; private final JobDataCountsPersister jobDataCountsPersister; + private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck; @Inject public TransportRevertModelSnapshotAction(Settings settings, ThreadPool threadPool, TransportService transportService, @@ -54,6 +55,7 @@ public TransportRevertModelSnapshotAction(Settings settings, ThreadPool threadPo this.jobManager = jobManager; this.jobResultsProvider = jobResultsProvider; this.jobDataCountsPersister = jobDataCountsPersister; + this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService); } @Override @@ -69,7 +71,7 @@ protected RevertModelSnapshotAction.Response newResponse() { @Override protected void masterOperation(RevertModelSnapshotAction.Request request, ClusterState state, ActionListener listener) { - if (MlConfigMigrator.jobIsEligibleForMigration(request.getJobId(), state)) { + if (migrationEligibilityCheck.jobIsEligibleForMigration(request.getJobId(), state)) { listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("revert model snapshot", request.getJobId())); return; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index 55270691661bd..c53a0a1ac079f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -45,7 +45,7 @@ import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; -import org.elasticsearch.xpack.ml.MlConfigMigrator; +import 
org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigReader; import org.elasticsearch.xpack.ml.datafeed.DatafeedManager; import org.elasticsearch.xpack.ml.datafeed.DatafeedNodeSelector; @@ -78,6 +78,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction listener) throws Exception { - if (MlConfigMigrator.datafeedIsEligibleForMigration(request.getUpdate().getId(), state)) { + if (migrationEligibilityCheck.datafeedIsEligibleForMigration(request.getUpdate().getId(), state)) { listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("update datafeed", request.getUpdate().getId())); return; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 38cdbffc299f2..bafbe7d6847aa 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -51,7 +51,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; -import org.elasticsearch.xpack.ml.MlConfigMigrator; +import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck; import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzer; import org.elasticsearch.xpack.ml.job.persistence.ExpandedIdsMatcher; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; @@ -103,6 +103,7 @@ public class JobManager { private final ThreadPool threadPool; private final UpdateJobProcessNotifier updateJobProcessNotifier; private final JobConfigProvider jobConfigProvider; + private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck; private volatile ByteSizeValue maxModelMemoryLimit; @@ -128,6 +129,7 @@ public JobManager(Environment environment, Settings settings, JobResultsProvider this.threadPool = Objects.requireNonNull(threadPool); this.updateJobProcessNotifier = updateJobProcessNotifier; this.jobConfigProvider = jobConfigProvider; + this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService); maxModelMemoryLimit = MachineLearningField.MAX_MODEL_MEMORY_LIMIT.get(settings); clusterService.getClusterSettings() @@ -438,7 +440,7 @@ public void onFailure(Exception e) { public void updateJob(UpdateJobAction.Request request, ActionListener actionListener) { ClusterState clusterState = clusterService.state(); - if (MlConfigMigrator.jobIsEligibleForMigration(request.getJobId(), clusterState)) { + if (migrationEligibilityCheck.jobIsEligibleForMigration(request.getJobId(), clusterState)) { actionListener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("update job", request.getJobId())); return; } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheckTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheckTests.java new file mode 100644 index 0000000000000..fec071c464104 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheckTests.java @@ -0,0 +1,319 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.MlTasks; +import org.elasticsearch.xpack.core.ml.action.OpenJobAction; +import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.config.JobTests; +import org.junit.Before; + +import java.net.InetAddress; +import java.util.Collections; +import java.util.HashSet; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class MlConfigMigrationEligibilityCheckTests extends ESTestCase { + + private ClusterService clusterService; + + @Before + public void setUpTests() { + clusterService = mock(ClusterService.class); + } + + public void testCanStartMigration_givenMigrationIsDisabled() { + Settings settings = newSettings(false); + givenClusterSettings(settings); + ClusterState clusterState = mock(ClusterState.class); + + MlConfigMigrationEligibilityCheck check = new MlConfigMigrationEligibilityCheck(settings, clusterService); + + assertFalse(check.canStartMigration(clusterState)); + } + + public void testCanStartMigration_givenNodesNotUpToVersion() { + // mixed 6.5 and 6.6 nodes + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.V_6_5_0)) + .add(new DiscoveryNode("node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), Version.V_6_6_0))) + .build(); + + Settings settings = newSettings(true); + givenClusterSettings(settings); + + MlConfigMigrationEligibilityCheck check = new MlConfigMigrationEligibilityCheck(settings, clusterService); + + assertFalse(check.canStartMigration(clusterState)); + } + + public void testCanStartMigration_givenNodesUpToVersionAndMigrationIsEnabled() { + // both nodes are on 6.6.0 + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.V_6_6_0)) + .add(new DiscoveryNode("node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), Version.V_6_6_0))) + .build(); + + Settings settings = newSettings(true); + givenClusterSettings(settings); + + MlConfigMigrationEligibilityCheck check = new MlConfigMigrationEligibilityCheck(settings, clusterService); + + assertTrue(check.canStartMigration(clusterState)); + } + + public void testJobIsEligibleForMigration_givenNodesNotUpToVersion() { + // mixed 6.5 and 6.6 nodes + ClusterState clusterState =
ClusterState.builder(new ClusterName("_name")) + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.V_6_5_0)) + .add(new DiscoveryNode("node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), Version.V_6_6_0))) + .build(); + + Settings settings = newSettings(true); + givenClusterSettings(settings); + + MlConfigMigrationEligibilityCheck check = new MlConfigMigrationEligibilityCheck(settings, clusterService); + + assertFalse(check.jobIsEligibleForMigration("pre-min-version", clusterState)); + } + + public void testJobIsEligibleForMigration_givenJobNotInClusterState() { + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")).build(); + + Settings settings = newSettings(true); + givenClusterSettings(settings); + + MlConfigMigrationEligibilityCheck check = new MlConfigMigrationEligibilityCheck(settings, clusterService); + + assertFalse(check.jobIsEligibleForMigration("not-in-state", clusterState)); + } + + public void testJobIsEligibleForMigration_givenDeletingJob() { + Job deletingJob = JobTests.buildJobBuilder("deleting-job").setDeleting(true).build(); + MlMetadata.Builder mlMetadata = new MlMetadata.Builder().putJob(deletingJob, false); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + tasksBuilder.addTask(MlTasks.jobTaskId(deletingJob.getId()), + MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams(deletingJob.getId()), + new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")); + + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build()) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()) + ) + .build(); + + Settings settings = newSettings(true); + givenClusterSettings(settings); + + MlConfigMigrationEligibilityCheck check = new MlConfigMigrationEligibilityCheck(settings, clusterService); + + assertFalse(check.jobIsEligibleForMigration(deletingJob.getId(), clusterState)); + } + + public void testJobIsEligibleForMigration_givenOpenJob() { + Job openJob = JobTests.buildJobBuilder("open-job").build(); + MlMetadata.Builder mlMetadata = new MlMetadata.Builder().putJob(openJob, false); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + tasksBuilder.addTask(MlTasks.jobTaskId(openJob.getId()), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams(openJob.getId()), + new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")); + + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build()) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()) + ) + .build(); + + Settings settings = newSettings(true); + givenClusterSettings(settings); + + MlConfigMigrationEligibilityCheck check = new MlConfigMigrationEligibilityCheck(settings, clusterService); + + assertFalse(check.jobIsEligibleForMigration(openJob.getId(), clusterState)); + } + + public void testJobIsEligibleForMigration_givenOpenJobAndMigrationIsDisabled() { + Job openJob = JobTests.buildJobBuilder("open-job").build(); + MlMetadata.Builder mlMetadata = new MlMetadata.Builder().putJob(openJob, false); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); +
tasksBuilder.addTask(MlTasks.jobTaskId(openJob.getId()), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams(openJob.getId()), + new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")); + + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build()) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()) + ) + .build(); + + Settings settings = newSettings(false); + givenClusterSettings(settings); + + MlConfigMigrationEligibilityCheck check = new MlConfigMigrationEligibilityCheck(settings, clusterService); + + assertFalse(check.jobIsEligibleForMigration(openJob.getId(), clusterState)); + } + + public void testJobIsEligibleForMigration_givenClosedJob() { + Job closedJob = JobTests.buildJobBuilder("closed-job").build(); + MlMetadata.Builder mlMetadata = new MlMetadata.Builder().putJob(closedJob, false); + + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build()) + ) + .build(); + + Settings settings = newSettings(true); + givenClusterSettings(settings); + + MlConfigMigrationEligibilityCheck check = new MlConfigMigrationEligibilityCheck(settings, clusterService); + + assertTrue(check.jobIsEligibleForMigration(closedJob.getId(), clusterState)); + } + + public void testDatafeedIsEligibleForMigration_givenNodesNotUpToVersion() { + // mixed 6.5 and 6.6 nodes + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.V_6_5_0)) + .add(new DiscoveryNode("node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), Version.V_6_6_0))) + .build(); + + Settings settings = newSettings(true); + givenClusterSettings(settings); + + MlConfigMigrationEligibilityCheck check = new MlConfigMigrationEligibilityCheck(settings, clusterService); + + assertFalse(check.datafeedIsEligibleForMigration("pre-min-version", clusterState)); + } + + public void testDatafeedIsEligibleForMigration_givenDatafeedNotInClusterState() { + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")).build(); + Settings settings = newSettings(true); + givenClusterSettings(settings); + + MlConfigMigrationEligibilityCheck check = new MlConfigMigrationEligibilityCheck(settings, clusterService); + + assertFalse(check.datafeedIsEligibleForMigration("not-in-state", clusterState)); + } + + public void testDatafeedIsEligibleForMigration_givenStartedDatafeed() { + Job openJob = JobTests.buildJobBuilder("open-job").build(); + MlMetadata.Builder mlMetadata = new MlMetadata.Builder().putJob(openJob, false); + mlMetadata.putDatafeed(createCompatibleDatafeed(openJob.getId()), Collections.emptyMap()); + String datafeedId = "df-" + openJob.getId(); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + tasksBuilder.addTask(MlTasks.datafeedTaskId(datafeedId), MlTasks.DATAFEED_TASK_NAME, + new StartDatafeedAction.DatafeedParams(datafeedId, 0L), + new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")); + + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build()) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()) + ) + .build(); + + 
Settings settings = newSettings(true); + givenClusterSettings(settings); + + MlConfigMigrationEligibilityCheck check = new MlConfigMigrationEligibilityCheck(settings, clusterService); + + assertFalse(check.datafeedIsEligibleForMigration(datafeedId, clusterState)); + } + + public void testDatafeedIsEligibleForMigration_givenStartedDatafeedAndMigrationIsDisabled() { + Job openJob = JobTests.buildJobBuilder("open-job").build(); + MlMetadata.Builder mlMetadata = new MlMetadata.Builder().putJob(openJob, false); + mlMetadata.putDatafeed(createCompatibleDatafeed(openJob.getId()), Collections.emptyMap()); + String datafeedId = "df-" + openJob.getId(); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + tasksBuilder.addTask(MlTasks.datafeedTaskId(datafeedId), MlTasks.DATAFEED_TASK_NAME, + new StartDatafeedAction.DatafeedParams(datafeedId, 0L), + new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")); + + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build()) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()) + ) + .build(); + + Settings settings = newSettings(false); + givenClusterSettings(settings); + + MlConfigMigrationEligibilityCheck check = new MlConfigMigrationEligibilityCheck(settings, clusterService); + + assertFalse(check.datafeedIsEligibleForMigration(datafeedId, clusterState)); + } + + public void testDatafeedIsEligibleForMigration_givenStoppedDatafeed() { + Job job = JobTests.buildJobBuilder("closed-job").build(); + MlMetadata.Builder mlMetadata = new MlMetadata.Builder().putJob(job, false); + mlMetadata.putDatafeed(createCompatibleDatafeed(job.getId()), Collections.emptyMap()); + String datafeedId = "df-" + job.getId(); + + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build()) + ) + .build(); + + Settings settings = newSettings(true); + givenClusterSettings(settings); + + MlConfigMigrationEligibilityCheck check = new MlConfigMigrationEligibilityCheck(settings, clusterService); + + assertTrue(check.datafeedIsEligibleForMigration(datafeedId, clusterState)); + } + + private void givenClusterSettings(Settings settings) { + ClusterSettings clusterSettings = new ClusterSettings(settings, new HashSet<>(Collections.singletonList( + MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION))); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + } + + private static Settings newSettings(boolean migrationEnabled) { + return Settings.builder() + .put(MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION.getKey(), migrationEnabled) + .build(); + } + + private DatafeedConfig createCompatibleDatafeed(String jobId) { + // create a datafeed without aggregations or anything + // else that may cause validation errors + DatafeedConfig.Builder datafeedBuilder = new DatafeedConfig.Builder("df-" + jobId, jobId); + datafeedBuilder.setIndices(Collections.singletonList("my_index")); + return datafeedBuilder.build(); + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigratorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigratorTests.java index 92d06724a0488..d9ea035e58234 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigratorTests.java +++ 
b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigratorTests.java @@ -11,9 +11,6 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.MlMetadata; @@ -24,7 +21,6 @@ import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobTests; -import java.net.InetAddress; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -218,124 +214,6 @@ public void testRemoveJobsAndDatafeeds_removeSome() { assertThat(removalResult.removedDatafeedIds, empty()); } - public void testJobIsEligibleForMigration_givenNodesNotUpToVersion() { - // mixed 6.5 and 6.6 nodes - ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) - .nodes(DiscoveryNodes.builder() - .add(new DiscoveryNode("node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.V_6_5_0)) - .add(new DiscoveryNode("node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), Version.V_6_6_0))) - .build(); - - assertFalse(MlConfigMigrator.jobIsEligibleForMigration("pre-min-version", clusterState)); - } - - public void testJobIsEligibleForMigration_givenJobNotInClusterState() { - ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")).build(); - assertFalse(MlConfigMigrator.jobIsEligibleForMigration("not-in-state", clusterState)); - } - - public void testJobIsEligibleForMigration_givenDeletingJob() { - Job deletingJob = JobTests.buildJobBuilder("deleting-job").setDeleting(true).build(); - MlMetadata.Builder mlMetadata = new MlMetadata.Builder().putJob(deletingJob, false); - - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); - tasksBuilder.addTask(MlTasks.jobTaskId(deletingJob.getId()), - MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams(deletingJob.getId()), - new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")); - - ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) - .metaData(MetaData.builder() - .putCustom(MlMetadata.TYPE, mlMetadata.build()) - .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()) - ) - .build(); - - assertFalse(MlConfigMigrator.jobIsEligibleForMigration(deletingJob.getId(), clusterState)); - } - - public void testJobIsEligibleForMigration_givenOpenJob() { - Job openJob = JobTests.buildJobBuilder("open-job").build(); - MlMetadata.Builder mlMetadata = new MlMetadata.Builder().putJob(openJob, false); - - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); - tasksBuilder.addTask(MlTasks.jobTaskId(openJob.getId()), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams(openJob.getId()), - new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")); - - ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) - .metaData(MetaData.builder() - .putCustom(MlMetadata.TYPE, mlMetadata.build()) - .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()) - ) - .build(); - - assertFalse(MlConfigMigrator.jobIsEligibleForMigration(openJob.getId(), clusterState)); - } 
- - public void testJobIsEligibleForMigration_givenClosedJob() { - Job closedJob = JobTests.buildJobBuilder("closed-job").build(); - MlMetadata.Builder mlMetadata = new MlMetadata.Builder().putJob(closedJob, false); - - ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) - .metaData(MetaData.builder() - .putCustom(MlMetadata.TYPE, mlMetadata.build()) - ) - .build(); - - assertTrue(MlConfigMigrator.jobIsEligibleForMigration(closedJob.getId(), clusterState)); - } - - public void testDatafeedIsEligibleForMigration_givenNodesNotUpToVersion() { - // mixed 6.5 and 6.6 nodes - ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) - .nodes(DiscoveryNodes.builder() - .add(new DiscoveryNode("node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.V_6_5_0)) - .add(new DiscoveryNode("node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), Version.V_6_6_0))) - .build(); - - assertFalse(MlConfigMigrator.datafeedIsEligibleForMigration("pre-min-version", clusterState)); - } - - public void testDatafeedIsEligibleForMigration_givenDatafeedNotInClusterState() { - ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")).build(); - assertFalse(MlConfigMigrator.datafeedIsEligibleForMigration("not-in-state", clusterState)); - } - - public void testDatafeedIsEligibleForMigration_givenStartedDatafeed() { - Job openJob = JobTests.buildJobBuilder("open-job").build(); - MlMetadata.Builder mlMetadata = new MlMetadata.Builder().putJob(openJob, false); - mlMetadata.putDatafeed(createCompatibleDatafeed(openJob.getId()), Collections.emptyMap()); - String datafeedId = "df-" + openJob.getId(); - - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); - tasksBuilder.addTask(MlTasks.datafeedTaskId(datafeedId), MlTasks.DATAFEED_TASK_NAME, - new StartDatafeedAction.DatafeedParams(datafeedId, 0L), - new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")); - - ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) - .metaData(MetaData.builder() - .putCustom(MlMetadata.TYPE, mlMetadata.build()) - .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()) - ) - .build(); - - assertFalse(MlConfigMigrator.datafeedIsEligibleForMigration(datafeedId, clusterState)); - } - - public void testDatafeedIsEligibleForMigration_givenStoppedDatafeed() { - Job job = JobTests.buildJobBuilder("closed-job").build(); - MlMetadata.Builder mlMetadata = new MlMetadata.Builder().putJob(job, false); - mlMetadata.putDatafeed(createCompatibleDatafeed(job.getId()), Collections.emptyMap()); - String datafeedId = "df-" + job.getId(); - - ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) - .metaData(MetaData.builder() - .putCustom(MlMetadata.TYPE, mlMetadata.build()) - ) - .build(); - - assertTrue(MlConfigMigrator.datafeedIsEligibleForMigration(datafeedId, clusterState)); - } - public void testLimitWrites_GivenBelowLimit() { MlConfigMigrator.JobsAndDatafeeds jobsAndDatafeeds = MlConfigMigrator.limitWrites(Collections.emptyList(), Collections.emptyMap()); assertThat(jobsAndDatafeeds.datafeedConfigs, empty()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java index 51a3b5d2366b0..b81805fb3fbdf 100644 --- 
a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java @@ -11,16 +11,21 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck; import org.elasticsearch.xpack.ml.MlConfigMigrator; import org.elasticsearch.xpack.ml.MlSingleNodeTestCase; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; +import org.junit.Before; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; @@ -28,13 +33,25 @@ import static org.elasticsearch.xpack.core.ml.job.config.JobTests.buildJobBuilder; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class MlConfigMigratorIT extends MlSingleNodeTestCase { + private ClusterService clusterService; + + @Before + public void setUpTests() { + clusterService = mock(ClusterService.class); + ClusterSettings clusterSettings = new ClusterSettings(nodeSettings(), new HashSet<>(Collections.singletonList( + MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION))); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + } + public void testWriteConfigToIndex() throws InterruptedException { final String indexJobId = "job-already-migrated"; @@ -50,8 +67,7 @@ public void testWriteConfigToIndex() throws InterruptedException { // put a job representing a previously migrated job blockingCall(actionListener -> jobConfigProvider.putJob(migratedJob, actionListener), indexResponseHolder, exceptionHolder); - ClusterService clusterService = mock(ClusterService.class); - MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(client(), clusterService); + MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService); AtomicReference> failedIdsHolder = new AtomicReference<>(); Job foo = buildJobBuilder("foo").build(); @@ -97,7 +113,6 @@ public void testMigrateConfigs() throws InterruptedException { .putCustom(MlMetadata.TYPE, mlMetadata.build())) .build(); - ClusterService clusterService = mock(ClusterService.class); doAnswer(invocation -> { ClusterStateUpdateTask listener = (ClusterStateUpdateTask) invocation.getArguments()[1]; listener.clusterStateProcessed("source", mock(ClusterState.class), mock(ClusterState.class)); @@ -108,7 +123,7 @@ public void testMigrateConfigs() throws InterruptedException { AtomicReference responseHolder = new AtomicReference<>(); // do the migration - MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(client(), clusterService); + MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), 
clusterService); blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener), responseHolder, exceptionHolder); @@ -138,6 +153,56 @@ public void testMigrateConfigs() throws InterruptedException { assertThat(datafeedsHolder.get(), hasSize(1)); assertEquals("df-1", datafeedsHolder.get().get(0).getId()); } + + public void testMigrateConfigsWithoutTasks_GivenMigrationIsDisabled() throws InterruptedException { + Settings settings = Settings.builder().put(nodeSettings()) + .put(MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION.getKey(), false) + .build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, new HashSet<>(Collections.singletonList( + MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION))); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + + // and jobs and datafeeds clusterstate + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + mlMetadata.putJob(buildJobBuilder("job-foo").build(), false); + mlMetadata.putJob(buildJobBuilder("job-bar").build(), false); + DatafeedConfig.Builder builder = new DatafeedConfig.Builder("df-1", "job-foo"); + builder.setIndices(Collections.singletonList("beats*")); + mlMetadata.putDatafeed(builder.build(), Collections.emptyMap()); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build())) + .build(); + + AtomicReference exceptionHolder = new AtomicReference<>(); + AtomicReference responseHolder = new AtomicReference<>(); + + // do the migration + MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(settings, client(), clusterService); + blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener), + responseHolder, exceptionHolder); + + assertNull(exceptionHolder.get()); + assertFalse(responseHolder.get()); + + // check the jobs have not been migrated + AtomicReference> jobsHolder = new AtomicReference<>(); + JobConfigProvider jobConfigProvider = new JobConfigProvider(client()); + blockingCall(actionListener -> jobConfigProvider.expandJobs("*", true, true, actionListener), + jobsHolder, exceptionHolder); + assertNull(exceptionHolder.get()); + assertThat(jobsHolder.get().isEmpty(), is(true)); + + // check datafeeds have not been migrated + DatafeedConfigProvider datafeedConfigProvider = new DatafeedConfigProvider(client(), xContentRegistry()); + AtomicReference> datafeedsHolder = new AtomicReference<>(); + blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedConfigs("*", true, actionListener), + datafeedsHolder, exceptionHolder); + + assertNull(exceptionHolder.get()); + assertThat(datafeedsHolder.get().isEmpty(), is(true)); + } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java index 73a048d3fe3c0..871affade508d 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java @@ -48,6 +48,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck; import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzerTests; 
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; @@ -64,6 +65,7 @@ import java.util.Collections; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.SortedSet; @@ -106,10 +108,13 @@ public class JobManagerTests extends ESTestCase { @Before public void setup() throws Exception { - Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()).build(); + Settings settings = Settings.builder() + .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()) + .build(); environment = TestEnvironment.newEnvironment(settings); analysisRegistry = CategorizationAnalyzerTests.buildTestAnalysisRegistry(environment); clusterService = mock(ClusterService.class); + givenClusterSettings(settings); jobResultsProvider = mock(JobResultsProvider.class); auditor = mock(Auditor.class); @@ -541,10 +546,6 @@ public void testJobExists_GivenMissingJob() { JobConfigProvider jobConfigProvider = mock(JobConfigProvider.class); - ClusterSettings clusterSettings = new ClusterSettings(environment.settings(), - Collections.singleton(MachineLearningField.MAX_MODEL_MEMORY_LIMIT)); - when(clusterService.getClusterSettings()).thenReturn(clusterSettings); - doAnswer(invocationOnMock -> { ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; listener.onFailure(ExceptionsHelper.missingJobException("non-job")); @@ -578,11 +579,6 @@ public void testJobExists_GivenJobIsInClusterState() { JobConfigProvider jobConfigProvider = mock(JobConfigProvider.class); - ClusterSettings clusterSettings = new ClusterSettings(environment.settings(), - Collections.singleton(MachineLearningField.MAX_MODEL_MEMORY_LIMIT)); - when(clusterService.getClusterSettings()).thenReturn(clusterSettings); - - JobManager jobManager = new JobManager(environment, environment.settings(), jobResultsProvider, clusterService, auditor, threadPool, mock(Client.class), updateJobProcessNotifier, jobConfigProvider); @@ -602,10 +598,6 @@ public void testJobExists_GivenJobIsInIndex() { ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); when(clusterService.state()).thenReturn(clusterState); - ClusterSettings clusterSettings = new ClusterSettings(environment.settings(), - Collections.singleton(MachineLearningField.MAX_MODEL_MEMORY_LIMIT)); - when(clusterService.getClusterSettings()).thenReturn(clusterSettings); - JobConfigProvider jobConfigProvider = mock(JobConfigProvider.class); doAnswer(invocationOnMock -> { ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; @@ -962,10 +954,6 @@ public void testRevertSnapshot_GivenJobInClusterState() { .build(); when(clusterService.state()).thenReturn(clusterState); - ClusterSettings clusterSettings = new ClusterSettings(environment.settings(), - Collections.singleton(MachineLearningField.MAX_MODEL_MEMORY_LIMIT)); - when(clusterService.getClusterSettings()).thenReturn(clusterSettings); - JobConfigProvider jobConfigProvider = mock(JobConfigProvider.class); doAnswer(invocationOnMock -> { ActionListener listener = (ActionListener) invocationOnMock.getArguments()[3]; @@ -1007,9 +995,6 @@ private Job.Builder createJobFoo() { } private JobManager createJobManager(Client client) { - ClusterSettings clusterSettings = new ClusterSettings(environment.settings(), - Collections.singleton(MachineLearningField.MAX_MODEL_MEMORY_LIMIT)); - 
when(clusterService.getClusterSettings()).thenReturn(clusterSettings); return new JobManager(environment, environment.settings(), jobResultsProvider, clusterService, auditor, threadPool, client, updateJobProcessNotifier); } @@ -1026,4 +1011,11 @@ private BytesReference toBytesReference(ToXContent content) throws IOException { return BytesReference.bytes(xContentBuilder); } } + + private void givenClusterSettings(Settings settings) { + ClusterSettings clusterSettings = new ClusterSettings(settings, new HashSet<>(Arrays.asList( + MachineLearningField.MAX_MODEL_MEMORY_LIMIT, + MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION))); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + } }
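
The master-node actions touched above (delete job, open job, revert model snapshot, delete datafeed, start/update datafeed, and JobManager.updateJob) all gain the same early-return guard. The sketch below is an illustration of that pattern only: ExampleJobAction, its Boolean listener and doOperation are hypothetical placeholders and not part of this change, while MlConfigMigrationEligibilityCheck and ExceptionsHelper.configHasNotBeenMigrated are the classes used in the diff itself.

// Illustrative guard only: condenses the early-return this change adds to each ML master-node action.
// ExampleJobAction and doOperation(...) are hypothetical; the check and exception helper are real.
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;

class ExampleJobAction {

    private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;

    ExampleJobAction(Settings settings, ClusterService clusterService) {
        // One check per component; it registers its own listener for the dynamic setting.
        this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
    }

    void masterOperation(String jobId, ClusterState state, ActionListener<Boolean> listener) {
        // If the job config still lives in cluster state and could be migrated,
        // refuse the operation and ask the caller to retry after migration has run.
        if (migrationEligibilityCheck.jobIsEligibleForMigration(jobId, state)) {
            listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("example job action", jobId));
            return;
        }
        doOperation(jobId, listener);
    }

    private void doOperation(String jobId, ActionListener<Boolean> listener) {
        listener.onResponse(true); // placeholder for the real work
    }
}

Datafeed-centric actions follow the same shape, calling datafeedIsEligibleForMigration(datafeedId, state) instead.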
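
The new xpack.ml.enable_config_migration setting only takes effect because it is both registered in MachineLearning#getSettings and subscribed to through ClusterSettings. The following sketch condenses that lifecycle under the assumption that a plain component wires itself up the same way the eligibility check does; ExampleToggle is an illustrative name, and the Setting definition mirrors ENABLE_CONFIG_MIGRATION from the diff.

// Condensed sketch of the dynamic-setting wiring used by MlConfigMigrationEligibilityCheck.
// ExampleToggle is illustrative; only the Setting definition mirrors the change itself.
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;

class ExampleToggle {

    // Dynamic + NodeScope: changeable at runtime, but only once the owning plugin
    // returns it from getSettings() so the node recognises the key.
    static final Setting<Boolean> ENABLE_CONFIG_MIGRATION = Setting.boolSetting(
            "xpack.ml.enable_config_migration", true, Property.Dynamic, Property.NodeScope);

    private volatile boolean enabled;

    ExampleToggle(Settings settings, ClusterService clusterService) {
        // Seed from the node settings, then track cluster-level updates through the consumer.
        this.enabled = ENABLE_CONFIG_MIGRATION.get(settings);
        clusterService.getClusterSettings().addSettingsUpdateConsumer(ENABLE_CONFIG_MIGRATION, v -> this.enabled = v);
    }

    boolean isEnabled() {
        return enabled;
    }
}

This is also why the tests above build a ClusterSettings instance that includes ENABLE_CONFIG_MIGRATION before constructing the check: addSettingsUpdateConsumer rejects settings that have not been registered.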