diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java index b709e32946ec6..6af323f1510e4 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java @@ -9,6 +9,7 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.Version; import org.elasticsearch.cluster.AbstractDiffable; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.DiffableUtils; import org.elasticsearch.cluster.NamedDiff; @@ -467,6 +468,14 @@ public static DatafeedState getDatafeedState(String datafeedId, @Nullable Persis } } + public static MlMetadata getMlMetadata(ClusterState state) { + MlMetadata mlMetadata = (state == null) ? null : state.getMetaData().custom(MLMetadataField.TYPE); + if (mlMetadata == null) { + return EMPTY_METADATA; + } + return mlMetadata; + } + public static class JobAlreadyMarkedAsDeletedException extends RuntimeException { } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java index 1ac842e8898bf..4e51d7b6c1e30 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.core.ml.job.persistence; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.core.ml.MlMetadata; /** @@ -47,8 +46,7 @@ public static String resultsWriteAlias(String jobId) { * @return The index name */ public static String getPhysicalIndexFromState(ClusterState state, String jobId) { - MlMetadata meta = state.getMetaData().custom(MLMetadataField.TYPE); - return meta.getJobs().get(jobId).getResultsIndexName(); + return MlMetadata.getMlMetadata(state).getJobs().get(jobId).getResultsIndexName(); } /** diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java index f533fc1aa5a7c..05af1ffee17a4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java @@ -23,7 +23,6 @@ import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.XPackField; -import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.core.ml.MachineLearningFeatureSetUsage; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction; @@ -132,15 +131,7 @@ public Map nativeCodeInfo() { @Override public void usage(ActionListener listener) { ClusterState state = clusterService.state(); - MlMetadata mlMetadata = state.getMetaData().custom(MLMetadataField.TYPE); - - // Handle case when usage is called but MlMetadata has not been installed yet - if (mlMetadata == null) { - listener.onResponse(new MachineLearningFeatureSetUsage(available(), enabled, - Collections.emptyMap(), Collections.emptyMap())); - } else { - new Retriever(client, mlMetadata, available(), enabled()).execute(listener); - } + new Retriever(client, MlMetadata.getMlMetadata(state), available(), enabled()).execute(listener); } public static class Retriever { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java index 87cf03caa9949..37d714d1777da 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java @@ -13,7 +13,6 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; @@ -90,8 +89,7 @@ public void clusterChanged(ClusterChangedEvent event) { } } else if (StartDatafeedAction.TASK_NAME.equals(currentTask.getTaskName())) { String datafeedId = ((StartDatafeedAction.DatafeedParams) currentTask.getParams()).getDatafeedId(); - MlMetadata mlMetadata = event.state().getMetaData().custom(MLMetadataField.TYPE); - DatafeedConfig datafeedConfig = mlMetadata.getDatafeed(datafeedId); + DatafeedConfig datafeedConfig = MlMetadata.getMlMetadata(event.state()).getDatafeed(datafeedId); if (currentAssignment.getExecutorNode() == null) { String msg = "No node found to start datafeed [" + datafeedId +"]. Reasons [" + currentAssignment.getExplanation() + "]"; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java index 5dba8ce943c44..c96a12ffa1047 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java @@ -7,20 +7,13 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; -import org.elasticsearch.cluster.ClusterStateUpdateTask; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.ml.MLMetadataField; -import org.elasticsearch.xpack.core.ml.MlMetadata; - -import java.util.concurrent.atomic.AtomicBoolean; class MlInitializationService extends AbstractComponent implements ClusterStateListener { @@ -28,8 +21,6 @@ class MlInitializationService extends AbstractComponent implements ClusterStateL private final ClusterService clusterService; private final Client client; - private final AtomicBoolean installMlMetadataCheck = new AtomicBoolean(false); - private volatile MlDailyMaintenanceService mlDailyMaintenanceService; MlInitializationService(Settings settings, ThreadPool threadPool, ClusterService clusterService, Client client) { @@ -48,45 +39,12 @@ public void clusterChanged(ClusterChangedEvent event) { } if (event.localNodeMaster()) { - MetaData metaData = event.state().metaData(); - installMlMetadata(metaData); installDailyMaintenanceService(); } else { uninstallDailyMaintenanceService(); } } - private void installMlMetadata(MetaData metaData) { - if (metaData.custom(MLMetadataField.TYPE) == null) { - if (installMlMetadataCheck.compareAndSet(false, true)) { - threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> - clusterService.submitStateUpdateTask("install-ml-metadata", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - // If the metadata has been added already don't try to update - if (currentState.metaData().custom(MLMetadataField.TYPE) != null) { - return currentState; - } - ClusterState.Builder builder = new ClusterState.Builder(currentState); - MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData()); - metadataBuilder.putCustom(MLMetadataField.TYPE, MlMetadata.EMPTY_METADATA); - builder.metaData(metadataBuilder.build()); - return builder.build(); - } - - @Override - public void onFailure(String source, Exception e) { - installMlMetadataCheck.set(false); - logger.error("unable to install ml metadata", e); - } - }) - ); - } - } else { - installMlMetadataCheck.set(false); - } - } - private void installDailyMaintenanceService() { if (mlDailyMaintenanceService == null) { mlDailyMaintenanceService = new MlDailyMaintenanceService(clusterService.getClusterName(), threadPool, client); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java index 7d113a838dd45..fa649e541963d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java @@ -27,7 +27,6 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.CloseJobAction; import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; @@ -92,8 +91,7 @@ public TransportCloseJobAction(Settings settings, TransportService transportServ static void resolveAndValidateJobId(CloseJobAction.Request request, ClusterState state, List openJobIds, List closingJobIds) { PersistentTasksCustomMetaData tasksMetaData = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - MlMetadata maybeNull = state.metaData().custom(MLMetadataField.TYPE); - final MlMetadata mlMetadata = (maybeNull == null) ? MlMetadata.EMPTY_METADATA : maybeNull; + final MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); List failedJobs = new ArrayList<>(); @@ -107,7 +105,7 @@ static void resolveAndValidateJobId(CloseJobAction.Request request, ClusterState }; Set expandedJobIds = mlMetadata.expandJobIds(request.getJobId(), request.allowNoJobs()); - expandedJobIds.stream().forEach(jobIdProcessor::accept); + expandedJobIds.forEach(jobIdProcessor::accept); if (request.isForce() == false && failedJobs.size() > 0) { if (expandedJobIds.size() == 1) { throw ExceptionsHelper.conflictStatusException("cannot close job [{}] because it failed, use force close", diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java index 79995af9e62aa..220d97e89ba14 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java @@ -119,8 +119,8 @@ protected DeleteDatafeedAction.Response newResponse(boolean acknowledged) { } @Override - public ClusterState execute(ClusterState currentState) throws Exception { - MlMetadata currentMetadata = currentState.getMetaData().custom(MLMetadataField.TYPE); + public ClusterState execute(ClusterState currentState) { + MlMetadata currentMetadata = MlMetadata.getMlMetadata(currentState); PersistentTasksCustomMetaData persistentTasks = currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); MlMetadata newMetadata = new MlMetadata.Builder(currentMetadata) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteFilterAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteFilterAction.java index 19e6ec595042a..e14cd76aa183a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteFilterAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteFilterAction.java @@ -24,7 +24,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ml.action.DeleteFilterAction; -import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.core.ml.MlMetaIndex; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.job.config.Detector; @@ -60,8 +59,7 @@ protected void doExecute(DeleteFilterAction.Request request, ActionListener jobs = currentMlMetadata.getJobs(); + Map jobs = MlMetadata.getMlMetadata(state).getJobs(); List currentlyUsedBy = new ArrayList<>(); for (Job job : jobs.values()) { List detectors = job.getAnalysisConfig().getDetectors(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java index b664487c77df6..90821b302fc5c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java @@ -200,10 +200,9 @@ public void onFailure(Exception e) { void markJobAsDeleting(String jobId, ActionListener listener, boolean force) { clusterService.submitStateUpdateTask("mark-job-as-deleted", new ClusterStateUpdateTask() { @Override - public ClusterState execute(ClusterState currentState) throws Exception { - MlMetadata currentMlMetadata = currentState.metaData().custom(MLMetadataField.TYPE); + public ClusterState execute(ClusterState currentState) { PersistentTasksCustomMetaData tasks = currentState.metaData().custom(PersistentTasksCustomMetaData.TYPE); - MlMetadata.Builder builder = new MlMetadata.Builder(currentMlMetadata); + MlMetadata.Builder builder = new MlMetadata.Builder(MlMetadata.getMlMetadata(currentState)); builder.markJobAsDeleted(jobId, tasks, force); return buildNewClusterState(currentState, builder); } @@ -248,11 +247,7 @@ public void onTimeout(TimeValue timeout) { } static boolean jobIsDeletedFromState(String jobId, ClusterState clusterState) { - MlMetadata metadata = clusterState.metaData().custom(MLMetadataField.TYPE); - if (metadata == null) { - return true; - } - return !metadata.getJobs().containsKey(jobId); + return !MlMetadata.getMlMetadata(clusterState).getJobs().containsKey(jobId); } private static ClusterState buildNewClusterState(ClusterState currentState, MlMetadata.Builder builder) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java index 3f8ca39a0f124..e939c6ef31a2f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java @@ -56,8 +56,8 @@ protected void masterOperation(FinalizeJobExecutionAction.Request request, Clust logger.debug("finalizing jobs [{}]", jobIdString); clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() { @Override - public ClusterState execute(ClusterState currentState) throws Exception { - MlMetadata mlMetadata = currentState.metaData().custom(MLMetadataField.TYPE); + public ClusterState execute(ClusterState currentState) { + MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState); MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata); Date finishedTime = new Date(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCalendarEventsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCalendarEventsAction.java index d59febf12cd36..c81bb2642236d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCalendarEventsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCalendarEventsAction.java @@ -15,7 +15,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.GetCalendarEventsAction; import org.elasticsearch.xpack.core.ml.action.GetCalendarsAction; @@ -70,7 +69,7 @@ protected void doExecute(GetCalendarEventsAction.Request request, if (request.getJobId() != null) { ClusterState state = clusterService.state(); - MlMetadata currentMlMetadata = state.metaData().custom(MLMetadataField.TYPE); + MlMetadata currentMlMetadata = MlMetadata.getMlMetadata(state); List jobGroups; String requestId = request.getJobId(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsAction.java index c94449e8c3616..b4e3851eda820 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsAction.java @@ -18,7 +18,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsAction; -import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; @@ -52,10 +51,7 @@ protected void masterOperation(GetDatafeedsAction.Request request, ClusterState ActionListener listener) throws Exception { logger.debug("Get datafeed '{}'", request.getDatafeedId()); - MlMetadata mlMetadata = state.metaData().custom(MLMetadataField.TYPE); - if (mlMetadata == null) { - mlMetadata = MlMetadata.EMPTY_METADATA; - } + MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); Set expandedDatafeedIds = mlMetadata.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds()); List datafeedConfigs = new ArrayList<>(); for (String expandedDatafeedId : expandedDatafeedIds) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java index 0d5b2b202098b..41c8379c39fb8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java @@ -18,7 +18,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction; import org.elasticsearch.xpack.core.ml.action.util.QueryPage; @@ -56,11 +55,7 @@ protected void masterOperation(GetDatafeedsStatsAction.Request request, ClusterS ActionListener listener) throws Exception { logger.debug("Get stats for datafeed '{}'", request.getDatafeedId()); - MlMetadata mlMetadata = state.metaData().custom(MLMetadataField.TYPE); - if (mlMetadata == null) { - mlMetadata = MlMetadata.EMPTY_METADATA; - } - + MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); Set expandedDatafeedIds = mlMetadata.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds()); PersistentTasksCustomMetaData tasksInProgress = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java index f8f8fffa5b710..78bfe2c7bc6b0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java @@ -23,7 +23,6 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.core.ml.action.util.QueryPage; @@ -69,8 +68,7 @@ public TransportGetJobsStatsAction(Settings settings, TransportService transport @Override protected void doExecute(Task task, GetJobsStatsAction.Request request, ActionListener listener) { - MlMetadata clusterMlMetadata = clusterService.state().metaData().custom(MLMetadataField.TYPE); - MlMetadata mlMetadata = (clusterMlMetadata == null) ? MlMetadata.EMPTY_METADATA : clusterMlMetadata; + MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterService.state()); request.setExpandedJobsIds(new ArrayList<>(mlMetadata.expandJobIds(request.getJobId(), request.allowNoJobs()))); ActionListener finalListener = listener; listener = ActionListener.wrap(response -> gatherStatsForClosedJobs(mlMetadata, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 5ae84129254e5..5e829c72756a7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -49,7 +49,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.XPackField; -import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.core.ml.MlMetaIndex; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; @@ -163,7 +162,7 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j continue; } - MlMetadata mlMetadata = clusterState.getMetaData().custom(MLMetadataField.TYPE); + MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); Job job = mlMetadata.getJobs().get(jobId); Set compatibleJobTypes = Job.getCompatibleJobTypes(node.getVersion()); if (compatibleJobTypes.contains(job.getJobType()) == false) { @@ -474,8 +473,7 @@ public void onFailure(Exception e) { // Step 3. Update established model memory for pre-6.1 jobs that haven't had it set ActionListener missingMappingsListener = ActionListener.wrap( response -> { - MlMetadata mlMetadata = clusterService.state().getMetaData().custom(MLMetadataField.TYPE); - Job job = mlMetadata.getJobs().get(jobParams.getJobId()); + Job job = MlMetadata.getMlMetadata(clusterService.state()).getJobs().get(jobParams.getJobId()); if (job != null) { Version jobVersion = job.getJobVersion(); Long jobEstablishedModelMemory = job.getEstablishedModelMemory(); @@ -650,8 +648,7 @@ public PersistentTasksCustomMetaData.Assignment getAssignment(OpenJobAction.JobP public void validate(OpenJobAction.JobParams params, ClusterState clusterState) { // If we already know that we can't find an ml node because all ml nodes are running at capacity or // simply because there are no ml nodes in the cluster then we fail quickly here: - MlMetadata mlMetadata = clusterState.metaData().custom(MLMetadataField.TYPE); - TransportOpenJobAction.validate(params.getJobId(), mlMetadata); + TransportOpenJobAction.validate(params.getJobId(), MlMetadata.getMlMetadata(clusterState)); PersistentTasksCustomMetaData.Assignment assignment = selectLeastLoadedMlNode(params.getJobId(), clusterState, maxConcurrentJobAllocations, fallbackMaxNumberOfOpenJobs, maxMachineMemoryPercent, logger); if (assignment.getExecutorNode() == null) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java index 2ffb318dc4fb2..9cba0b20c51b9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java @@ -17,7 +17,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ClientHelper; -import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.PreviewDatafeedAction; import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig; @@ -52,7 +51,7 @@ public TransportPreviewDatafeedAction(Settings settings, ThreadPool threadPool, @Override protected void doExecute(PreviewDatafeedAction.Request request, ActionListener listener) { - MlMetadata mlMetadata = clusterService.state().getMetaData().custom(MLMetadataField.TYPE); + MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterService.state()); DatafeedConfig datafeed = mlMetadata.getDatafeed(request.getDatafeedId()); if (datafeed == null) { throw ExceptionsHelper.missingDatafeedException(request.getDatafeedId()); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutCalendarAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutCalendarAction.java index 11e8fbf912c5b..1393d663fb251 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutCalendarAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutCalendarAction.java @@ -14,9 +14,7 @@ import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.ToXContent; @@ -26,16 +24,12 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ml.action.PutCalendarAction; -import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.core.ml.MlMetaIndex; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.calendars.Calendar; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import java.io.IOException; import java.util.Collections; -import java.util.List; -import java.util.function.Consumer; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; @@ -43,17 +37,15 @@ public class TransportPutCalendarAction extends HandledTransportAction { private final Client client; - private final ClusterService clusterService; @Inject public TransportPutCalendarAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - Client client, ClusterService clusterService) { + Client client) { super(settings, PutCalendarAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, PutCalendarAction.Request::new); this.client = client; - this.clusterService = clusterService; } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java index 0c492a0817c98..2b4304a205b13 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java @@ -141,7 +141,7 @@ public ClusterState execute(ClusterState currentState) { } private ClusterState putDatafeed(PutDatafeedAction.Request request, ClusterState clusterState) { - MlMetadata currentMetadata = clusterState.getMetaData().custom(MLMetadataField.TYPE); + MlMetadata currentMetadata = MlMetadata.getMlMetadata(clusterState); MlMetadata newMetadata = new MlMetadata.Builder(currentMetadata) .putDatafeed(request.getDatafeed(), threadPool.getThreadContext()).build(); return ClusterState.builder(clusterState).metaData( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index aba52b38ecd27..71afa656a4a69 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -130,7 +130,7 @@ public void onFailure(Exception e) { }; // Verify data extractor factory can be created, then start persistent task - MlMetadata mlMetadata = state.metaData().custom(MLMetadataField.TYPE); + MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); validate(params.getDatafeedId(), mlMetadata, tasks); DatafeedConfig datafeed = mlMetadata.getDatafeed(params.getDatafeedId()); @@ -221,9 +221,8 @@ public PersistentTasksCustomMetaData.Assignment getAssignment(StartDatafeedActio @Override public void validate(StartDatafeedAction.DatafeedParams params, ClusterState clusterState) { - MlMetadata mlMetadata = clusterState.metaData().custom(MLMetadataField.TYPE); PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - TransportStartDatafeedAction.validate(params.getDatafeedId(), mlMetadata, tasks); + TransportStartDatafeedAction.validate(params.getDatafeedId(), MlMetadata.getMlMetadata(clusterState), tasks); new DatafeedNodeSelector(clusterState, resolver, params.getDatafeedId()).checkDatafeedTaskCanBeCreated(); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java index 5ed923e3b2370..b5f16ff191fe2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java @@ -130,7 +130,7 @@ protected void doExecute(Task task, StopDatafeedAction.Request request, ActionLi new ActionListenerResponseHandler<>(listener, StopDatafeedAction.Response::new)); } } else { - MlMetadata mlMetadata = state.getMetaData().custom(MLMetadataField.TYPE); + MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); List startedDatafeeds = new ArrayList<>(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateCalendarJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateCalendarJobAction.java index 5df7e890a8fa4..0524cb28a0c11 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateCalendarJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateCalendarJobAction.java @@ -16,7 +16,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.PutCalendarAction; import org.elasticsearch.xpack.core.ml.action.UpdateCalendarJobAction; @@ -48,8 +47,7 @@ public TransportUpdateCalendarJobAction(Settings settings, ThreadPool threadPool @Override protected void doExecute(UpdateCalendarJobAction.Request request, ActionListener listener) { ClusterState clusterState = clusterService.state(); - MlMetadata maybeNullMetaData = clusterState.getMetaData().custom(MLMetadataField.TYPE); - final MlMetadata mlMetadata = maybeNullMetaData == null ? MlMetadata.EMPTY_METADATA : maybeNullMetaData; + final MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); Set jobIdsToAdd = Strings.tokenizeByCommaToSet(request.getJobIdsToAddExpression()); Set jobIdsToRemove = Strings.tokenizeByCommaToSet(request.getJobIdsToRemoveExpression()); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java index e50adfa8275e2..4d752fe294081 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java @@ -63,9 +63,9 @@ protected PutDatafeedAction.Response newResponse(boolean acknowledged) { } @Override - public ClusterState execute(ClusterState currentState) throws Exception { + public ClusterState execute(ClusterState currentState) { DatafeedUpdate update = request.getUpdate(); - MlMetadata currentMetadata = currentState.getMetaData().custom(MLMetadataField.TYPE); + MlMetadata currentMetadata = MlMetadata.getMlMetadata(currentState); PersistentTasksCustomMetaData persistentTasks = currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); MlMetadata newMetadata = new MlMetadata.Builder(currentMetadata) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index 5daf8ce28964e..e27156b512613 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -20,7 +20,6 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.CloseJobAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; @@ -80,10 +79,7 @@ public DatafeedManager(ThreadPool threadPool, Client client, ClusterService clus public void run(TransportStartDatafeedAction.DatafeedTask task, Consumer taskHandler) { String datafeedId = task.getDatafeedId(); ClusterState state = clusterService.state(); - MlMetadata mlMetadata = state.metaData().custom(MLMetadataField.TYPE); - if (mlMetadata == null) { - mlMetadata = MlMetadata.EMPTY_METADATA; - } + MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId); Job job = mlMetadata.getJobs().get(datafeed.getJobId()); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java index e52906a605d0d..37f9715d09464 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java @@ -12,7 +12,6 @@ import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.JobState; @@ -33,7 +32,7 @@ public class DatafeedNodeSelector { private final IndexNameExpressionResolver resolver; public DatafeedNodeSelector(ClusterState clusterState, IndexNameExpressionResolver resolver, String datafeedId) { - MlMetadata mlMetadata = Objects.requireNonNull(clusterState.metaData().custom(MLMetadataField.TYPE)); + MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); this.datafeed = mlMetadata.getDatafeed(datafeedId); this.jobTask = MlMetadata.getJobTask(datafeed.getJobId(), tasks); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 0f6a0f44cbf02..2d67e64ec60e7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -133,8 +133,7 @@ public Job getJobOrThrowIfUnknown(String jobId) { * @throws ResourceNotFoundException if no job matches {@code jobId} */ public static Job getJobOrThrowIfUnknown(String jobId, ClusterState clusterState) { - MlMetadata mlMetadata = clusterState.getMetaData().custom(MLMetadataField.TYPE); - Job job = (mlMetadata == null) ? null : mlMetadata.getJobs().get(jobId); + Job job = MlMetadata.getMlMetadata(clusterState).getJobs().get(jobId); if (job == null) { throw ExceptionsHelper.missingJobException(jobId); } @@ -142,11 +141,7 @@ public static Job getJobOrThrowIfUnknown(String jobId, ClusterState clusterState } private Set expandJobIds(String expression, boolean allowNoJobs, ClusterState clusterState) { - MlMetadata mlMetadata = clusterState.getMetaData().custom(MLMetadataField.TYPE); - if (mlMetadata == null) { - mlMetadata = MlMetadata.EMPTY_METADATA; - } - return mlMetadata.expandJobIds(expression, allowNoJobs); + return MlMetadata.getMlMetadata(clusterState).expandJobIds(expression, allowNoJobs); } /** @@ -160,7 +155,7 @@ private Set expandJobIds(String expression, boolean allowNoJobs, Cluster */ public QueryPage expandJobs(String expression, boolean allowNoJobs, ClusterState clusterState) { Set expandedJobIds = expandJobIds(expression, allowNoJobs, clusterState); - MlMetadata mlMetadata = clusterState.getMetaData().custom(MLMetadataField.TYPE); + MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); List jobs = new ArrayList<>(); for (String expandedJobId : expandedJobIds) { jobs.add(mlMetadata.getJobs().get(expandedJobId)); @@ -188,8 +183,8 @@ public void putJob(PutJobAction.Request request, AnalysisRegistry analysisRegist DEPRECATION_LOGGER.deprecated("Creating jobs with delimited data format is deprecated. Please use xcontent instead."); } - MlMetadata currentMlMetadata = state.metaData().custom(MLMetadataField.TYPE); - if (currentMlMetadata != null && currentMlMetadata.getJobs().containsKey(job.getId())) { + MlMetadata currentMlMetadata = MlMetadata.getMlMetadata(state); + if (currentMlMetadata.getJobs().containsKey(job.getId())) { actionListener.onFailure(ExceptionsHelper.jobAlreadyExists(job.getId())); return; } @@ -469,8 +464,8 @@ protected Boolean newResponse(boolean acknowledged) { } @Override - public ClusterState execute(ClusterState currentState) throws Exception { - MlMetadata currentMlMetadata = currentState.metaData().custom(MLMetadataField.TYPE); + public ClusterState execute(ClusterState currentState) { + MlMetadata currentMlMetadata = MlMetadata.getMlMetadata(currentState); if (currentMlMetadata.getJobs().containsKey(jobId) == false) { // We wouldn't have got here if the job never existed so // the Job must have been deleted by another action. @@ -560,8 +555,7 @@ public ClusterState execute(ClusterState currentState) { } private static MlMetadata.Builder createMlMetadataBuilder(ClusterState currentState) { - MlMetadata currentMlMetadata = currentState.metaData().custom(MLMetadataField.TYPE); - return new MlMetadata.Builder(currentMlMetadata); + return new MlMetadata.Builder(MlMetadata.getMlMetadata(currentState)); } private static ClusterState buildNewClusterState(ClusterState currentState, MlMetadata.Builder builder) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java index aa003d29559f0..8364e015a3456 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java @@ -11,7 +11,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.results.Result; @@ -61,12 +60,8 @@ private void removeData(Iterator jobIterator, ActionListener liste } private Iterator newJobIterator() { - List jobs = new ArrayList<>(); ClusterState clusterState = clusterService.state(); - MlMetadata mlMetadata = clusterState.getMetaData().custom(MLMetadataField.TYPE); - if (mlMetadata != null) { - jobs.addAll(mlMetadata.getJobs().values()); - } + List jobs = new ArrayList<>(MlMetadata.getMlMetadata(clusterState).getJobs().values()); return createVolatileCursorIterator(jobs); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java index b07b025e09e56..fd4085d202041 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java @@ -11,10 +11,8 @@ import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; @@ -24,7 +22,6 @@ import org.elasticsearch.xpack.ml.job.persistence.BatchedStateDocIdsIterator; import java.util.Arrays; -import java.util.Collections; import java.util.Deque; import java.util.List; import java.util.Objects; @@ -84,12 +81,7 @@ private BulkRequestBuilder findUnusedStateDocs() { } private Set getJobIds() { - ClusterState clusterState = clusterService.state(); - MlMetadata mlMetadata = clusterState.getMetaData().custom(MLMetadataField.TYPE); - if (mlMetadata != null) { - return mlMetadata.getJobs().keySet(); - } - return Collections.emptySet(); + return MlMetadata.getMlMetadata(clusterService.state()).getJobs().keySet(); } private void executeDeleteUnusedStateDocs(BulkRequestBuilder deleteUnusedStateRequestBuilder, ActionListener listener) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java index b685e6b961d3c..eba2054054c0d 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java @@ -65,7 +65,7 @@ public class MachineLearningFeatureSetTests extends ESTestCase { private XPackLicenseState licenseState; @Before - public void init() throws Exception { + public void init() { commonSettings = Settings.builder() .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toAbsolutePath()) .put(MachineLearningField.AUTODETECT_PROCESS.getKey(), false) @@ -232,9 +232,28 @@ public void testUsageGivenMlMetadataNotInstalled() throws Exception { try (XContentBuilder builder = XContentFactory.jsonBuilder()) { usage.toXContent(builder, ToXContent.EMPTY_PARAMS); source = new XContentSource(builder); - assertThat(source.getValue("jobs"), equalTo(Collections.emptyMap())); - assertThat(source.getValue("datafeeds"), equalTo(Collections.emptyMap())); } + + assertThat(source.getValue("jobs._all.count"), equalTo(0)); + assertThat(source.getValue("jobs._all.detectors.min"), equalTo(0.0)); + assertThat(source.getValue("jobs._all.detectors.max"), equalTo(0.0)); + assertThat(source.getValue("jobs._all.detectors.total"), equalTo(0.0)); + assertThat(source.getValue("jobs._all.detectors.avg"), equalTo(0.0)); + assertThat(source.getValue("jobs._all.model_size.min"), equalTo(0.0)); + assertThat(source.getValue("jobs._all.model_size.max"), equalTo(0.0)); + assertThat(source.getValue("jobs._all.model_size.total"), equalTo(0.0)); + assertThat(source.getValue("jobs._all.model_size.avg"), equalTo(0.0)); + + assertThat(source.getValue("jobs.opening"), is(nullValue())); + assertThat(source.getValue("jobs.opened"), is(nullValue())); + assertThat(source.getValue("jobs.closing"), is(nullValue())); + assertThat(source.getValue("jobs.closed"), is(nullValue())); + assertThat(source.getValue("jobs.failed"), is(nullValue())); + + assertThat(source.getValue("datafeeds._all.count"), equalTo(0)); + + assertThat(source.getValue("datafeeds.started"), is(nullValue())); + assertThat(source.getValue("datafeeds.stopped"), is(nullValue())); } private void givenJobs(List jobs, List jobsStats) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java index ce46139a18bdb..d324d17f40e25 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java @@ -10,7 +10,6 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -22,20 +21,15 @@ import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.junit.Before; -import org.mockito.Mockito; import java.net.InetAddress; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.mock.orig.Mockito.doAnswer; -import static org.elasticsearch.mock.orig.Mockito.times; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -68,7 +62,7 @@ public void setUpMocks() { when(clusterService.getClusterName()).thenReturn(CLUSTER_NAME); } - public void testInitialize() throws Exception { + public void testInitialize() { MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client); ClusterState cs = ClusterState.builder(new ClusterName("_name")) @@ -80,11 +74,10 @@ public void testInitialize() throws Exception { .build(); initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs)); - verify(clusterService, times(1)).submitStateUpdateTask(eq("install-ml-metadata"), any()); assertThat(initializationService.getDailyMaintenanceService().isStarted(), is(true)); } - public void testInitialize_noMasterNode() throws Exception { + public void testInitialize_noMasterNode() { MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client); ClusterState cs = ClusterState.builder(new ClusterName("_name")) @@ -94,11 +87,10 @@ public void testInitialize_noMasterNode() throws Exception { .build(); initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs)); - verify(clusterService, times(0)).submitStateUpdateTask(eq("install-ml-metadata"), any()); assertThat(initializationService.getDailyMaintenanceService(), is(nullValue())); } - public void testInitialize_alreadyInitialized() throws Exception { + public void testInitialize_alreadyInitialized() { MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client); ClusterState cs = ClusterState.builder(new ClusterName("_name")) @@ -113,67 +105,10 @@ public void testInitialize_alreadyInitialized() throws Exception { initializationService.setDailyMaintenanceService(initialDailyMaintenanceService); initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs)); - verify(clusterService, times(0)).submitStateUpdateTask(eq("install-ml-metadata"), any()); assertSame(initialDailyMaintenanceService, initializationService.getDailyMaintenanceService()); } - public void testInitialize_onlyOnce() throws Exception { - MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client); - - ClusterState cs = ClusterState.builder(new ClusterName("_name")) - .nodes(DiscoveryNodes.builder() - .add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT)) - .localNodeId("_node_id") - .masterNodeId("_node_id")) - .metaData(MetaData.builder()) - .build(); - initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs)); - initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs)); - - verify(clusterService, times(1)).submitStateUpdateTask(eq("install-ml-metadata"), any()); - } - - public void testInitialize_reintialiseAfterFailure() throws Exception { - MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client); - - // Fail the first cluster state update - AtomicBoolean onFailureCalled = new AtomicBoolean(false); - Mockito.doAnswer(invocation -> { - ClusterStateUpdateTask task = (ClusterStateUpdateTask) invocation.getArguments()[1]; - task.onFailure("mock a failure", new IllegalStateException()); - onFailureCalled.set(true); - return null; - }).when(clusterService).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class)); - - ClusterState cs = ClusterState.builder(new ClusterName("_name")) - .nodes(DiscoveryNodes.builder() - .add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT)) - .localNodeId("_node_id") - .masterNodeId("_node_id")) - .metaData(MetaData.builder()) - .build(); - initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs)); - assertTrue("Something went wrong mocking the cluster update task", onFailureCalled.get()); - verify(clusterService, times(1)).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class)); - - // 2nd update succeeds - AtomicReference clusterStateHolder = new AtomicReference<>(); - Mockito.doAnswer(invocation -> { - ClusterStateUpdateTask task = (ClusterStateUpdateTask) invocation.getArguments()[1]; - clusterStateHolder.set(task.execute(cs)); - return null; - }).when(clusterService).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class)); - - initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs)); - assertTrue("Something went wrong mocking the sucessful cluster update task", clusterStateHolder.get() != null); - verify(clusterService, times(2)).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class)); - - // 3rd update won't be called as ML Metadata has been installed - initializationService.clusterChanged(new ClusterChangedEvent("_source", clusterStateHolder.get(), clusterStateHolder.get())); - verify(clusterService, times(2)).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class)); - } - - public void testNodeGoesFromMasterToNonMasterAndBack() throws Exception { + public void testNodeGoesFromMasterToNonMasterAndBack() { MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client); MlDailyMaintenanceService initialDailyMaintenanceService = mock(MlDailyMaintenanceService.class); initializationService.setDailyMaintenanceService(initialDailyMaintenanceService); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java index a5d3faf323434..bd722ebf8ef9a 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java @@ -251,11 +251,11 @@ public void testStart_GivenNewlyCreatedJobLoopBackAndRealtime() throws Exception } public void testDatafeedTaskWaitsUntilJobIsOpened() { - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder); ClusterState.Builder cs = ClusterState.builder(clusterService.state()) - .metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, clusterService.state().getMetaData() - .custom(MLMetadataField.TYPE)).putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + .metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, MlMetadata.getMlMetadata(clusterService.state())) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); when(clusterService.state()).thenReturn(cs.build()); Consumer handler = mockConsumer(); @@ -269,8 +269,8 @@ public void testDatafeedTaskWaitsUntilJobIsOpened() { addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder); addJobTask("another_job", "node_id", JobState.OPENED, tasksBuilder); ClusterState.Builder anotherJobCs = ClusterState.builder(clusterService.state()) - .metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, clusterService.state().getMetaData() - .custom(MLMetadataField.TYPE)).putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + .metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, MlMetadata.getMlMetadata(clusterService.state())) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); capturedClusterStateListener.getValue().clusterChanged(new ClusterChangedEvent("_source", anotherJobCs.build(), cs.build())); @@ -280,8 +280,8 @@ public void testDatafeedTaskWaitsUntilJobIsOpened() { tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder); ClusterState.Builder jobOpenedCs = ClusterState.builder(clusterService.state()) - .metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, clusterService.state().getMetaData() - .custom(MLMetadataField.TYPE)).putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + .metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, MlMetadata.getMlMetadata(clusterService.state())) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); capturedClusterStateListener.getValue().clusterChanged( new ClusterChangedEvent("_source", jobOpenedCs.build(), anotherJobCs.build())); @@ -294,8 +294,8 @@ public void testDatafeedTaskStopsBecauseJobFailedWhileOpening() { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder); ClusterState.Builder cs = ClusterState.builder(clusterService.state()) - .metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, clusterService.state().getMetaData() - .custom(MLMetadataField.TYPE)).putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + .metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, MlMetadata.getMlMetadata(clusterService.state())) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); when(clusterService.state()).thenReturn(cs.build()); Consumer handler = mockConsumer(); @@ -308,8 +308,8 @@ public void testDatafeedTaskStopsBecauseJobFailedWhileOpening() { tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.FAILED, tasksBuilder); ClusterState.Builder updatedCs = ClusterState.builder(clusterService.state()) - .metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, clusterService.state().getMetaData() - .custom(MLMetadataField.TYPE)).putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + .metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, MlMetadata.getMlMetadata(clusterService.state())) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); capturedClusterStateListener.getValue().clusterChanged(new ClusterChangedEvent("_source", updatedCs.build(), cs.build())); @@ -322,8 +322,8 @@ public void testDatafeedGetsStoppedWhileWaitingForJobToOpen() { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder); ClusterState.Builder cs = ClusterState.builder(clusterService.state()) - .metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, clusterService.state().getMetaData() - .custom(MLMetadataField.TYPE)).putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + .metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, MlMetadata.getMlMetadata(clusterService.state())) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); when(clusterService.state()).thenReturn(cs.build()); Consumer handler = mockConsumer(); @@ -340,8 +340,8 @@ public void testDatafeedGetsStoppedWhileWaitingForJobToOpen() { tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder); ClusterState.Builder updatedCs = ClusterState.builder(clusterService.state()) - .metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, clusterService.state().getMetaData() - .custom(MLMetadataField.TYPE)).putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + .metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, MlMetadata.getMlMetadata(clusterService.state())) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); capturedClusterStateListener.getValue().clusterChanged(new ClusterChangedEvent("_source", cs.build(), updatedCs.build())); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java index 778ffe6dfae2d..357c2bc232552 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java @@ -103,7 +103,7 @@ public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { } private ClusterState markJobAsDeleted(String jobId, ClusterState currentState) { - MlMetadata mlMetadata = currentState.metaData().custom(MLMetadataField.TYPE); + MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState); assertNotNull(mlMetadata); MlMetadata.Builder builder = new MlMetadata.Builder(mlMetadata); @@ -116,7 +116,7 @@ private ClusterState markJobAsDeleted(String jobId, ClusterState currentState) { } private ClusterState removeJobFromClusterState(String jobId, ClusterState currentState) { - MlMetadata.Builder builder = new MlMetadata.Builder(currentState.metaData().custom(MLMetadataField.TYPE)); + MlMetadata.Builder builder = new MlMetadata.Builder(MlMetadata.getMlMetadata(currentState)); builder.deleteJob(jobId, currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE)); ClusterState.Builder newState = ClusterState.builder(currentState); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java index 667be5d41ea2b..64b3bfbab45c8 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java @@ -326,7 +326,7 @@ private JobManager createJobManager() { private ClusterState createClusterState() { ClusterState.Builder builder = ClusterState.builder(new ClusterName("_name")); - builder.metaData(MetaData.builder().putCustom(MLMetadataField.TYPE, MlMetadata.EMPTY_METADATA)); + builder.metaData(MetaData.builder()); return builder.build(); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java index 9fea904a99fa1..ef87fe392dd75 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java @@ -39,8 +39,6 @@ import org.elasticsearch.search.SearchHits; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.ml.MLMetadataField; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; @@ -93,7 +91,7 @@ public void testCreateJobResultsIndex() { AtomicReference resultHolder = new AtomicReference<>(); ClusterState cs = ClusterState.builder(new ClusterName("_name")) - .metaData(MetaData.builder().putCustom(MLMetadataField.TYPE, MlMetadata.EMPTY_METADATA).indices(ImmutableOpenMap.of())) + .metaData(MetaData.builder().indices(ImmutableOpenMap.of())) .build(); ClusterService clusterService = mock(ClusterService.class); @@ -157,7 +155,7 @@ public void testCreateJobWithExistingIndex() { .fPut(AnomalyDetectorsIndex.jobResultsAliasedName("foo"), indexMetaData).build(); ClusterState cs2 = ClusterState.builder(new ClusterName("_name")) - .metaData(MetaData.builder().putCustom(MLMetadataField.TYPE, MlMetadata.EMPTY_METADATA).indices(indexMap)).build(); + .metaData(MetaData.builder().indices(indexMap)).build(); ClusterService clusterService = mock(ClusterService.class); @@ -209,7 +207,7 @@ public void testCreateJobRelatedIndicies_createsAliasBecauseIndexNameIsSet() { ImmutableOpenMap indexMap = ImmutableOpenMap.builder().build(); ClusterState cs = ClusterState.builder(new ClusterName("_name")) - .metaData(MetaData.builder().putCustom(MLMetadataField.TYPE, MlMetadata.EMPTY_METADATA).indices(indexMap)).build(); + .metaData(MetaData.builder().indices(indexMap)).build(); ClusterService clusterService = mock(ClusterService.class); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java index 9f3d1c779f8eb..f330c501b76f9 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java @@ -28,7 +28,6 @@ import org.elasticsearch.test.discovery.TestZenDiscovery; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.ml.LocalStateMachineLearning; -import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.core.ml.MachineLearningField; import org.elasticsearch.xpack.core.ml.MlMetadata; @@ -272,8 +271,7 @@ public static GetDatafeedsStatsAction.Response.DatafeedStats getDatafeedStats(St } public static void deleteAllDatafeeds(Logger logger, Client client) throws Exception { - MetaData metaData = client.admin().cluster().prepareState().get().getState().getMetaData(); - MlMetadata mlMetadata = metaData.custom(MLMetadataField.TYPE); + MlMetadata mlMetadata = MlMetadata.getMlMetadata(client.admin().cluster().prepareState().get().getState()); try { logger.info("Closing all datafeeds (using _all)"); StopDatafeedAction.Response stopResponse = client @@ -312,8 +310,7 @@ public static void deleteAllDatafeeds(Logger logger, Client client) throws Excep } public static void deleteAllJobs(Logger logger, Client client) throws Exception { - MetaData metaData = client.admin().cluster().prepareState().get().getState().getMetaData(); - MlMetadata mlMetadata = metaData.custom(MLMetadataField.TYPE); + MlMetadata mlMetadata = MlMetadata.getMlMetadata(client.admin().cluster().prepareState().get().getState()); try { CloseJobAction.Request closeRequest = new CloseJobAction.Request(MetaData.ALL);