-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Clear Job#finished_time when it is opened (#32605) #32755
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,12 +18,14 @@ | |
import org.elasticsearch.action.support.master.TransportMasterNodeAction; | ||
import org.elasticsearch.client.Client; | ||
import org.elasticsearch.cluster.ClusterState; | ||
import org.elasticsearch.cluster.ClusterStateUpdateTask; | ||
import org.elasticsearch.cluster.block.ClusterBlockException; | ||
import org.elasticsearch.cluster.block.ClusterBlockLevel; | ||
import org.elasticsearch.cluster.metadata.AliasOrIndex; | ||
import org.elasticsearch.cluster.metadata.IndexMetaData; | ||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; | ||
import org.elasticsearch.cluster.metadata.MappingMetaData; | ||
import org.elasticsearch.cluster.metadata.MetaData; | ||
import org.elasticsearch.cluster.node.DiscoveryNode; | ||
import org.elasticsearch.cluster.routing.IndexRoutingTable; | ||
import org.elasticsearch.cluster.service.ClusterService; | ||
|
@@ -49,9 +51,11 @@ | |
import org.elasticsearch.threadpool.ThreadPool; | ||
import org.elasticsearch.transport.TransportService; | ||
import org.elasticsearch.xpack.core.XPackField; | ||
import org.elasticsearch.xpack.core.XPackPlugin; | ||
import org.elasticsearch.xpack.core.ml.MlMetaIndex; | ||
import org.elasticsearch.xpack.core.ml.MlMetadata; | ||
import org.elasticsearch.xpack.core.ml.MlTasks; | ||
import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; | ||
import org.elasticsearch.xpack.core.ml.action.OpenJobAction; | ||
import org.elasticsearch.xpack.core.ml.action.PutJobAction; | ||
import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; | ||
|
@@ -71,6 +75,7 @@ | |
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.Collection; | ||
import java.util.Date; | ||
import java.util.LinkedList; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
@@ -471,12 +476,25 @@ protected ClusterBlockException checkBlock(OpenJobAction.Request request, Cluste | |
protected void masterOperation(OpenJobAction.Request request, ClusterState state, ActionListener<OpenJobAction.Response> listener) { | ||
OpenJobAction.JobParams jobParams = request.getJobParams(); | ||
if (licenseState.isMachineLearningAllowed()) { | ||
// Step 5. Wait for job to be started and respond | ||
ActionListener<PersistentTasksCustomMetaData.PersistentTask<OpenJobAction.JobParams>> finalListener = | ||
|
||
// Step 6. Clear job finished time once the job is started and respond | ||
ActionListener<OpenJobAction.Response> clearJobFinishTime = ActionListener.wrap( | ||
response -> { | ||
if(response.isAcknowledged()) { | ||
clearJobFinishedTime(jobParams.getJobId(), listener); | ||
} else { | ||
listener.onResponse(response); | ||
} | ||
}, | ||
listener::onFailure | ||
); | ||
|
||
// Step 5. Wait for job to be started | ||
ActionListener<PersistentTasksCustomMetaData.PersistentTask<OpenJobAction.JobParams>> waitForJobToStart = | ||
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<OpenJobAction.JobParams>>() { | ||
@Override | ||
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<OpenJobAction.JobParams> task) { | ||
waitForJobStarted(task.getId(), jobParams, listener); | ||
waitForJobStarted(task.getId(), jobParams, clearJobFinishTime); | ||
} | ||
|
||
@Override | ||
|
@@ -492,7 +510,7 @@ public void onFailure(Exception e) { | |
// Step 4. Start job task | ||
ActionListener<PutJobAction.Response> establishedMemoryUpdateListener = ActionListener.wrap( | ||
response -> persistentTasksService.sendStartRequest(MlTasks.jobTaskId(jobParams.getJobId()), | ||
OpenJobAction.TASK_NAME, jobParams, finalListener), | ||
OpenJobAction.TASK_NAME, jobParams, waitForJobToStart), | ||
listener::onFailure | ||
); | ||
|
||
|
@@ -574,6 +592,35 @@ public void onTimeout(TimeValue timeout) { | |
}); | ||
} | ||
|
||
private void clearJobFinishedTime(String jobId, ActionListener<OpenJobAction.Response> listener) { | ||
clusterService.submitStateUpdateTask("clearing_job_finish_time [" + jobId + "]", new ClusterStateUpdateTask() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: it seems that in most places we use hyphens to name the cluster state update tasks. It would be more consistent to rename this to |
||
@Override | ||
public ClusterState execute(ClusterState currentState) { | ||
XPackPlugin.checkReadyForXPackCustomMetadata(currentState); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This check is unnecessary as if a job is being opened we are certain there is ML Metadata already installed. |
||
MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState); | ||
MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata); | ||
Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId)); | ||
jobBuilder.setFinishedTime(null); | ||
|
||
mlMetadataBuilder.putJob(jobBuilder.build(), true); | ||
ClusterState.Builder builder = ClusterState.builder(currentState); | ||
return builder.metaData(new MetaData.Builder(currentState.metaData()) | ||
.putCustom(MlMetadata.TYPE, mlMetadataBuilder.build())) | ||
.build(); | ||
} | ||
|
||
@Override | ||
public void onFailure(String source, Exception e) { | ||
listener.onFailure(e); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am tempted to handle failure differently here. Even if the cluster state update failed, the job has successfully opened. It might be more misleading to return an error to the user, as they may think the job has not been opened. I wonder if logging the error should be enough. Any thoughts @droberts195 ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I too considered just logging the issue and still letting the end user know that it succeeded. Definitely a contentious decision that needs discussion. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should just log the error. As @dimitris-athanasiou said, if it fails here the job will have a status of |
||
} | ||
|
||
@Override | ||
public void clusterStateProcessed(String source, ClusterState oldState, | ||
ClusterState newState) { | ||
listener.onResponse(new OpenJobAction.Response(true)); | ||
} | ||
}); | ||
} | ||
private void cancelJobStart(PersistentTasksCustomMetaData.PersistentTask<OpenJobAction.JobParams> persistentTask, Exception exception, | ||
ActionListener<OpenJobAction.Response> listener) { | ||
persistentTasksService.sendRemoveRequest(persistentTask.getId(), | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
package org.elasticsearch.xpack.ml.integration; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Review comment: this new file is missing the license header. |
||
|
||
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; | ||
import org.elasticsearch.xpack.core.ml.job.config.DataDescription; | ||
import org.elasticsearch.xpack.core.ml.job.config.Detector; | ||
import org.elasticsearch.xpack.core.ml.job.config.Job; | ||
import org.junit.After; | ||
|
||
import java.util.Collections; | ||
|
||
import static org.hamcrest.CoreMatchers.notNullValue; | ||
import static org.hamcrest.CoreMatchers.nullValue; | ||
import static org.hamcrest.Matchers.is; | ||
|
||
public class ReopenJobResetsFinishedTimeIT extends MlNativeAutodetectIntegTestCase { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Review comment: Awesome! :-) |
||
|
||
@After | ||
public void cleanUpTest() { | ||
cleanUp(); | ||
} | ||
|
||
public void test() { | ||
final String jobId = "reset-finished-time-test"; | ||
Job.Builder job = createJob(jobId); | ||
|
||
registerJob(job); | ||
putJob(job); | ||
openJob(job.getId()); | ||
|
||
assertThat(getSingleJob(jobId).getFinishedTime(), is(nullValue())); | ||
|
||
closeJob(jobId); | ||
assertThat(getSingleJob(jobId).getFinishedTime(), is(notNullValue())); | ||
|
||
openJob(jobId); | ||
assertThat(getSingleJob(jobId).getFinishedTime(), is(nullValue())); | ||
} | ||
|
||
private Job getSingleJob(String jobId) { | ||
return getJob(jobId).get(0); | ||
} | ||
|
||
private Job.Builder createJob(String id) { | ||
DataDescription.Builder dataDescription = new DataDescription.Builder(); | ||
dataDescription.setFormat(DataDescription.DataFormat.XCONTENT); | ||
dataDescription.setTimeFormat(DataDescription.EPOCH_MS); | ||
|
||
Detector.Builder d = new Detector.Builder("count", null); | ||
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build())); | ||
|
||
Job.Builder builder = new Job.Builder(); | ||
builder.setId(id); | ||
builder.setAnalysisConfig(analysisConfig); | ||
builder.setDataDescription(dataDescription); | ||
return builder; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment.
The reason will be displayed to describe this comment to others. Learn more.
nit: add a space after `if`, i.e. write `if (response.isAcknowledged())` instead of `if(response.isAcknowledged())`.