From 7cd92084507fd5dd552679029ec621b9424a0a06 Mon Sep 17 00:00:00 2001
From: Hendrik Muhs
Date: Wed, 5 Dec 2018 10:14:08 +0100
Subject: [PATCH] [ML-DataFrame] cancel indexer on job deletion and remove
 task, allow only stopped jo… (#36204)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

cancel indexer on job deletion and remove task, allow only stopped jobs to be deleted
---
 .../action/DeleteDataFrameJobAction.java      |  95 ++++++++++--
 .../TransportDeleteDataFrameJobAction.java    | 136 +++++++++---------
 .../xpack/dataframe/job/DataFrameJobTask.java |  38 ++++-
 .../action/RestDeleteDataFrameJobAction.java  |  12 +-
 4 files changed, 197 insertions(+), 84 deletions(-)

diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/DeleteDataFrameJobAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/DeleteDataFrameJobAction.java
index 09df36b90205e..48723b789a7fe 100644
--- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/DeleteDataFrameJobAction.java
+++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/DeleteDataFrameJobAction.java
@@ -6,22 +6,29 @@
 package org.elasticsearch.xpack.dataframe.action;

 import org.elasticsearch.action.Action;
+import org.elasticsearch.action.ActionRequestBuilder;
 import org.elasticsearch.action.ActionRequestValidationException;
-import org.elasticsearch.action.support.master.AcknowledgedRequest;
-import org.elasticsearch.action.support.master.AcknowledgedResponse;
-import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
+import org.elasticsearch.action.FailedNodeException;
+import org.elasticsearch.action.TaskOperationFailure;
+import org.elasticsearch.action.support.tasks.BaseTasksRequest;
+import org.elasticsearch.action.support.tasks.BaseTasksResponse;
 import org.elasticsearch.client.ElasticsearchClient;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.ToXContentFragment;
+import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.tasks.Task;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.dataframe.job.DataFrameJob;

 import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
 import java.util.Objects;

-public class DeleteDataFrameJobAction extends Action {
+public class DeleteDataFrameJobAction extends Action {

     public static final DeleteDataFrameJobAction INSTANCE = new DeleteDataFrameJobAction();
     public static final String NAME = "cluster:admin/data_frame/delete";
@@ -31,11 +38,11 @@ private DeleteDataFrameJobAction() {
     }

     @Override
-    public AcknowledgedResponse newResponse() {
-        return new AcknowledgedResponse();
+    public Response newResponse() {
+        return new Response();
     }

-    public static class Request extends AcknowledgedRequest implements ToXContent {
+    public static class Request extends BaseTasksRequest implements ToXContentFragment {
         private String id;

         public Request(String id) {
@@ -54,6 +61,11 @@ public String getId() {
             return id;
         }

+        @Override
+        public boolean match(Task task) {
+            return task.getDescription().equals(DataFrameJob.PERSISTENT_TASK_DESCRIPTION_PREFIX + id);
+        }
+
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             super.writeTo(out);
@@ -90,10 +102,73 @@ public boolean equals(Object obj) {
         }
     }

-    public static class RequestBuilder extends MasterNodeOperationRequestBuilder {
+    public static class RequestBuilder extends ActionRequestBuilder {

         protected RequestBuilder(ElasticsearchClient client, DeleteDataFrameJobAction action) {
-            super(client, action, new Request());
+            super(client, action, new DeleteDataFrameJobAction.Request());
+        }
+    }
+
+    public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
+        private boolean acknowledged;
+        public Response(StreamInput in) throws IOException {
+            super(Collections.emptyList(), Collections.emptyList());
+            readFrom(in);
+        }
+
+        public Response(boolean acknowledged, List taskFailures, List nodeFailures) {
+            super(taskFailures, nodeFailures);
+            this.acknowledged = acknowledged;
+        }
+
+        public Response(boolean acknowledged) {
+            this(acknowledged, Collections.emptyList(), Collections.emptyList());
+        }
+
+        public Response() {
+            this(false, Collections.emptyList(), Collections.emptyList());
+        }
+
+        public boolean isDeleted() {
+            return acknowledged;
+        }
+
+        @Override
+        public void readFrom(StreamInput in) throws IOException {
+            super.readFrom(in);
+            acknowledged = in.readBoolean();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeBoolean(acknowledged);
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.startObject();
+            {
+                toXContentCommon(builder, params);
+                builder.field("acknowledged", acknowledged);
+            }
+            builder.endObject();
+            return builder;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+            DeleteDataFrameJobAction.Response response = (DeleteDataFrameJobAction.Response) o;
+            return super.equals(o) && acknowledged == response.acknowledged;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(super.hashCode(), acknowledged);
         }
     }
 }
diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportDeleteDataFrameJobAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportDeleteDataFrameJobAction.java
index c6a2fe8094b20..bc1385f56b99d 100644
--- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportDeleteDataFrameJobAction.java
+++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportDeleteDataFrameJobAction.java
@@ -5,103 +5,95 @@
  */
 package org.elasticsearch.xpack.dataframe.action;

-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionListenerResponseHandler;
+import org.elasticsearch.action.FailedNodeException;
+import org.elasticsearch.action.TaskOperationFailure;
 import org.elasticsearch.action.support.ActionFilters;
-import org.elasticsearch.action.support.master.AcknowledgedResponse;
-import org.elasticsearch.action.support.master.TransportMasterNodeAction;
+import org.elasticsearch.action.support.tasks.TransportTasksAction;
 import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.block.ClusterBlockException;
-import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.discovery.MasterNotDiscoveredException;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
 import org.elasticsearch.persistent.PersistentTasksService;
+import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
-import org.elasticsearch.xpack.dataframe.job.DataFrameJob;
+import org.elasticsearch.xpack.core.indexing.IndexerState;
+import org.elasticsearch.xpack.dataframe.action.DeleteDataFrameJobAction.Request;
+import org.elasticsearch.xpack.dataframe.action.DeleteDataFrameJobAction.Response;
+import org.elasticsearch.xpack.dataframe.job.DataFrameJobTask;

-import java.util.Objects;
-import java.util.concurrent.TimeUnit;
+import java.io.IOException;
+import java.util.List;

-public class TransportDeleteDataFrameJobAction
-        extends TransportMasterNodeAction {
-
-    private final PersistentTasksService persistentTasksService;
-    private static final Logger logger = LogManager.getLogger(TransportDeleteDataFrameJobAction.class);
+public class TransportDeleteDataFrameJobAction extends TransportTasksAction {

     @Inject
-    public TransportDeleteDataFrameJobAction(TransportService transportService, ThreadPool threadPool,
-                                             ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
-                                             PersistentTasksService persistentTasksService, ClusterService clusterService) {
-        super(DeleteDataFrameJobAction.NAME, transportService, clusterService, threadPool, actionFilters,
-              indexNameExpressionResolver, DeleteDataFrameJobAction.Request::new);
-        this.persistentTasksService = persistentTasksService;
+    public TransportDeleteDataFrameJobAction(TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters,
+            IndexNameExpressionResolver indexNameExpressionResolver, PersistentTasksService persistentTasksService,
+            ClusterService clusterService) {
+        super(DeleteDataFrameJobAction.NAME, clusterService, transportService, actionFilters, Request::new, Response::new,
+            ThreadPool.Names.SAME);
     }

     @Override
-    protected String executor() {
-        return ThreadPool.Names.SAME;
+    protected Response newResponse(Request request, List tasks, List taskOperationFailures,
+            List failedNodeExceptions) {
+        assert tasks.size() + taskOperationFailures.size() == 1;
+        boolean cancelled = tasks.size() > 0 && tasks.stream().allMatch(Response::isDeleted);
+
+        return new Response(cancelled, taskOperationFailures, failedNodeExceptions);
     }

     @Override
-    protected AcknowledgedResponse newResponse() {
-        return new AcknowledgedResponse();
+    protected Response readTaskResponse(StreamInput in) throws IOException {
+        Response response = new Response();
+        response.readFrom(in);
+        return response;
     }

     @Override
-    protected void masterOperation(DeleteDataFrameJobAction.Request request, ClusterState state,
-                                   ActionListener listener) throws Exception {
-
-        String jobId = request.getId();
-        TimeValue timeout = new TimeValue(60, TimeUnit.SECONDS); // TODO make this a config option
-
-        // Step 1. Cancel the persistent task
-        persistentTasksService.sendRemoveRequest(jobId, new ActionListener>() {
-            @Override
-            public void onResponse(PersistentTasksCustomMetaData.PersistentTask persistentTask) {
-                logger.debug("Request to cancel Task for data frame job [" + jobId + "] successful.");
-
-                // Step 2. Wait for the task to finish cancellation internally
-                persistentTasksService.waitForPersistentTaskCondition(jobId, Objects::isNull, timeout,
-                        new PersistentTasksService.WaitForPersistentTaskListener() {
-                    @Override
-                    public void onResponse(PersistentTasksCustomMetaData.PersistentTask task) {
-                        logger.debug("Task for data frame job [" + jobId + "] successfully canceled.");
-                        listener.onResponse(new AcknowledgedResponse(true));
-                    }
-
-                    @Override
-                    public void onFailure(Exception e) {
-                        logger.error("Error while cancelling task for data frame job [" + jobId
-                                + "]." + e);
-                        listener.onFailure(e);
-                    }
-
-                    @Override
-                    public void onTimeout(TimeValue timeout) {
-                        String msg = "Stopping of data frame job [" + jobId + "] timed out after [" + timeout + "].";
-                        logger.warn(msg);
-                        listener.onFailure(new ElasticsearchException(msg));
-                    }
-                });
-            }
-
-            @Override
-            public void onFailure(Exception e) {
-                logger.error("Error while requesting to cancel task for data frame job [" + jobId + "]" + e);
-                listener.onFailure(e);
-            }
-        });
-
+    protected void taskOperation(Request request, DataFrameJobTask task, ActionListener listener) {
+        assert task.getConfig().getId().equals(request.getId());
+        IndexerState state = task.getState().getJobState();
+        if (state.equals(IndexerState.STOPPED)) {
+            task.onCancelled();
+            listener.onResponse(new Response(true));
+        } else {
+            listener.onFailure(new IllegalStateException("Could not delete job [" + request.getId() + "] because " + "indexer state is ["
+                    + state + "]. Job must be [" + IndexerState.STOPPED + "] before deletion."));
+        }
     }

     @Override
-    protected ClusterBlockException checkBlock(DeleteDataFrameJobAction.Request request, ClusterState state) {
-        return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
+    protected void doExecute(Task task, Request request, ActionListener listener) {
+        final ClusterState state = clusterService.state();
+        final DiscoveryNodes nodes = state.nodes();
+        if (nodes.isLocalNodeElectedMaster()) {
+            PersistentTasksCustomMetaData pTasksMeta = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
+            if (pTasksMeta != null && pTasksMeta.getTask(request.getId()) != null) {
+                super.doExecute(task, request, listener);
+            } else {
+                // If we couldn't find the job in the persistent task CS, it means it was deleted prior to this call,
+                // no need to go looking for the allocated task
+                listener.onFailure(new ResourceNotFoundException("the task with id [" + request.getId() + "] doesn't exist"));
+            }
+        } else {
+            // Delegates DeleteJob to elected master node, so it becomes the coordinating node.
+            // Non-master nodes may have a stale cluster state that shows jobs which are cancelled
+            // on the master, which makes testing difficult.
+            if (nodes.getMasterNode() == null) {
+                listener.onFailure(new MasterNotDiscoveredException("no known master nodes"));
+            } else {
+                transportService.sendRequest(nodes.getMasterNode(), actionName, request,
+                    new ActionListenerResponseHandler<>(listener, Response::new));
+            }
+        }
     }
 }
diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/job/DataFrameJobTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/job/DataFrameJobTask.java
index 8305ec550fb03..966896cf4ea28 100644
--- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/job/DataFrameJobTask.java
+++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/job/DataFrameJobTask.java
@@ -23,6 +23,7 @@
 import org.elasticsearch.xpack.core.indexing.IndexerState;
 import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
 import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Event;
+import org.elasticsearch.xpack.dataframe.DataFrame;
 import org.elasticsearch.xpack.dataframe.action.StartDataFrameJobAction;
 import org.elasticsearch.xpack.dataframe.action.StopDataFrameJobAction;
 import org.elasticsearch.xpack.dataframe.action.StartDataFrameJobAction.Response;
@@ -35,16 +36,18 @@ public class DataFrameJobTask extends AllocatedPersistentTask implements Schedul

     private static final Logger logger = LogManager.getLogger(DataFrameJobTask.class);

     private final DataFrameJob job;
+    private final SchedulerEngine schedulerEngine;
     private final ThreadPool threadPool;
     private final DataFrameIndexer indexer;

-    static final String SCHEDULE_NAME = "xpack/data_frame/job" + "/schedule";
+    static final String SCHEDULE_NAME = DataFrame.TASK_NAME + "/schedule";

     public DataFrameJobTask(long id, String type, String action, TaskId parentTask, DataFrameJob job, DataFrameJobState state,
             Client client, SchedulerEngine schedulerEngine, ThreadPool threadPool, Map headers) {
         super(id, type, action, DataFrameJob.PERSISTENT_TASK_DESCRIPTION_PREFIX + job.getConfig().getId(), parentTask, headers);
         this.job = job;
+        this.schedulerEngine = schedulerEngine;
         this.threadPool = threadPool;
         logger.info("construct job task");
         // todo: simplistic implementation for now
@@ -86,6 +89,38 @@ public void triggered(Event event) {
         }
     }

+    /**
+     * Attempt to gracefully cleanup the data frame job so it can be terminated.
+     * This tries to remove the job from the scheduler, and potentially any other
+     * cleanup operations in the future
+     */
+    synchronized void shutdown() {
+        try {
+            logger.info("Data frame indexer [" + job.getConfig().getId() + "] received abort request, stopping indexer.");
+            schedulerEngine.remove(SCHEDULE_NAME + "_" + job.getConfig().getId());
+            schedulerEngine.unregister(this);
+        } catch (Exception e) {
+            markAsFailed(e);
+            return;
+        }
+        markAsCompleted();
+    }
+
+    /**
+     * This is called when the persistent task signals that the allocated task should be terminated.
+     * Termination in the task framework is essentially voluntary, as the allocated task can only be
+     * shut down from the inside.
+     */
+    @Override
+    public synchronized void onCancelled() {
+        logger.info(
+                "Received cancellation request for data frame job [" + job.getConfig().getId() + "], state: [" + indexer.getState() + "]");
+        if (indexer.abort()) {
+            // there is no background job running, we can shutdown safely
+            shutdown();
+        }
+    }
+
     protected class ClientDataFrameIndexer extends DataFrameIndexer {

         private final Client client;
@@ -138,6 +173,7 @@ protected void onFinish() {
         @Override
         protected void onAbort() {
             logger.info("Data frame job [" + job.getConfig().getId() + "] received abort request, stopping indexer");
+            shutdown();
         }
     }
 }
diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestDeleteDataFrameJobAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestDeleteDataFrameJobAction.java
index b6c441735d0ad..8923a27ccbd56 100644
--- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestDeleteDataFrameJobAction.java
+++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestDeleteDataFrameJobAction.java
@@ -11,6 +11,7 @@
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestController;
 import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.rest.action.RestToXContentListener;
 import org.elasticsearch.xpack.dataframe.DataFrame;
 import org.elasticsearch.xpack.dataframe.action.DeleteDataFrameJobAction;
@@ -30,7 +31,16 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
         String id = restRequest.param(DataFrameJob.ID.getPreferredName());
         DeleteDataFrameJobAction.Request request = new DeleteDataFrameJobAction.Request(id);

-        return channel -> client.execute(DeleteDataFrameJobAction.INSTANCE, request, new RestToXContentListener<>(channel));
+        return channel -> client.execute(DeleteDataFrameJobAction.INSTANCE, request,
+                new RestToXContentListener(channel) {
+                    @Override
+                    protected RestStatus getStatus(DeleteDataFrameJobAction.Response response) {
+                        if (response.getNodeFailures().size() > 0 || response.getTaskFailures().size() > 0) {
+                            return RestStatus.INTERNAL_SERVER_ERROR;
+                        }
+                        return RestStatus.OK;
+                    }
+                });
     }

     @Override