From 08218afe7416343a2f82a29d84ff918c7f522944 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Thu, 9 Aug 2018 11:26:00 -0500 Subject: [PATCH 1/3] Clear Job#finished_time when it is opened (#32605) --- .../ml/action/TransportOpenJobAction.java | 55 ++++++++++++++++-- .../ReopenJobResetsFinishedTimeIT.java | 57 +++++++++++++++++++ 2 files changed, 108 insertions(+), 4 deletions(-) create mode 100644 x-pack/qa/ml-native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ReopenJobResetsFinishedTimeIT.java diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 019715b30853e..7c3a56bcc622d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -18,12 +18,14 @@ import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.AliasOrIndex; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.service.ClusterService; @@ -49,9 +51,11 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.XPackField; +import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.ml.MlMetaIndex; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; +import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; @@ -71,6 +75,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Date; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -471,12 +476,25 @@ protected ClusterBlockException checkBlock(OpenJobAction.Request request, Cluste protected void masterOperation(OpenJobAction.Request request, ClusterState state, ActionListener listener) { OpenJobAction.JobParams jobParams = request.getJobParams(); if (licenseState.isMachineLearningAllowed()) { - // Step 5. Wait for job to be started and respond - ActionListener> finalListener = + + // Step 6. Clear job finished time once the job is started and respond + ActionListener clearJobFinishTime = ActionListener.wrap( + response -> { + if(response.isAcknowledged()) { + clearJobFinishedTime(jobParams.getJobId(), listener); + } else { + listener.onResponse(response); + } + }, + listener::onFailure + ); + + // Step 5. Wait for job to be started + ActionListener> waitForJobToStart = new ActionListener>() { @Override public void onResponse(PersistentTasksCustomMetaData.PersistentTask task) { - waitForJobStarted(task.getId(), jobParams, listener); + waitForJobStarted(task.getId(), jobParams, clearJobFinishTime); } @Override @@ -492,7 +510,7 @@ public void onFailure(Exception e) { // Step 4. Start job task ActionListener establishedMemoryUpdateListener = ActionListener.wrap( response -> persistentTasksService.sendStartRequest(MlTasks.jobTaskId(jobParams.getJobId()), - OpenJobAction.TASK_NAME, jobParams, finalListener), + OpenJobAction.TASK_NAME, jobParams, waitForJobToStart), listener::onFailure ); @@ -574,6 +592,35 @@ public void onTimeout(TimeValue timeout) { }); } + private void clearJobFinishedTime(String jobId, ActionListener listener) { + clusterService.submitStateUpdateTask("clearing_job_finish_time [" + jobId + "]", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + XPackPlugin.checkReadyForXPackCustomMetadata(currentState); + MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState); + MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata); + Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId)); + jobBuilder.setFinishedTime(null); + + mlMetadataBuilder.putJob(jobBuilder.build(), true); + ClusterState.Builder builder = ClusterState.builder(currentState); + return builder.metaData(new MetaData.Builder(currentState.metaData()) + .putCustom(MlMetadata.TYPE, mlMetadataBuilder.build())) + .build(); + } + + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, + ClusterState newState) { + listener.onResponse(new OpenJobAction.Response(true)); + } + }); + } private void cancelJobStart(PersistentTasksCustomMetaData.PersistentTask persistentTask, Exception exception, ActionListener listener) { persistentTasksService.sendRemoveRequest(persistentTask.getId(), diff --git a/x-pack/qa/ml-native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ReopenJobResetsFinishedTimeIT.java b/x-pack/qa/ml-native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ReopenJobResetsFinishedTimeIT.java new file mode 100644 index 0000000000000..85b73873dba2a --- /dev/null +++ b/x-pack/qa/ml-native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ReopenJobResetsFinishedTimeIT.java @@ -0,0 +1,57 @@ +package org.elasticsearch.xpack.ml.integration; + +import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.core.ml.job.config.DataDescription; +import org.elasticsearch.xpack.core.ml.job.config.Detector; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.junit.After; + +import java.util.Collections; + +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.Matchers.is; + +public class ReopenJobResetsFinishedTimeIT extends MlNativeAutodetectIntegTestCase { + + @After + public void cleanUpTest() { + cleanUp(); + } + + public void test() { + final String jobId = "reset-finished-time-test"; + Job.Builder job = createJob(jobId); + + registerJob(job); + putJob(job); + openJob(job.getId()); + + assertThat(getSingleJob(jobId).getFinishedTime(), is(nullValue())); + + closeJob(jobId); + assertThat(getSingleJob(jobId).getFinishedTime(), is(notNullValue())); + + openJob(jobId); + assertThat(getSingleJob(jobId).getFinishedTime(), is(nullValue())); + } + + private Job getSingleJob(String jobId) { + return getJob(jobId).get(0); + } + + private Job.Builder createJob(String id) { + DataDescription.Builder dataDescription = new DataDescription.Builder(); + dataDescription.setFormat(DataDescription.DataFormat.XCONTENT); + dataDescription.setTimeFormat(DataDescription.EPOCH_MS); + + Detector.Builder d = new Detector.Builder("count", null); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build())); + + Job.Builder builder = new Job.Builder(); + builder.setId(id); + builder.setAnalysisConfig(analysisConfig); + builder.setDataDescription(dataDescription); + return builder; + } +} From 75913a90adea9900a5a162148db2fcc8669a01ab Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Fri, 10 Aug 2018 09:43:51 -0500 Subject: [PATCH 2/3] not returning failure when Job#finished_time is not reset --- .../xpack/ml/action/TransportOpenJobAction.java | 11 ++++------- .../ml/integration/ReopenJobResetsFinishedTimeIT.java | 5 +++++ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 7c3a56bcc622d..28743ce4e1c6e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -51,11 +51,9 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.XPackField; -import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.ml.MlMetaIndex; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; -import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; @@ -75,7 +73,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Date; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -480,7 +477,7 @@ protected void masterOperation(OpenJobAction.Request request, ClusterState state // Step 6. Clear job finished time once the job is started and respond ActionListener clearJobFinishTime = ActionListener.wrap( response -> { - if(response.isAcknowledged()) { + if (response.isAcknowledged()) { clearJobFinishedTime(jobParams.getJobId(), listener); } else { listener.onResponse(response); @@ -593,10 +590,9 @@ public void onTimeout(TimeValue timeout) { } private void clearJobFinishedTime(String jobId, ActionListener listener) { - clusterService.submitStateUpdateTask("clearing_job_finish_time [" + jobId + "]", new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("clearing-job-finish-time [" + jobId + "]", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - XPackPlugin.checkReadyForXPackCustomMetadata(currentState); MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState); MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata); Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId)); @@ -611,7 +607,8 @@ public ClusterState execute(ClusterState currentState) { @Override public void onFailure(String source, Exception e) { - listener.onFailure(e); + logger.error(source + "Failed to clear finished_time due to [" + e.getMessage() + "]", e); + listener.onResponse(new OpenJobAction.Response(true)); } @Override diff --git a/x-pack/qa/ml-native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ReopenJobResetsFinishedTimeIT.java b/x-pack/qa/ml-native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ReopenJobResetsFinishedTimeIT.java index 85b73873dba2a..325b1370315ca 100644 --- a/x-pack/qa/ml-native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ReopenJobResetsFinishedTimeIT.java +++ b/x-pack/qa/ml-native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ReopenJobResetsFinishedTimeIT.java @@ -1,3 +1,8 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ package org.elasticsearch.xpack.ml.integration; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; From d1bbd559bc2ada5be22454d00b0d280b87e06e6e Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Fri, 10 Aug 2018 10:16:25 -0500 Subject: [PATCH 3/3] Changing error log string and source string --- .../elasticsearch/xpack/ml/action/TransportOpenJobAction.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 28743ce4e1c6e..c4e5793b45171 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -590,7 +590,7 @@ public void onTimeout(TimeValue timeout) { } private void clearJobFinishedTime(String jobId, ActionListener listener) { - clusterService.submitStateUpdateTask("clearing-job-finish-time [" + jobId + "]", new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("clearing-job-finish-time-for-" + jobId, new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState); @@ -607,7 +607,7 @@ public ClusterState execute(ClusterState currentState) { @Override public void onFailure(String source, Exception e) { - logger.error(source + "Failed to clear finished_time due to [" + e.getMessage() + "]", e); + logger.error("[" + jobId + "] Failed to clear finished_time; source [" + source + "]", e); listener.onResponse(new OpenJobAction.Response(true)); }