Skip to content

Commit

Permalink
[ML] Adjust finalize job action to work with documents (#34226)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidkyle committed Oct 18, 2018
1 parent afb238b commit 47787b3
Show file tree
Hide file tree
Showing 13 changed files with 94 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public static Set<String> openJobIds(PersistentTasksCustomMetaData tasks) {
* Is there an ml anomaly detector job task for the job {@code jobId}?
* @param jobId The job id
* @param tasks Persistent tasks
* @return
* @return True if the job has a task
*/
public static boolean taskExistsForJob(String jobId, PersistentTasksCustomMetaData tasks) {
return openJobIds(tasks).contains(jobId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public static UpdateJobAction.Request parseRequest(String jobId, XContentParser

/** Indicates an update that was not triggered by a user */
private boolean isInternal;
private boolean waitForAck = true;

public Request(String jobId, JobUpdate update) {
this(jobId, update, false);
Expand Down Expand Up @@ -88,14 +87,6 @@ public boolean isInternal() {
return isInternal;
}

public boolean isWaitForAck() {
return waitForAck;
}

public void setWaitForAck(boolean waitForAck) {
this.waitForAck = waitForAck;
}

@Override
public ActionRequestValidationException validate() {
return null;
Expand All @@ -111,10 +102,9 @@ public void readFrom(StreamInput in) throws IOException {
} else {
isInternal = false;
}
if (in.getVersion().onOrAfter(Version.V_6_3_0)) {
waitForAck = in.readBoolean();
} else {
waitForAck = true;
// TODO jindex change CURRENT to specific version when feature branch is merged
if (in.getVersion().onOrAfter(Version.V_6_3_0) && in.getVersion().before(Version.CURRENT)) {
in.readBoolean(); // was waitForAck
}
}

Expand All @@ -126,8 +116,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_6_2_2)) {
out.writeBoolean(isInternal);
}
if (out.getVersion().onOrAfter(Version.V_6_3_0)) {
out.writeBoolean(waitForAck);
// TODO jindex change CURRENT to specific version when feature branch is merged
if (out.getVersion().onOrAfter(Version.V_6_3_0) && out.getVersion().before(Version.CURRENT)) {
out.writeBoolean(false); // was waitForAck
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
INTERNAL_PARSER.declareString(Builder::setModelSnapshotId, Job.MODEL_SNAPSHOT_ID);
INTERNAL_PARSER.declareLong(Builder::setEstablishedModelMemory, Job.ESTABLISHED_MODEL_MEMORY);
INTERNAL_PARSER.declareString(Builder::setJobVersion, Job.JOB_VERSION);
INTERNAL_PARSER.declareBoolean(Builder::setClearJobFinishTime, CLEAR_JOB_FINISH_TIME);
INTERNAL_PARSER.declareBoolean(Builder::setClearFinishTime, CLEAR_JOB_FINISH_TIME);
}

private final String jobId;
Expand Down Expand Up @@ -710,7 +710,7 @@ public Builder setJobVersion(String version) {
return this;
}

public Builder setClearJobFinishTime(boolean clearJobFinishTime) {
public Builder setClearFinishTime(boolean clearJobFinishTime) {
this.clearJobFinishTime = clearJobFinishTime;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,14 @@ protected UpdateJobAction.Request createTestInstance() {
// no need to randomize JobUpdate this is already tested in: JobUpdateTests
JobUpdate.Builder jobUpdate = new JobUpdate.Builder(jobId);
jobUpdate.setAnalysisLimits(new AnalysisLimits(100L, 100L));
UpdateJobAction.Request request = new UpdateJobAction.Request(jobId, jobUpdate.build());
request.setWaitForAck(randomBoolean());
UpdateJobAction.Request request;
if (randomBoolean()) {
request = new UpdateJobAction.Request(jobId, jobUpdate.build());
} else {
// this call sets isInternal = true
request = UpdateJobAction.Request.internal(jobId, jobUpdate.build());
}

return request;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public JobUpdate createRandom(String jobId, @Nullable Job job) {
update.setJobVersion(randomFrom(Version.CURRENT, Version.V_6_2_0, Version.V_6_1_0));
}
if (useInternalParser) {
update.setClearJobFinishTime(randomBoolean());
update.setClearFinishTime(randomBoolean());
}

return update.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
Expand All @@ -50,9 +49,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

public class TransportCloseJobAction extends TransportTasksAction<TransportOpenJobAction.JobTask, CloseJobAction.Request,
CloseJobAction.Response, CloseJobAction.Response> {

Expand Down Expand Up @@ -427,10 +423,7 @@ void waitForJobClosed(CloseJobAction.Request request, WaitForCloseRequest waitFo
}, request.getCloseTimeout(), new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean result) {
FinalizeJobExecutionAction.Request finalizeRequest = new FinalizeJobExecutionAction.Request(
waitForCloseRequest.jobsToFinalize.toArray(new String[0]));
executeAsyncWithOrigin(client, ML_ORIGIN, FinalizeJobExecutionAction.INSTANCE, finalizeRequest,
ActionListener.wrap(r -> listener.onResponse(response), listener::onFailure));
listener.onResponse(response);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,15 @@
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
import org.elasticsearch.xpack.core.ml.job.config.Job;

import java.util.Date;

public class TransportFinalizeJobExecutionAction extends TransportMasterNodeAction<FinalizeJobExecutionAction.Request,
AcknowledgedResponse> {
Expand All @@ -51,41 +44,10 @@ protected AcknowledgedResponse newResponse() {

@Override
protected void masterOperation(FinalizeJobExecutionAction.Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) throws Exception {
String jobIdString = String.join(",", request.getJobIds());
String source = "finalize_job_execution [" + jobIdString + "]";
logger.debug("finalizing jobs [{}]", jobIdString);
clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
XPackPlugin.checkReadyForXPackCustomMetadata(currentState);
MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState);
MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata);
Date finishedTime = new Date();

for (String jobId : request.getJobIds()) {
Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId));
jobBuilder.setFinishedTime(finishedTime);
mlMetadataBuilder.putJob(jobBuilder.build(), true);
}
ClusterState.Builder builder = ClusterState.builder(currentState);
return builder.metaData(new MetaData.Builder(currentState.metaData())
.putCustom(MlMetadata.TYPE, mlMetadataBuilder.build()))
.build();
}

@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState,
ClusterState newState) {
logger.debug("finalized job [{}]", jobIdString);
listener.onResponse(new AcknowledgedResponse(true));
}
});
ActionListener<AcknowledgedResponse> listener) {
// This action is no longer required but needs to be preserved
// in case it is called by an old node in a mixed cluster
listener.onResponse(new AcknowledgedResponse(true));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ public void onTimeout(TimeValue timeout) {
}

private void clearJobFinishedTime(String jobId, ActionListener<AcknowledgedResponse> listener) {
JobUpdate update = new JobUpdate.Builder(jobId).setClearJobFinishTime(true).build();
JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build();

jobConfigProvider.updateJob(jobId, update, null, ActionListener.wrap(
job -> listener.onResponse(new AcknowledgedResponse(true)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,9 @@ public void onFailure(Exception e) {
*
* @param jobId The Id of the job to update
* @param update The job update
* @param maxModelMemoryLimit The maximum model memory allowed
* @param maxModelMemoryLimit The maximum model memory allowed. This can be {@code null}
* if the job's {@link org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits}
* are not changed.
* @param updatedJobListener Updated job listener
*/
public void updateJob(String jobId, JobUpdate update, ByteSizeValue maxModelMemoryLimit, ActionListener<Job> updatedJobListener) {
Expand Down Expand Up @@ -373,7 +375,6 @@ private void indexUpdatedJob(Job updatedJob, long version, ActionListener<Job> u
}
}


/**
* Check a job exists. A job exists if it has a configuration document.
* If the .ml-config index does not exist it is treated as a missing job
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,10 @@ public void persistQuantiles(Quantiles quantiles, WriteRequest.RefreshPolicy ref
/**
* Persist a model snapshot description
*/
public void persistModelSnapshot(ModelSnapshot modelSnapshot, WriteRequest.RefreshPolicy refreshPolicy) {
public IndexResponse persistModelSnapshot(ModelSnapshot modelSnapshot, WriteRequest.RefreshPolicy refreshPolicy) {
Persistable persistable = new Persistable(modelSnapshot.getJobId(), modelSnapshot, ModelSnapshot.documentId(modelSnapshot));
persistable.setRefreshPolicy(refreshPolicy);
persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(modelSnapshot.getJobId())).actionGet();
return persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(modelSnapshot.getJobId())).actionGet();
}

/**
Expand Down
Loading

0 comments on commit 47787b3

Please sign in to comment.