diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java index fd7fa70bded43..5a613c96bf6ab 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java @@ -445,7 +445,13 @@ public long estimateMemoryFootprint() { if (establishedModelMemory != null && establishedModelMemory > 0) { return establishedModelMemory + PROCESS_MEMORY_OVERHEAD.getBytes(); } - return ByteSizeUnit.MB.toBytes(analysisLimits.getModelMemoryLimit()) + PROCESS_MEMORY_OVERHEAD.getBytes(); + // Pre v6.1 jobs may have a null analysis limits object or + // a null model memory limit + long modelMemoryLimit = AnalysisLimits.PRE_6_1_DEFAULT_MODEL_MEMORY_LIMIT_MB; + if (analysisLimits != null && analysisLimits.getModelMemoryLimit() != null) { + modelMemoryLimit = analysisLimits.getModelMemoryLimit(); + } + return ByteSizeUnit.MB.toBytes(modelMemoryLimit) + PROCESS_MEMORY_OVERHEAD.getBytes(); } /** diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestTestHelper.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestTestHelper.java index 28e96d0974ea5..f5bcd7d8d53db 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestTestHelper.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestTestHelper.java @@ -20,12 +20,26 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; public final class XPackRestTestHelper { + public static final List ML_PRE_V660_TEMPLATES = Collections.unmodifiableList( + Arrays.asList(AuditorField.NOTIFICATIONS_INDEX, + MlMetaIndex.INDEX_NAME, + AnomalyDetectorsIndex.jobStateIndexName(), + AnomalyDetectorsIndex.jobResultsIndexPrefix())); + + public static final List ML_POST_V660_TEMPLATES = Collections.unmodifiableList( + Arrays.asList(AuditorField.NOTIFICATIONS_INDEX, + MlMetaIndex.INDEX_NAME, + AnomalyDetectorsIndex.jobStateIndexName(), + AnomalyDetectorsIndex.jobResultsIndexPrefix(), + AnomalyDetectorsIndex.configIndexName())); + private XPackRestTestHelper() { } @@ -33,7 +47,17 @@ private XPackRestTestHelper() { * Waits for the Machine Learning templates to be created * and check the version is up to date */ - public static void waitForMlTemplates(RestClient client) throws InterruptedException { + + + /** + * For each template name wait for the template to be created and + * for the template version to be equal to the master node version. + * + * @param client The rest client + * @param templateNames Names of the templates to wait for + * @throws InterruptedException If the wait is interrupted + */ + public static void waitForTemplates(RestClient client, List templateNames) throws InterruptedException { AtomicReference masterNodeVersion = new AtomicReference<>(); ESTestCase.awaitBusy(() -> { String response; @@ -53,8 +77,6 @@ public static void waitForMlTemplates(RestClient client) throws InterruptedExcep return false; }); - final List templateNames = Arrays.asList(AuditorField.NOTIFICATIONS_INDEX, MlMetaIndex.INDEX_NAME, - AnomalyDetectorsIndex.jobStateIndexName(), AnomalyDetectorsIndex.jobResultsIndexPrefix()); for (String template : templateNames) { ESTestCase.awaitBusy(() -> { Map response; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index af1d88c0f52a4..cd25fdd060588 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -724,7 +724,8 @@ public UnaryOperator> getIndexTemplateMetaDat public static boolean allTemplatesInstalled(ClusterState clusterState) { boolean allPresent = true; List templateNames = Arrays.asList(AuditorField.NOTIFICATIONS_INDEX, MlMetaIndex.INDEX_NAME, - AnomalyDetectorsIndex.jobStateIndexName(), AnomalyDetectorsIndex.jobResultsIndexPrefix()); + AnomalyDetectorsIndex.jobStateIndexName(), AnomalyDetectorsIndex.jobResultsIndexPrefix(), + AnomalyDetectorsIndex.configIndexName()); for (String templateName : templateNames) { allPresent = allPresent && TemplateUtils.checkTemplateExistsAndVersionIsGTECurrentVersion(templateName, clusterState); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java index 4850ad71dc2ef..60e1d58a2d3ef 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.tasks.TransportTasksAction; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -29,6 +30,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.CloseJobAction; +import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; @@ -48,10 +50,14 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; +import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; + public class TransportCloseJobAction extends TransportTasksAction { private final ClusterService clusterService; + private final Client client; private final Auditor auditor; private final PersistentTasksService persistentTasksService; private final DatafeedConfigProvider datafeedConfigProvider; @@ -61,11 +67,12 @@ public class TransportCloseJobAction extends TransportTasksAction() { @Override public void onResponse(Boolean result) { - listener.onResponse(response); + FinalizeJobExecutionAction.Request finalizeRequest = new FinalizeJobExecutionAction.Request( + waitForCloseRequest.jobsToFinalize.toArray(new String[0])); + executeAsyncWithOrigin(client, ML_ORIGIN, FinalizeJobExecutionAction.INSTANCE, finalizeRequest, + ActionListener.wrap(r -> listener.onResponse(response), listener::onFailure)); } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java index 6a9dcd608208f..2512462610ec4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java @@ -7,8 +7,12 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.action.update.UpdateAction; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; @@ -24,23 +28,36 @@ import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; +import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.utils.ChainTaskExecutor; import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import java.util.Date; -import java.util.List; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; -// This action is only called from modes before version 6.6.0 -public class TransportFinalizeJobExecutionAction extends TransportMasterNodeAction { +import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; +public class TransportFinalizeJobExecutionAction extends + TransportMasterNodeAction { + + private final Client client; @Inject public TransportFinalizeJobExecutionAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver) { + IndexNameExpressionResolver indexNameExpressionResolver, + Client client) { super(settings, FinalizeJobExecutionAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, FinalizeJobExecutionAction.Request::new); + this.client = client; } @Override @@ -58,20 +75,60 @@ protected void masterOperation(FinalizeJobExecutionAction.Request request, Clust ActionListener listener) { MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); - List jobsInClusterState = Arrays.stream(request.getJobIds()) + Set jobsInClusterState = Arrays.stream(request.getJobIds()) .filter(id -> mlMetadata.getJobs().containsKey(id)) - .collect(Collectors.toList()); - - // This action should not be called for jobs that have - // their configuration in index documents + .collect(Collectors.toSet()); if (jobsInClusterState.isEmpty()) { - // This action is a no-op for jobs not defined in the cluster state. - listener.onResponse(new AcknowledgedResponse(true)); - return; + finalizeIndexJobs(Arrays.asList(request.getJobIds()), listener); + } else { + ActionListener finalizeClusterStateJobsListener = ActionListener.wrap( + ack -> finalizeClusterStateJobs(jobsInClusterState, listener), + listener::onFailure + ); + + Set jobsInIndex = new HashSet<>(Arrays.asList(request.getJobIds())); + jobsInIndex.removeAll(jobsInClusterState); + + finalizeIndexJobs(jobsInIndex, finalizeClusterStateJobsListener); + } + } + + private void finalizeIndexJobs(Collection jobIds, ActionListener listener) { + String jobIdString = String.join(",", jobIds); + logger.debug("finalizing jobs [{}]", jobIdString); + + ChainTaskExecutor chainTaskExecutor = new ChainTaskExecutor(threadPool.executor( + MachineLearning.UTILITY_THREAD_POOL_NAME), true); + + Map update = Collections.singletonMap(Job.FINISHED_TIME.getPreferredName(), new Date()); + + for (String jobId: jobIds) { + UpdateRequest updateRequest = new UpdateRequest(AnomalyDetectorsIndex.configIndexName(), + ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId)); + updateRequest.retryOnConflict(3); + updateRequest.doc(update); + updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + + chainTaskExecutor.add(chainedListener -> { + executeAsyncWithOrigin(client, ML_ORIGIN, UpdateAction.INSTANCE, updateRequest, ActionListener.wrap( + updateResponse -> chainedListener.onResponse(null), + chainedListener::onFailure + )); + }); } - String jobIdString = String.join(",", jobsInClusterState); + chainTaskExecutor.execute(ActionListener.wrap( + aVoid -> { + logger.debug("finalized job [{}]", jobIdString); + listener.onResponse(new AcknowledgedResponse(true)); + }, + listener::onFailure + )); + } + + private void finalizeClusterStateJobs(Collection jobIds, ActionListener listener) { + String jobIdString = String.join(",", jobIds); String source = "finalize_job_execution [" + jobIdString + "]"; logger.debug("finalizing jobs [{}]", jobIdString); clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() { @@ -82,7 +139,7 @@ public ClusterState execute(ClusterState currentState) { MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata); Date finishedTime = new Date(); - for (String jobId : jobsInClusterState) { + for (String jobId : jobIds) { Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId)); jobBuilder.setFinishedTime(finishedTime); mlMetadataBuilder.putJob(jobBuilder.build(), true); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java index e0ae481d61b53..0cc5fc068a81c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java @@ -40,6 +40,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Optional; import java.util.Set; @@ -73,7 +74,7 @@ public TransportGetJobsStatsAction(Settings settings, TransportService transport @Override protected void doExecute(Task task, GetJobsStatsAction.Request request, ActionListener finalListener) { - + logger.debug("Get stats for job [{}]", request.getJobId()); jobManager.expandJobIds(request.getJobId(), request.allowNoJobs(), ActionListener.wrap( expandedIds -> { request.setExpandedJobsIds(new ArrayList<>(expandedIds)); @@ -96,6 +97,7 @@ protected GetJobsStatsAction.Response newResponse(GetJobsStatsAction.Request req for (QueryPage task : tasks) { stats.addAll(task.results()); } + Collections.sort(stats, Comparator.comparing(GetJobsStatsAction.Response.JobStats::getJobId)); return new GetJobsStatsAction.Response(taskOperationFailures, failedNodeExceptions, new QueryPage<>(stats, stats.size(), Job.RESULTS_FIELD)); } @@ -109,7 +111,6 @@ protected QueryPage readTaskResponse(Strea protected void taskOperation(GetJobsStatsAction.Request request, TransportOpenJobAction.JobTask task, ActionListener> listener) { String jobId = task.getJobId(); - logger.debug("Get stats for job [{}]", jobId); ClusterState state = clusterService.state(); PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); Optional> stats = processManager.getStatistics(task); @@ -159,6 +160,7 @@ void gatherStatsForClosedJobs(GetJobsStatsAction.Request request, GetJobsStatsAc if (counter.decrementAndGet() == 0) { List results = response.getResponse().results(); results.addAll(jobStats.asList()); + Collections.sort(results, Comparator.comparing(GetJobsStatsAction.Response.JobStats::getJobId)); listener.onResponse(new GetJobsStatsAction.Response(response.getTaskFailures(), response.getNodeFailures(), new QueryPage<>(results, results.size(), Job.RESULTS_FIELD))); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 1d7623e69e4de..c9315a178148d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -158,6 +158,10 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j int maxMachineMemoryPercent, MlMemoryTracker memoryTracker, Logger logger) { + if (job == null) { + logger.debug("[{}] select node job is null", jobId); + } + String resultsIndexName = job != null ? job.getResultsIndexName() : null; List unavailableIndices = verifyIndicesPrimaryShardsAreActive(resultsIndexName, clusterState); if (unavailableIndices.size() != 0) { @@ -236,6 +240,16 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j reasons.add(reason); continue; } + + boolean jobConfigIsStoredInIndex = job.getJobVersion().onOrAfter(Version.V_6_6_0); + if (jobConfigIsStoredInIndex && node.getVersion().before(Version.V_6_6_0)) { + String reason = "Not opening job [" + jobId + "] on node [" + nodeNameOrId(node) + + "] version [" + node.getVersion() + "], because this node does not support " + + "jobs of version [" + job.getJobVersion() + "]"; + logger.trace(reason); + reasons.add(reason); + continue; + } } long numberOfAssignedJobs = 0; @@ -820,8 +834,16 @@ public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterS @Override public PersistentTasksCustomMetaData.Assignment getAssignment(OpenJobAction.JobParams params, ClusterState clusterState) { + Job foundJob = params.getJob(); + if (foundJob == null) { + // The job was added to the persistent task parameters in 6.6.0 + // if the field is not present the task was created before 6.6.0. + // In which case the job should still be in the clusterstate + foundJob = MlMetadata.getMlMetadata(clusterState).getJobs().get(params.getJobId()); + } + PersistentTasksCustomMetaData.Assignment assignment = selectLeastLoadedMlNode(params.getJobId(), - params.getJob(), + foundJob, clusterState, maxConcurrentJobAllocations, fallbackMaxNumberOfOpenJobs, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index c8bb9831abc0d..9987357f7bf11 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -49,7 +49,7 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedNodeSelector; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; -import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; +import org.elasticsearch.xpack.ml.job.JobManager; import java.util.List; import java.util.Locale; @@ -71,7 +71,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction jobListener = ActionListener.wrap( - jobBuilder -> { + ActionListener jobListener = ActionListener.wrap( + job -> { try { - Job job = jobBuilder.build(); validate(job, datafeedConfigHolder.get(), tasks); createDataExtrator.accept(job); } catch (Exception e) { @@ -186,7 +185,7 @@ public void onFailure(Exception e) { params.setDatafeedIndices(datafeedConfig.getIndices()); params.setJobId(datafeedConfig.getJobId()); datafeedConfigHolder.set(datafeedConfig); - jobConfigProvider.getJob(datafeedConfig.getJobId(), jobListener); + jobManager.getJob(datafeedConfig.getJobId(), jobListener); } catch (Exception e) { listener.onFailure(e); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java index 6c58f411e02d1..160ef09ec823e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java @@ -8,22 +8,22 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Job; -import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector; -import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory; -import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.elasticsearch.xpack.core.ml.job.results.Result; +import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector; +import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; -import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; @@ -52,21 +52,21 @@ public DatafeedJobBuilder(Client client, Settings settings, NamedXContentRegistr this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier); } - void build(String datafeedId, ActionListener listener) { + void build(String datafeedId, ClusterState state, ActionListener listener) { JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings); JobConfigProvider jobConfigProvider = new JobConfigProvider(client); - DatafeedConfigProvider datafeedConfigProvider = new DatafeedConfigProvider(client, xContentRegistry); + DatafeedConfigReader datafeedConfigReader = new DatafeedConfigReader(client, xContentRegistry); - build(datafeedId, jobResultsProvider, jobConfigProvider, datafeedConfigProvider, listener); + build(datafeedId, jobResultsProvider, jobConfigProvider, datafeedConfigReader, state, listener); } /** * For testing only. - * Use {@link #build(String, ActionListener)} instead + * Use {@link #build(String, ClusterState, ActionListener)} instead */ void build(String datafeedId, JobResultsProvider jobResultsProvider, JobConfigProvider jobConfigProvider, - DatafeedConfigProvider datafeedConfigProvider, ActionListener listener) { + DatafeedConfigReader datafeedConfigReader, ClusterState state, ActionListener listener) { AtomicReference jobHolder = new AtomicReference<>(); AtomicReference datafeedConfigHolder = new AtomicReference<>(); @@ -134,10 +134,10 @@ void build(String datafeedId, JobResultsProvider jobResultsProvider, JobConfigPr // Get the job config and re-validate // Re-validation is required as the config has been re-read since // the previous validation - ActionListener jobConfigListener = ActionListener.wrap( - jobBuilder -> { + ActionListener jobConfigListener = ActionListener.wrap( + job -> { try { - jobHolder.set(jobBuilder.build()); + jobHolder.set(job); DatafeedJobValidator.validate(datafeedConfigHolder.get(), jobHolder.get()); jobIdConsumer.accept(jobHolder.get().getId()); } catch (Exception e) { @@ -148,11 +148,20 @@ void build(String datafeedId, JobResultsProvider jobResultsProvider, JobConfigPr ); // Get the datafeed config - ActionListener datafeedConfigListener = ActionListener.wrap( - configBuilder -> { + ActionListener datafeedConfigListener = ActionListener.wrap( + datafeedConfig -> { try { - datafeedConfigHolder.set(configBuilder.build()); - jobConfigProvider.getJob(datafeedConfigHolder.get().getJobId(), jobConfigListener); + datafeedConfigHolder.set(datafeedConfig); + // Is the job in the cluster state? + Job job = MlMetadata.getMlMetadata(state).getJobs().get(datafeedConfig.getJobId()); + if (job != null) { + jobConfigListener.onResponse(job); + } else { + jobConfigProvider.getJob(datafeedConfigHolder.get().getJobId(), ActionListener.wrap( + jobBuilder -> jobConfigListener.onResponse(jobBuilder.build()), + jobConfigListener::onFailure + )); + } } catch (Exception e) { listener.onFailure(e); } @@ -160,7 +169,7 @@ void build(String datafeedId, JobResultsProvider jobResultsProvider, JobConfigPr listener::onFailure ); - datafeedConfigProvider.getDatafeedConfig(datafeedId, datafeedConfigListener); + datafeedConfigReader.datafeedConfig(datafeedId, state, datafeedConfigListener); } private static TimeValue getFrequencyOrDefault(DatafeedConfig datafeed, Job job) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index d49367de429f9..724c858584b80 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.ml.datafeed; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; @@ -12,7 +14,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.FutureUtils; @@ -48,7 +49,9 @@ import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; -public class DatafeedManager extends AbstractComponent { +public class DatafeedManager { + + private static final Logger logger = LogManager.getLogger(DatafeedManager.class); private final Client client; private final ClusterService clusterService; @@ -95,7 +98,7 @@ public void onFailure(Exception e) { }, finishHandler::accept ); - datafeedJobBuilder.build(datafeedId, datafeedJobHandler); + datafeedJobBuilder.build(datafeedId, clusterService.state(), datafeedJobHandler); } public void stopDatafeed(TransportStartDatafeedAction.DatafeedTask task, String reason, TimeValue timeout) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java index b1ee1776bfe25..a7f8b967944f0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java @@ -14,7 +14,9 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.license.RemoteClusterLicenseChecker; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; @@ -29,23 +31,20 @@ public class DatafeedNodeSelector { private final String datafeedId; private final String jobId; private final List datafeedIndices; - private final PersistentTasksCustomMetaData.PersistentTask jobTask; private final ClusterState clusterState; private final IndexNameExpressionResolver resolver; public DatafeedNodeSelector(ClusterState clusterState, IndexNameExpressionResolver resolver, String datafeedId, String jobId, List datafeedIndices) { - PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); this.datafeedId = datafeedId; this.jobId = jobId; this.datafeedIndices = datafeedIndices; - this.jobTask = MlTasks.getJobTask(jobId, tasks); this.clusterState = Objects.requireNonNull(clusterState); this.resolver = Objects.requireNonNull(resolver); } public void checkDatafeedTaskCanBeCreated() { - AssignmentFailure assignmentFailure = checkAssignment(); + AssignmentFailure assignmentFailure = checkAssignment(findJobTask()); if (assignmentFailure != null && assignmentFailure.isCriticalForTaskCreation) { String msg = "No node found to start datafeed [" + datafeedId + "], " + "allocation explanation [" + assignmentFailure.reason + "]"; @@ -55,7 +54,8 @@ public void checkDatafeedTaskCanBeCreated() { } public PersistentTasksCustomMetaData.Assignment selectNode() { - AssignmentFailure assignmentFailure = checkAssignment(); + PersistentTasksCustomMetaData.PersistentTask jobTask = findJobTask(); + AssignmentFailure assignmentFailure = checkAssignment(jobTask); if (assignmentFailure == null) { return new PersistentTasksCustomMetaData.Assignment(jobTask.getExecutorNode(), ""); } @@ -64,7 +64,7 @@ public PersistentTasksCustomMetaData.Assignment selectNode() { } @Nullable - private AssignmentFailure checkAssignment() { + private AssignmentFailure checkAssignment(PersistentTasksCustomMetaData.PersistentTask jobTask) { PriorityFailureCollector priorityFailureCollector = new PriorityFailureCollector(); priorityFailureCollector.add(verifyIndicesActive()); @@ -126,6 +126,22 @@ private AssignmentFailure verifyIndicesActive() { return null; } + private PersistentTasksCustomMetaData.PersistentTask findJobTask() { + String foundJobId = jobId; + if (jobId == null) { + // This is because the datafeed persistent task was created before 6.6.0 + // and is missing the additional fields in the task parameters. + // In which case the datafeed config should still be in the clusterstate + DatafeedConfig datafeedConfig = MlMetadata.getMlMetadata(clusterState).getDatafeed(datafeedId); + if (datafeedConfig != null) { + foundJobId = datafeedConfig.getJobId(); + } + } + + PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + return MlTasks.getJobTask(foundJobId, tasks); + } + private static class AssignmentFailure { private final String reason; private final boolean isCriticalForTaskCreation; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 24abeb9d45b47..c7d21e3686642 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -44,13 +44,12 @@ import org.elasticsearch.xpack.ml.action.TransportOpenJobAction.JobTask; import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; -import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.StateStreamer; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; -import org.elasticsearch.xpack.ml.process.NativeStorageProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; @@ -61,6 +60,7 @@ import org.elasticsearch.xpack.ml.job.process.normalizer.ScoresUpdater; import org.elasticsearch.xpack.ml.job.process.normalizer.ShortCircuitingRenormalizer; import org.elasticsearch.xpack.ml.notifications.Auditor; +import org.elasticsearch.xpack.ml.process.NativeStorageProvider; import java.io.IOException; import java.io.InputStream; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index 727732540e72b..28d6d76eb8395 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -11,11 +11,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.action.support.WriteRequest; -import org.elasticsearch.action.update.UpdateAction; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Client; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -24,10 +20,10 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.xpack.core.ml.MachineLearningField; -import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.action.PutJobAction; +import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; +import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; import org.elasticsearch.xpack.core.ml.job.messages.Messages; -import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; -import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; @@ -48,11 +44,9 @@ import org.elasticsearch.xpack.ml.notifications.Auditor; import java.time.Duration; -import java.util.Collections; import java.util.Date; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; @@ -100,8 +94,7 @@ public class AutoDetectResultProcessor { private final boolean restoredSnapshot; final CountDownLatch completionLatch = new CountDownLatch(1); - volatile CountDownLatch onCloseActionsLatch; - final Semaphore updateModelSnapshotIdSemaphore = new Semaphore(1); + final Semaphore jobUpdateSemaphore = new Semaphore(1); private final FlushListener flushListener; private volatile boolean processKilled; private volatile boolean failed; @@ -172,11 +165,9 @@ public void process(AutodetectProcess process) { } catch (Exception e) { LOGGER.warn(new ParameterizedMessage("[{}] Error persisting autodetect results", jobId), e); } - if (processKilled == false) { - onAutodetectClose(); - } - LOGGER.info("[{}] {} buckets parsed from autodetect output", jobId, bucketCount); + runEstablishedModelMemoryUpdate(true); + } catch (Exception e) { failed = true; @@ -348,32 +339,34 @@ private void notifyModelMemoryStatusChange(Context context, ModelSizeStats model } protected void updateModelSnapshotIdOnJob(ModelSnapshot modelSnapshot) { + JobUpdate update = new JobUpdate.Builder(jobId).setModelSnapshotId(modelSnapshot.getSnapshotId()).build(); + UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update); + try { // This blocks the main processing thread in the unlikely event // there are 2 model snapshots queued up. But it also has the // advantage of ensuring order - updateModelSnapshotIdSemaphore.acquire(); + jobUpdateSemaphore.acquire(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); LOGGER.info("[{}] Interrupted acquiring update model snapshot semaphore", jobId); return; } - updateJob(jobId, Collections.singletonMap(Job.MODEL_SNAPSHOT_ID.getPreferredName(), modelSnapshot.getSnapshotId()), - new ActionListener() { - @Override - public void onResponse(UpdateResponse updateResponse) { - updateModelSnapshotIdSemaphore.release(); - LOGGER.debug("[{}] Updated job with model snapshot id [{}]", jobId, modelSnapshot.getSnapshotId()); - } + executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest, new ActionListener() { + @Override + public void onResponse(PutJobAction.Response response) { + jobUpdateSemaphore.release(); + LOGGER.debug("[{}] Updated job with model snapshot id [{}]", jobId, modelSnapshot.getSnapshotId()); + } - @Override - public void onFailure(Exception e) { - updateModelSnapshotIdSemaphore.release(); - LOGGER.error("[" + jobId + "] Failed to update job with new model snapshot id [" + - modelSnapshot.getSnapshotId() + "]", e); - } - }); + @Override + public void onFailure(Exception e) { + jobUpdateSemaphore.release(); + LOGGER.error("[" + jobId + "] Failed to update job with new model snapshot id [" + + modelSnapshot.getSnapshotId() + "]", e); + } + }); } /** @@ -420,7 +413,6 @@ synchronized void scheduleEstablishedModelMemoryUpdate(TimeValue delay) { * to null by the first call. */ private synchronized void runEstablishedModelMemoryUpdate(boolean cancelExisting) { - if (scheduledEstablishedModelMemoryUpdate != null) { if (cancelExisting) { LOGGER.debug("[{}] Bringing forward previously scheduled established model memory update", jobId); @@ -431,26 +423,6 @@ private synchronized void runEstablishedModelMemoryUpdate(boolean cancelExisting } } - private void onAutodetectClose() { - onCloseActionsLatch = new CountDownLatch(1); - - ActionListener updateListener = ActionListener.wrap( - updateResponse -> { - runEstablishedModelMemoryUpdate(true); - onCloseActionsLatch.countDown(); - }, - e -> { - LOGGER.error("[" + jobId + "] Failed to finalize job on autodetect close", e); - onCloseActionsLatch.countDown(); - } - ); - - updateJob(jobId, Collections.singletonMap(Job.FINISHED_TIME.getPreferredName(), new Date()), - new ThreadedActionListener<>(LOGGER, client.threadPool(), - MachineLearning.UTILITY_THREAD_POOL_NAME, updateListener, false) - ); - } - private void updateEstablishedModelMemoryOnJob() { // Copy these before committing writes, so the calculation is done based on committed documents @@ -462,33 +434,41 @@ private void updateEstablishedModelMemoryOnJob() { jobResultsProvider.getEstablishedMemoryUsage(jobId, latestBucketTimestamp, modelSizeStatsForCalc, establishedModelMemory -> { if (latestEstablishedModelMemory != establishedModelMemory) { - updateJob(jobId, Collections.singletonMap(Job.ESTABLISHED_MODEL_MEMORY.getPreferredName(), establishedModelMemory), - new ActionListener() { - @Override - public void onResponse(UpdateResponse response) { - latestEstablishedModelMemory = establishedModelMemory; - LOGGER.debug("[{}] Updated job with established model memory [{}]", jobId, establishedModelMemory); - } - @Override - public void onFailure(Exception e) { - LOGGER.error("[" + jobId + "] Failed to update job with new established model memory [" + - establishedModelMemory + "]", e); + client.threadPool().executor(MachineLearning.UTILITY_THREAD_POOL_NAME).submit(() -> { + try { + jobUpdateSemaphore.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.info("[{}] Interrupted acquiring update established model memory semaphore", jobId); + return; } + + JobUpdate update = new JobUpdate.Builder(jobId).setEstablishedModelMemory(establishedModelMemory).build(); + UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update); + updateRequest.setWaitForAck(false); + + executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest, + new ActionListener() { + @Override + public void onResponse(PutJobAction.Response response) { + jobUpdateSemaphore.release(); + latestEstablishedModelMemory = establishedModelMemory; + LOGGER.debug("[{}] Updated job with established model memory [{}]", jobId, establishedModelMemory); + } + + @Override + public void onFailure(Exception e) { + jobUpdateSemaphore.release(); + LOGGER.error("[" + jobId + "] Failed to update job with new established model memory [" + + establishedModelMemory + "]", e); + } + }); }); } }, e -> LOGGER.error("[" + jobId + "] Failed to calculate established model memory", e)); } - private void updateJob(String jobId, Map update, ActionListener listener) { - UpdateRequest updateRequest = new UpdateRequest(AnomalyDetectorsIndex.configIndexName(), - ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId)); - updateRequest.retryOnConflict(3); - updateRequest.doc(update); - updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - executeAsyncWithOrigin(client, ML_ORIGIN, UpdateAction.INSTANCE, updateRequest, listener); - } - public void awaitCompletion() throws TimeoutException { try { // Although the results won't take 30 minutes to finish, the pipe won't be closed @@ -498,17 +478,10 @@ public void awaitCompletion() throws TimeoutException { throw new TimeoutException("Timed out waiting for results processor to complete for job " + jobId); } - // Once completionLatch has passed then onCloseActionsLatch must either - // be set or null, it will not be set later. - if (onCloseActionsLatch != null && onCloseActionsLatch.await( - MachineLearningField.STATE_PERSIST_RESTORE_TIMEOUT.getMinutes(), TimeUnit.MINUTES) == false) { - throw new TimeoutException("Timed out waiting for results processor run post close actions " + jobId); - } - // Input stream has been completely processed at this point. // Wait for any updateModelSnapshotIdOnJob calls to complete. - updateModelSnapshotIdSemaphore.acquire(); - updateModelSnapshotIdSemaphore.release(); + jobUpdateSemaphore.acquire(); + jobUpdateSemaphore.release(); // These lines ensure that the "completion" we're awaiting includes making the results searchable waitUntilRenormalizerIsIdle(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java index 572a10e44d20c..db2be13b75e0f 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java @@ -8,6 +8,7 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -108,7 +109,7 @@ public void testValidate_datafeedState() { exceptionHolder::set ); - closeJobAction.validate(Arrays.asList(jobId), false, startDataFeedTaskBuilder.build(), listener); + closeJobAction.validate(Collections.singletonList(jobId), false, startDataFeedTaskBuilder.build(), listener); assertNull(responseHolder.get()); assertNotNull(exceptionHolder.get()); @@ -124,7 +125,7 @@ public void testValidate_datafeedState() { } exceptionHolder.set(null); - closeJobAction.validate(Arrays.asList(jobId), false, dataFeedNotStartedTaskBuilder.build(), listener); + closeJobAction.validate(Collections.singletonList(jobId), false, dataFeedNotStartedTaskBuilder.build(), listener); assertNull(exceptionHolder.get()); assertNotNull(responseHolder.get()); assertThat(responseHolder.get().openJobIds, contains(jobId)); @@ -147,7 +148,7 @@ public void testValidate_givenFailedJob() { ); // force close so not an error for the failed job - closeJobAction.validate(Arrays.asList("job_id_failed"), true, tasksBuilder.build(), listener); + closeJobAction.validate(Collections.singletonList("job_id_failed"), true, tasksBuilder.build(), listener); assertNull(exceptionHolder.get()); assertNotNull(responseHolder.get()); assertThat(responseHolder.get().openJobIds, contains("job_id_failed")); @@ -155,7 +156,7 @@ public void testValidate_givenFailedJob() { // not a force close so is an error responseHolder.set(null); - closeJobAction.validate(Arrays.asList("job_id_failed"), false, tasksBuilder.build(), listener); + closeJobAction.validate(Collections.singletonList("job_id_failed"), false, tasksBuilder.build(), listener); assertNull(responseHolder.get()); assertNotNull(exceptionHolder.get()); assertThat(exceptionHolder.get(), instanceOf(ElasticsearchStatusException.class)); @@ -193,16 +194,16 @@ public void testValidate_withSpecificJobIds() { assertEquals(Arrays.asList("job_id_open-1", "job_id_open-2"), responseHolder.get().openJobIds); assertEquals(Collections.emptyList(), responseHolder.get().closingJobIds); - closeJobAction.validate(Arrays.asList("job_id_closing"), false, tasks, listener); + closeJobAction.validate(Collections.singletonList("job_id_closing"), false, tasks, listener); assertNull(exceptionHolder.get()); assertNotNull(responseHolder.get()); assertEquals(Collections.emptyList(), responseHolder.get().openJobIds); - assertEquals(Arrays.asList("job_id_closing"), responseHolder.get().closingJobIds); + assertEquals(Collections.singletonList("job_id_closing"), responseHolder.get().closingJobIds); - closeJobAction.validate(Arrays.asList("job_id_open-1"), false, tasks, listener); + closeJobAction.validate(Collections.singletonList("job_id_open-1"), false, tasks, listener); assertNull(exceptionHolder.get()); assertNotNull(responseHolder.get()); - assertEquals(Arrays.asList("job_id_open-1"), responseHolder.get().openJobIds); + assertEquals(Collections.singletonList("job_id_open-1"), responseHolder.get().openJobIds); assertEquals(Collections.emptyList(), responseHolder.get().closingJobIds); } @@ -277,7 +278,8 @@ public static void addTask(String datafeedId, long startTime, String nodeId, Dat private TransportCloseJobAction createAction() { return new TransportCloseJobAction(Settings.EMPTY, mock(TransportService.class), mock(ThreadPool.class), mock(ActionFilters.class), mock(IndexNameExpressionResolver.class), - clusterService, mock(Auditor.class), mock(PersistentTasksService.class), datafeedConfigProvider, jobManager); + clusterService, mock(Auditor.class), mock(PersistentTasksService.class), datafeedConfigProvider, jobManager, + mock(Client.class)); } private void mockDatafeedConfigFindDatafeeds(Set datafeedIds) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionActionTests.java index d2d69cd0d44ce..d575d913038ea 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionActionTests.java @@ -8,30 +8,64 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.update.UpdateAction; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; +import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; +import org.junit.Before; import java.util.Date; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicReference; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class TransportFinalizeJobExecutionActionTests extends ESTestCase { + private ThreadPool threadPool; + private Client client; + + @Before + @SuppressWarnings("unchecked") + private void setupMocks() { + ExecutorService executorService = mock(ExecutorService.class); + threadPool = mock(ThreadPool.class); + org.elasticsearch.mock.orig.Mockito.doAnswer(invocation -> { + ((Runnable) invocation.getArguments()[0]).run(); + return null; + }).when(executorService).execute(any(Runnable.class)); + when(threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)).thenReturn(executorService); + + client = mock(Client.class); + doAnswer( invocationOnMock -> { + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + listener.onResponse(null); + return null; + }).when(client).execute(eq(UpdateAction.INSTANCE), any(), any()); + + when(client.threadPool()).thenReturn(threadPool); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); + } + public void testOperation_noJobsInClusterState() { ClusterService clusterService = mock(ClusterService.class); TransportFinalizeJobExecutionAction action = createAction(clusterService); @@ -42,10 +76,11 @@ public void testOperation_noJobsInClusterState() { AtomicReference ack = new AtomicReference<>(); action.masterOperation(request, clusterState, ActionListener.wrap( ack::set, - e -> fail(e.getMessage()) + e -> assertNull(e.getMessage()) )); assertTrue(ack.get().isAcknowledged()); + verify(client, times(2)).execute(eq(UpdateAction.INSTANCE), any(), any()); verify(clusterService, never()).submitStateUpdateTask(any(), any()); } @@ -68,12 +103,13 @@ public void testOperation_jobInClusterState() { e -> fail(e.getMessage()) )); + verify(client, never()).execute(eq(UpdateAction.INSTANCE), any(), any()); verify(clusterService, times(1)).submitStateUpdateTask(any(), any()); } private TransportFinalizeJobExecutionAction createAction(ClusterService clusterService) { return new TransportFinalizeJobExecutionAction(Settings.EMPTY, mock(TransportService.class), clusterService, - mock(ThreadPool.class), mock(ActionFilters.class), mock(IndexNameExpressionResolver.class)); + threadPool, mock(ActionFilters.class), mock(IndexNameExpressionResolver.class), client); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java index 393fc492f5d63..3ff80e7c9e882 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java @@ -68,6 +68,7 @@ import static org.elasticsearch.xpack.core.ml.job.config.JobTests.buildJobBuilder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.isEmptyOrNullString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -411,7 +412,7 @@ public void testSelectLeastLoadedMlNode_jobWithRulesAndNodeMeetsRequiredVersion( .add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), nodeAttr, Collections.emptySet(), Version.V_6_2_0)) .add(new DiscoveryNode("_node_name2", "_node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), - nodeAttr, Collections.emptySet(), Version.V_6_4_0)) + nodeAttr, Collections.emptySet(), Version.V_6_6_0)) .build(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); @@ -430,6 +431,34 @@ public void testSelectLeastLoadedMlNode_jobWithRulesAndNodeMeetsRequiredVersion( assertNotNull(result.getExecutorNode()); } + public void testSelectLeastLoadedMlNode_indexJobsCannotBeAssignedToPre660Node() { + Map nodeAttr = new HashMap<>(); + nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true"); + DiscoveryNodes.Builder nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), + nodeAttr, Collections.emptySet(), Version.V_6_5_0)); + + + ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); + cs.nodes(nodes); + + Job job = jobWithRules("v660-job"); + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("v660-job", job, cs.build(), 2, 10, 30, memoryTracker, logger); + assertNull(result.getExecutorNode()); + assertEquals("Not opening job [v660-job] on node [_node_name1] version [6.5.0], " + + "because this node does not support jobs of version [6.6.0]", result.getExplanation()); + + nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), + nodeAttr, Collections.emptySet(), Version.V_6_5_0)) + .add(new DiscoveryNode("_node_name2", "_node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), + nodeAttr, Collections.emptySet(), Version.V_6_6_0)); + cs.nodes(nodes); + result = TransportOpenJobAction.selectLeastLoadedMlNode("v660-job", job, cs.build(), 2, 10, 30, memoryTracker, logger); + assertThat(result.getExplanation(), isEmptyOrNullString()); + assertEquals("_node_id2", result.getExecutorNode()); + } + public void testVerifyIndicesPrimaryShardsAreActive() { MetaData.Builder metaData = MetaData.builder(); RoutingTable.Builder routingTable = RoutingTable.builder(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java index d13e311531da2..3f98b51dc959a 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java @@ -8,6 +8,8 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.mock.orig.Mockito; @@ -19,7 +21,6 @@ import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.results.Bucket; -import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.notifications.Auditor; @@ -47,7 +48,7 @@ public class DatafeedJobBuilderTests extends ESTestCase { private Consumer taskHandler; private JobResultsProvider jobResultsProvider; private JobConfigProvider jobConfigProvider; - private DatafeedConfigProvider datafeedConfigProvider; + private DatafeedConfigReader datafeedConfigReader; private DatafeedJobBuilder datafeedJobBuilder; @@ -79,7 +80,7 @@ public void init() { }).when(jobResultsProvider).bucketsViaInternalClient(any(), any(), any(), any()); jobConfigProvider = mock(JobConfigProvider.class); - datafeedConfigProvider = mock(DatafeedConfigProvider.class); + datafeedConfigReader = mock(DatafeedConfigReader.class); } public void testBuild_GivenScrollDatafeedAndNewJob() throws Exception { @@ -103,7 +104,10 @@ public void testBuild_GivenScrollDatafeedAndNewJob() throws Exception { givenJob(jobBuilder); givenDatafeed(datafeed); - datafeedJobBuilder.build("datafeed1", jobResultsProvider, jobConfigProvider, datafeedConfigProvider, datafeedJobHandler); + ClusterState clusterState = ClusterState.builder(new ClusterName("datafeedjobbuildertest-cluster")).build(); + + datafeedJobBuilder.build("datafeed1", jobResultsProvider, jobConfigProvider, datafeedConfigReader, + clusterState, datafeedJobHandler); assertBusy(() -> wasHandlerCalled.get()); } @@ -125,13 +129,16 @@ public void testBuild_GivenScrollDatafeedAndOldJobWithLatestRecordTimestampAfter assertThat(datafeedJob.isIsolated(), is(false)); assertThat(datafeedJob.lastEndTimeMs(), equalTo(7_200_000L)); wasHandlerCalled.compareAndSet(false, true); - }, e -> fail() + }, e -> fail(e.getMessage()) ); givenJob(jobBuilder); givenDatafeed(datafeed); - datafeedJobBuilder.build("datafeed1", jobResultsProvider, jobConfigProvider, datafeedConfigProvider, datafeedJobHandler); + ClusterState clusterState = ClusterState.builder(new ClusterName("datafeedjobbuildertest-cluster")).build(); + + datafeedJobBuilder.build("datafeed1", jobResultsProvider, jobConfigProvider, datafeedConfigReader, + clusterState, datafeedJobHandler); assertBusy(() -> wasHandlerCalled.get()); } @@ -153,13 +160,16 @@ public void testBuild_GivenScrollDatafeedAndOldJobWithLatestBucketAfterLatestRec assertThat(datafeedJob.isIsolated(), is(false)); assertThat(datafeedJob.lastEndTimeMs(), equalTo(7_199_999L)); wasHandlerCalled.compareAndSet(false, true); - }, e -> fail() + }, e -> fail(e.getMessage()) ); givenJob(jobBuilder); givenDatafeed(datafeed); - datafeedJobBuilder.build("datafeed1", jobResultsProvider, jobConfigProvider, datafeedConfigProvider, datafeedJobHandler); + ClusterState clusterState = ClusterState.builder(new ClusterName("datafeedjobbuildertest-cluster")).build(); + + datafeedJobBuilder.build("datafeed1", jobResultsProvider, jobConfigProvider, datafeedConfigReader, + clusterState, datafeedJobHandler); assertBusy(() -> wasHandlerCalled.get()); } @@ -184,7 +194,9 @@ public void testBuild_GivenBucketsRequestFails() { givenJob(jobBuilder); givenDatafeed(datafeed); - datafeedJobBuilder.build("datafeed1", jobResultsProvider, jobConfigProvider, datafeedConfigProvider, + ClusterState clusterState = ClusterState.builder(new ClusterName("datafeedjobbuildertest-cluster")).build(); + + datafeedJobBuilder.build("datafeed1", jobResultsProvider, jobConfigProvider, datafeedConfigReader, clusterState, ActionListener.wrap(datafeedJob -> fail(), taskHandler)); verify(taskHandler).accept(error); @@ -202,10 +214,10 @@ private void givenJob(Job.Builder job) { private void givenDatafeed(DatafeedConfig.Builder datafeed) { Mockito.doAnswer(invocationOnMock -> { @SuppressWarnings("unchecked") - ActionListener handler = (ActionListener) invocationOnMock.getArguments()[1]; - handler.onResponse(datafeed); + ActionListener handler = (ActionListener) invocationOnMock.getArguments()[2]; + handler.onResponse(datafeed.build()); return null; - }).when(datafeedConfigProvider).getDatafeedConfig(eq(datafeed.getId()), any()); + }).when(datafeedConfigReader).datafeedConfig(eq(datafeed.getId()), any(), any()); } private void givenLatestTimes(long latestRecordTimestamp, long latestBucketTimestamp) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java index 54aa3ade8e1b9..5d038a46c0df4 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java @@ -123,10 +123,10 @@ public void setUpTests() { DatafeedJobBuilder datafeedJobBuilder = mock(DatafeedJobBuilder.class); doAnswer(invocationOnMock -> { @SuppressWarnings("rawtypes") - ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; listener.onResponse(datafeedJob); return null; - }).when(datafeedJobBuilder).build(any(), any()); + }).when(datafeedJobBuilder).build(any(), any(), any()); datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder, () -> currentTime, auditor); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index 18668cb8bef2d..ccd46ba860b17 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -100,7 +100,7 @@ protected void updateModelSnapshotIdOnJob(ModelSnapshot modelSnapshot) { } @After - public void deleteJob() throws Exception { + public void deleteJob() { DeleteJobAction.Request request = new DeleteJobAction.Request(JOB_ID); AcknowledgedResponse response = client().execute(DeleteJobAction.INSTANCE, request).actionGet(); assertTrue(response.isAcknowledged()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java index e85ee6d565f35..06867d9c8d83d 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ -7,12 +7,8 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.WriteRequest; -import org.elasticsearch.action.update.UpdateAction; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Client; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -22,7 +18,8 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; +import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; @@ -33,7 +30,6 @@ import org.elasticsearch.xpack.core.ml.job.results.CategoryDefinition; import org.elasticsearch.xpack.core.ml.job.results.Influencer; import org.elasticsearch.xpack.core.ml.job.results.ModelPlot; -import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; @@ -42,7 +38,6 @@ import org.elasticsearch.xpack.ml.notifications.Auditor; import org.junit.After; import org.junit.Before; -import org.mockito.ArgumentCaptor; import org.mockito.InOrder; import java.time.Duration; @@ -51,20 +46,17 @@ import java.util.Date; import java.util.Iterator; import java.util.List; -import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; -import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; @@ -93,20 +85,9 @@ public class AutoDetectResultProcessorTests extends ESTestCase { public void setUpMocks() { executor = new ScheduledThreadPoolExecutor(1); client = mock(Client.class); - doAnswer(invocation -> { - ActionListener listener = (ActionListener) invocation.getArguments()[2]; - listener.onResponse(new UpdateResponse()); - return null; - }).when(client).execute(same(UpdateAction.INSTANCE), any(), any()); threadPool = mock(ThreadPool.class); when(client.threadPool()).thenReturn(threadPool); when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); - ExecutorService executorService = mock(ExecutorService.class); - org.elasticsearch.mock.orig.Mockito.doAnswer(invocation -> { - ((Runnable) invocation.getArguments()[0]).run(); - return null; - }).when(executorService).execute(any(Runnable.class)); - when(threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)).thenReturn(executorService); auditor = mock(Auditor.class); renormalizer = mock(Renormalizer.class); persister = mock(JobResultsPersister.class); @@ -136,9 +117,7 @@ public void testProcess() throws TimeoutException { processorUnderTest.process(process); processorUnderTest.awaitCompletion(); verify(renormalizer, times(1)).waitUntilIdle(); - verify(client, times(1)).execute(same(UpdateAction.INSTANCE), any(), any()); assertEquals(0, processorUnderTest.completionLatch.getCount()); - assertEquals(0, processorUnderTest.onCloseActionsLatch.getCount()); } public void testProcessResult_bucket() { @@ -413,9 +392,9 @@ public void testProcessResult_manyModelSizeStatsInQuickSuccession() throws Excep verify(persister, times(5)).persistModelSizeStats(any(ModelSizeStats.class)); // ...but only the last should trigger an established model memory update verify(persister, times(1)).commitResultWrites(JOB_ID); - verifyNoMoreInteractions(persister); verify(jobResultsProvider, times(1)).getEstablishedMemoryUsage(eq(JOB_ID), eq(lastTimestamp), eq(lastModelSizeStats), any(Consumer.class), any(Consumer.class)); + verifyNoMoreInteractions(persister); verifyNoMoreInteractions(jobResultsProvider); assertEquals(lastModelSizeStats, processorUnderTest.modelSizeStats()); }); @@ -433,13 +412,11 @@ public void testProcessResult_modelSnapshot() { processorUnderTest.processResult(context, result); verify(persister, times(1)).persistModelSnapshot(modelSnapshot, WriteRequest.RefreshPolicy.IMMEDIATE); + UpdateJobAction.Request expectedJobUpdateRequest = UpdateJobAction.Request.internal(JOB_ID, + new JobUpdate.Builder(JOB_ID).setModelSnapshotId("a_snapshot_id").build()); - ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(UpdateRequest.class); - verify(client).execute(same(UpdateAction.INSTANCE), requestCaptor.capture(), any()); + verify(client).execute(same(UpdateJobAction.INSTANCE), eq(expectedJobUpdateRequest), any()); verifyNoMoreInteractions(persister); - - UpdateRequest capturedRequest = requestCaptor.getValue(); - assertThat(capturedRequest.doc().sourceAsMap().keySet(), contains(Job.MODEL_SNAPSHOT_ID.getPreferredName())); } public void testProcessResult_quantiles_givenRenormalizationIsEnabled() { @@ -491,11 +468,9 @@ public void testAwaitCompletion() throws TimeoutException { AutodetectProcess process = mock(AutodetectProcess.class); when(process.readAutodetectResults()).thenReturn(iterator); processorUnderTest.process(process); - processorUnderTest.awaitCompletion(); assertEquals(0, processorUnderTest.completionLatch.getCount()); - assertEquals(0, processorUnderTest.onCloseActionsLatch.getCount()); - assertEquals(1, processorUnderTest.updateModelSnapshotIdSemaphore.availablePermits()); + assertEquals(1, processorUnderTest.jobUpdateSemaphore.availablePermits()); } public void testPersisterThrowingDoesntBlockProcessing() { @@ -549,9 +524,8 @@ public void testKill() throws TimeoutException { processorUnderTest.process(process); processorUnderTest.awaitCompletion(); - assertNull(processorUnderTest.onCloseActionsLatch); assertEquals(0, processorUnderTest.completionLatch.getCount()); - assertEquals(1, processorUnderTest.updateModelSnapshotIdSemaphore.availablePermits()); + assertEquals(1, processorUnderTest.jobUpdateSemaphore.availablePermits()); verify(persister, times(1)).commitResultWrites(JOB_ID); verify(persister, times(1)).commitStateWrites(JOB_ID); @@ -559,7 +533,6 @@ public void testKill() throws TimeoutException { verify(renormalizer).shutdown(); verify(renormalizer, times(1)).waitUntilIdle(); verify(flushListener, times(1)).clear(); - verify(client, never()).execute(same(UpdateAction.INSTANCE), any(), any()); } private void setupScheduleDelayTime(TimeValue delay) { diff --git a/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java b/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java index 3b576ec537f5a..fb4b2a942e768 100644 --- a/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java +++ b/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java @@ -59,7 +59,7 @@ public class FullClusterRestartIT extends AbstractFullClusterRestartTestCase { @Before public void waitForMlTemplates() throws Exception { - XPackRestTestHelper.waitForMlTemplates(client()); + XPackRestTestHelper.waitForTemplates(client(), XPackRestTestHelper.ML_PRE_V660_TEMPLATES); } @Override diff --git a/x-pack/qa/rolling-upgrade/build.gradle b/x-pack/qa/rolling-upgrade/build.gradle index 26071aaa6e2af..12559802cfc2f 100644 --- a/x-pack/qa/rolling-upgrade/build.gradle +++ b/x-pack/qa/rolling-upgrade/build.gradle @@ -163,6 +163,7 @@ subprojects { extraConfigFile 'x-pack/system_key', "${mainProject.projectDir}/src/test/resources/system_key" } setting 'xpack.watcher.encrypt_sensitive_data', 'true' + setting 'logger.org.elasticsearch.xpack.ml.action', 'DEBUG' } // Old versions of the code contain an invalid assertion that trips @@ -174,11 +175,6 @@ subprojects { if (version.before('5.6.9') || (version.onOrAfter('6.0.0') && version.before('6.2.4'))) { jvmArgs '-da:org.elasticsearch.xpack.monitoring.exporter.http.HttpExportBulk' } - - systemProperty 'tests.rest.blacklist', [ - 'old_cluster/30_ml_jobs_crud/*', - 'old_cluster/40_ml_datafeed_crud/*', - ].join(',') } Task oldClusterTestRunner = tasks.getByName("${baseName}#oldClusterTestRunner") @@ -222,11 +218,7 @@ subprojects { setting 'xpack.watcher.encrypt_sensitive_data', 'true' keystoreFile 'xpack.watcher.encryption_key', "${mainProject.projectDir}/src/test/resources/system_key" } - - systemProperty 'tests.rest.blacklist', [ - 'mixed_cluster/30_ml_jobs_crud/*', - 'mixed_cluster/40_ml_datafeed_crud/*', - ].join(',') + setting 'logger.org.elasticsearch.xpack.ml.action', 'DEBUG' } } @@ -244,8 +236,8 @@ subprojects { // We only need to run these tests once so we may as well do it when we're two thirds upgraded systemProperty 'tests.rest.blacklist', [ 'mixed_cluster/10_basic/Start scroll in mixed cluster on upgraded node that we will continue after upgrade', - 'mixed_cluster/30_ml_jobs_crud/*', - 'mixed_cluster/40_ml_datafeed_crud/*', + 'mixed_cluster/30_ml_jobs_crud/Create a job in the mixed cluster and write some data', + 'mixed_cluster/40_ml_datafeed_crud/Put job and datafeed in mixed cluster', ].join(',') finalizedBy "${baseName}#oldClusterTestCluster#node1.stop" } @@ -262,11 +254,6 @@ subprojects { systemProperty 'tests.first_round', 'false' systemProperty 'tests.upgrade_from_version', version.toString().replace('-SNAPSHOT', '') finalizedBy "${baseName}#oldClusterTestCluster#node2.stop" - - systemProperty 'tests.rest.blacklist', [ - 'mixed_cluster/30_ml_jobs_crud/*', - 'mixed_cluster/40_ml_datafeed_crud/*', - ].join(',') } Task upgradedClusterTest = tasks.create(name: "${baseName}#upgradedClusterTest", type: RestIntegTestTask) @@ -300,11 +287,6 @@ subprojects { if (version.before('6.1.0') || version.onOrAfter('6.3.0')) { systemProperty 'tests.rest.blacklist', '/30_ml_jobs_crud/Test model memory limit is updated' } - - systemProperty 'tests.rest.blacklist', [ - 'upgraded_cluster/30_ml_jobs_crud/*', - 'upgraded_cluster/40_ml_datafeed_crud/*', - ].join(',') } Task versionBwcTest = tasks.create(name: "${baseName}#bwcTest") { diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java index 5a9c866058dc2..3c7aca675d684 100644 --- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java @@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets; import java.util.Base64; +import java.util.List; import java.util.Map; @TimeoutSuite(millis = 5 * TimeUnits.MINUTE) // to account for slow as hell VMs @@ -33,7 +34,23 @@ public class UpgradeClusterClientYamlTestSuiteIT extends ESClientYamlSuiteTestCa */ @Before public void waitForTemplates() throws Exception { - XPackRestTestHelper.waitForMlTemplates(client()); + List templatesToWaitFor = XPackRestTestHelper.ML_POST_V660_TEMPLATES; + + // If upgrading from a version prior to v6.6.0 the set of templates + // to wait for is different + if (System.getProperty("tests.rest.suite").equals("old_cluster")) { + String versionProperty = System.getProperty("tests.upgrade_from_version"); + if (versionProperty == null) { + throw new IllegalStateException("System property 'tests.upgrade_from_version' not set, cannot start tests"); + } + + Version upgradeFromVersion = Version.fromString(versionProperty); + if (upgradeFromVersion.before(Version.V_6_6_0)) { + templatesToWaitFor = XPackRestTestHelper.ML_PRE_V660_TEMPLATES; + } + } + + XPackRestTestHelper.waitForTemplates(client(), templatesToWaitFor); } @AfterClass diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/60_ml_config_migration.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/60_ml_config_migration.yml new file mode 100644 index 0000000000000..6e06d2d4db827 --- /dev/null +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/60_ml_config_migration.yml @@ -0,0 +1,109 @@ +--- +"Test get old cluster jobs & datafeeds": + + - skip: + version: "- 6.1.0" + reason: "Wildcard expansion of jobs and datafeeds was added in 6.1.0" + + - do: + xpack.ml.get_jobs: + job_id: migration* + - match: { count: 2 } + - match: { jobs.0.job_id: migration-old-cluster-closed-job } + - match: { jobs.1.job_id: migration-old-cluster-open-job } + + - do: + xpack.ml.get_job_stats: + job_id: migration* + - match: { count: 2 } + - match: { jobs.0.job_id: migration-old-cluster-closed-job} + - match: { jobs.0.state: closed } + - is_false: jobs.0.node + - match: { jobs.1.job_id: migration-old-cluster-open-job} + - match: { jobs.1.state: opened } +# TODO can't test for assignment here as the job may not be re-allocated yet +# - is_true: jobs.1.node + - is_false: jobs.1.assignment_explanation + + - do: + xpack.ml.get_datafeeds: + datafeed_id: migration* + - match: { count: 2 } + - match: { datafeeds.0.datafeed_id: migration-old-cluster-started-datafeed} + - length: { datafeeds.0.indices: 1 } + - match: { datafeeds.1.datafeed_id: migration-old-cluster-stopped-datafeed} + - length: { datafeeds.1.indices: 1 } + + - do: + xpack.ml.get_datafeed_stats: + datafeed_id: migration* + - match: { datafeeds.0.datafeed_id: migration-old-cluster-started-datafeed} + - match: { datafeeds.0.state: started } +# TODO can't test for assignment here as the datafeed may not be re-allocated yet +# - is_true: datafeeds.0.node + - match: { datafeeds.1.datafeed_id: migration-old-cluster-stopped-datafeed} + - match: { datafeeds.1.state: stopped } + - is_false: datafeeds.1.node + +--- +"Test create open close delete job and datafeed": + + - do: + xpack.ml.put_job: + job_id: migration-ephemeral-job + body: > + { + "analysis_config" : { + "bucket_span": "1h", + "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] + }, + "data_description" : { + } + } + + - do: + xpack.ml.put_datafeed: + datafeed_id: migration-ephemeral-datafeed + body: > + { + "job_id":"migration-ephemeral-job", + "indices":["pet-data"] + } + + - do: + xpack.ml.open_job: + job_id: migration-ephemeral-job + + - do: + xpack.ml.get_job_stats: + job_id: migration-ephemeral-job + - match: { jobs.0.state: opened } + - is_true: jobs.0.node + + - do: + xpack.ml.start_datafeed: + datafeed_id: migration-ephemeral-datafeed + start: 0 + + - do: + xpack.ml.get_datafeed_stats: + datafeed_id: migration-ephemeral-datafeed + - match: { datafeeds.0.datafeed_id: migration-ephemeral-datafeed} + - match: { datafeeds.0.state: started} + - is_true: datafeeds.0.node + + - do: + xpack.ml.stop_datafeed: + datafeed_id: migration-ephemeral-datafeed + + - do: + xpack.ml.close_job: + job_id: migration-ephemeral-job + + - do: + xpack.ml.delete_datafeed: + datafeed_id: migration-ephemeral-datafeed + + - do: + xpack.ml.delete_job: + job_id: migration-ephemeral-job diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/30_ml_jobs_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/30_ml_jobs_crud.yml index d587c1578ffef..f7f58df2333d3 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/30_ml_jobs_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/30_ml_jobs_crud.yml @@ -56,60 +56,6 @@ index: [".ml-state", ".ml-anomalies-shared"] wait_for_status: green ---- -"Put job on the old cluster with the default model memory limit and post some data": - - do: - xpack.ml.put_job: - job_id: no-model-memory-limit-job - body: > - { - "analysis_config" : { - "bucket_span": "60s", - "detectors" :[{"function":"count"}] - }, - "data_description" : { - "time_field":"time", - "time_format":"epoch" - } - } - - match: { job_id: no-model-memory-limit-job } - - - do: - xpack.ml.open_job: - job_id: no-model-memory-limit-job - - - do: - xpack.ml.post_data: - job_id: no-model-memory-limit-job - body: - - sourcetype: post-data-job - time: 1403481600 - - sourcetype: post-data-job - time: 1403484700 - - sourcetype: post-data-job - time: 1403487700 - - sourcetype: post-data-job - time: 1403490700 - - sourcetype: post-data-job - time: 1403493700 - - match: { processed_record_count: 5 } - - - do: - xpack.ml.close_job: - job_id: no-model-memory-limit-job - - - do: - xpack.ml.get_buckets: - job_id: no-model-memory-limit-job - - match: { count: 201 } - -# Wait for indices to be fully allocated before -# killing the node - - do: - cluster.health: - index: [".ml-state", ".ml-anomalies-shared"] - wait_for_status: green - --- "Put job with empty strings in the configuration": - do: diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/60_ml_config_migration.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/60_ml_config_migration.yml new file mode 100644 index 0000000000000..8c93b8265dabf --- /dev/null +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/60_ml_config_migration.yml @@ -0,0 +1,80 @@ +setup: + - do: + cluster.health: + wait_for_status: yellow + wait_for_nodes: 3 + timeout: 70s + + - do: + indices.create: + index: pet-data + body: + mappings: + doc: + properties: + time: + type: date + airline: + type: keyword + responsetime: + type: float + +--- +"Create a job and datafeed in the old cluster and open": + + - do: + xpack.ml.put_job: + job_id: migration-old-cluster-open-job + body: > + { + "description":"job migration", + "analysis_config" : { + "bucket_span": "60s", + "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] + }, + "data_description" : { + } + } + - match: { job_id: migration-old-cluster-open-job } + + - do: + xpack.ml.open_job: + job_id: migration-old-cluster-open-job + + - do: + xpack.ml.put_datafeed: + datafeed_id: migration-old-cluster-started-datafeed + body: > + { + "job_id":"migration-old-cluster-open-job", + "indices":["pet-data"], + "types":["response"] + } + + - do: + xpack.ml.start_datafeed: + datafeed_id: migration-old-cluster-started-datafeed + start: 0 + + - do: + xpack.ml.put_job: + job_id: migration-old-cluster-closed-job + body: > + { + "analysis_config" : { + "bucket_span": "60s", + "detectors" :[{"function":"metric","field_name":"responsetime"}] + }, + "data_description" : { + } + } + - match: { job_id: migration-old-cluster-closed-job } + + - do: + xpack.ml.put_datafeed: + datafeed_id: migration-old-cluster-stopped-datafeed + body: > + { + "job_id":"migration-old-cluster-closed-job", + "indices":["pet-data"] + } diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/30_ml_jobs_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/30_ml_jobs_crud.yml index bf6d3bf6bdef0..5ffdefd77a974 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/30_ml_jobs_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/30_ml_jobs_crud.yml @@ -73,27 +73,6 @@ setup: xpack.ml.get_jobs: job_id: mixed-cluster-job ---- -"Test job with no model memory limit has established model memory after reopening": - - do: - xpack.ml.open_job: - job_id: no-model-memory-limit-job - - - do: - xpack.ml.get_jobs: - job_id: no-model-memory-limit-job - - is_true: jobs.0.established_model_memory - - lt: { jobs.0.established_model_memory: 100000 } - - - do: - xpack.ml.close_job: - job_id: no-model-memory-limit-job - - - do: - xpack.ml.delete_job: - job_id: no-model-memory-limit-job - - match: { acknowledged: true } - --- "Test job with pre 6.4 rules": diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/60_ml_config_migration.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/60_ml_config_migration.yml new file mode 100644 index 0000000000000..c22815a0ae799 --- /dev/null +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/60_ml_config_migration.yml @@ -0,0 +1,94 @@ +setup: + - do: + cluster.health: + wait_for_status: green + wait_for_nodes: 3 + # wait for long enough that we give delayed unassigned shards to stop being delayed + timeout: 70s + +--- +"Test old cluster jobs and datafeeds and delete them": + + - do: + xpack.ml.get_jobs: + job_id: migration* + - match: { count: 2 } + - match: { jobs.0.job_id: migration-old-cluster-closed-job } + - match: { jobs.1.job_id: migration-old-cluster-open-job } + + - do: + xpack.ml.get_job_stats: + job_id: migration* + - match: { count: 2 } + - match: { jobs.0.job_id: migration-old-cluster-closed-job } + - match: { jobs.0.state: closed } + - is_false: jobs.0.node + - match: { jobs.1.job_id: migration-old-cluster-open-job } + - match: { jobs.1.state: opened } +# TODO can't test for assignment here as the job may not be re-allocated yet +# - is_true: jobs.1.node + - is_false: jobs.1.assignment_explanation + + - do: + xpack.ml.get_datafeeds: + datafeed_id: migration* + - match: { count: 2 } + - match: { datafeeds.0.datafeed_id: migration-old-cluster-started-datafeed } + - length: { datafeeds.0.indices: 1 } + - match: { datafeeds.1.datafeed_id: migration-old-cluster-stopped-datafeed } + - length: { datafeeds.1.indices: 1 } + + - do: + xpack.ml.get_datafeed_stats: + datafeed_id: migration* + - match: { datafeeds.0.datafeed_id: migration-old-cluster-started-datafeed } + - match: { datafeeds.0.state: started } +# TODO can't test for assignment here as the datafeed may not be re-allocated yet +# - is_true: datafeeds.0.node + - match: { datafeeds.1.datafeed_id: migration-old-cluster-stopped-datafeed } + - match: { datafeeds.1.state: stopped } + - is_false: datafeeds.1.node + + - do: + xpack.ml.close_job: + job_id: migration-old-cluster-open-job + + - do: + xpack.ml.get_jobs: + job_id: migration-old-cluster-open-job + - is_true: jobs.0.finished_time + + - do: + xpack.ml.stop_datafeed: + datafeed_id: migration-old-cluster-started-datafeed + + - do: + xpack.ml.delete_datafeed: + datafeed_id: migration-old-cluster-started-datafeed + + - do: + xpack.ml.delete_job: + job_id: migration-old-cluster-open-job + + - do: + catch: missing + xpack.ml.get_jobs: + job_id: migration-old-cluster-open-job + + - do: + xpack.ml.delete_datafeed: + datafeed_id: migration-old-cluster-stopped-datafeed + + - do: + xpack.ml.delete_job: + job_id: migration-old-cluster-closed-job + + - do: + xpack.ml.get_jobs: + job_id: migration* + - match: { count: 0 } + + - do: + xpack.ml.get_datafeeds: + datafeed_id: migration* + - match: { count: 0 }