
Commit

[ML-DataFrame] cancel indexer on job deletion and remove task, allow only stopped jobs to be deleted (#36204)

Hendrik Muhs authored Dec 5, 2018
1 parent dd08f7b commit 7cd9208
Showing 4 changed files with 197 additions and 84 deletions.
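
In short: deletion moves off the master-node action path and becomes a tasks action routed to the node running the job, and it now refuses jobs whose indexer is not STOPPED. A minimal caller-side sketch of the new contract (hedged: the Client handle, the example class, and the job id "my-job" are illustrative, not part of this commit):

    import org.elasticsearch.action.ActionListener;
    import org.elasticsearch.client.Client;
    import org.elasticsearch.xpack.dataframe.action.DeleteDataFrameJobAction;

    class DeleteJobExample {
        // Delete only succeeds once the job's indexer is STOPPED.
        static void deleteStoppedJob(Client client) {
            DeleteDataFrameJobAction.Request request = new DeleteDataFrameJobAction.Request("my-job");
            client.execute(DeleteDataFrameJobAction.INSTANCE, request, ActionListener.wrap(
                    response -> {
                        boolean deleted = response.isDeleted(); // surfaces the new wire-level "acknowledged" flag
                    },
                    e -> {
                        // An IllegalStateException means the indexer was still running;
                        // stop the job first, then retry the delete.
                    }));
        }
    }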
DeleteDataFrameJobAction.java
@@ -6,22 +6,29 @@
 package org.elasticsearch.xpack.dataframe.action;

 import org.elasticsearch.action.Action;
+import org.elasticsearch.action.ActionRequestBuilder;
 import org.elasticsearch.action.ActionRequestValidationException;
-import org.elasticsearch.action.support.master.AcknowledgedRequest;
-import org.elasticsearch.action.support.master.AcknowledgedResponse;
-import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
+import org.elasticsearch.action.FailedNodeException;
+import org.elasticsearch.action.TaskOperationFailure;
+import org.elasticsearch.action.support.tasks.BaseTasksRequest;
+import org.elasticsearch.action.support.tasks.BaseTasksResponse;
 import org.elasticsearch.client.ElasticsearchClient;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.ToXContentFragment;
+import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.tasks.Task;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.dataframe.job.DataFrameJob;

 import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
 import java.util.Objects;

-public class DeleteDataFrameJobAction extends Action<AcknowledgedResponse> {
+public class DeleteDataFrameJobAction extends Action<DeleteDataFrameJobAction.Response> {

     public static final DeleteDataFrameJobAction INSTANCE = new DeleteDataFrameJobAction();
     public static final String NAME = "cluster:admin/data_frame/delete";
@@ -31,11 +38,11 @@ private DeleteDataFrameJobAction() {
     }

     @Override
-    public AcknowledgedResponse newResponse() {
-        return new AcknowledgedResponse();
+    public Response newResponse() {
+        return new Response();
     }

-    public static class Request extends AcknowledgedRequest<Request> implements ToXContent {
+    public static class Request extends BaseTasksRequest<Request> implements ToXContentFragment {
         private String id;

         public Request(String id) {
@@ -54,6 +61,11 @@ public String getId() {
             return id;
         }

+        @Override
+        public boolean match(Task task) {
+            return task.getDescription().equals(DataFrameJob.PERSISTENT_TASK_DESCRIPTION_PREFIX + id);
+        }
+
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             super.writeTo(out);
@@ -90,10 +102,73 @@ public boolean equals(Object obj) {
         }
     }

-    public static class RequestBuilder extends MasterNodeOperationRequestBuilder<Request, AcknowledgedResponse, RequestBuilder> {
+    public static class RequestBuilder extends ActionRequestBuilder<DeleteDataFrameJobAction.Request, DeleteDataFrameJobAction.Response> {

         protected RequestBuilder(ElasticsearchClient client, DeleteDataFrameJobAction action) {
-            super(client, action, new Request());
+            super(client, action, new DeleteDataFrameJobAction.Request());
         }
     }
+
+    public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
+        private boolean acknowledged;
+        public Response(StreamInput in) throws IOException {
+            super(Collections.emptyList(), Collections.emptyList());
+            readFrom(in);
+        }
+
+        public Response(boolean acknowledged, List<TaskOperationFailure> taskFailures, List<FailedNodeException> nodeFailures) {
+            super(taskFailures, nodeFailures);
+            this.acknowledged = acknowledged;
+        }
+
+        public Response(boolean acknowledged) {
+            this(acknowledged, Collections.emptyList(), Collections.emptyList());
+        }
+
+        public Response() {
+            this(false, Collections.emptyList(), Collections.emptyList());
+        }
+
+        public boolean isDeleted() {
+            return acknowledged;
+        }
+
+        @Override
+        public void readFrom(StreamInput in) throws IOException {
+            super.readFrom(in);
+            acknowledged = in.readBoolean();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeBoolean(acknowledged);
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.startObject();
+            {
+                toXContentCommon(builder, params);
+                builder.field("acknowledged", acknowledged);
+            }
+            builder.endObject();
+            return builder;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+            DeleteDataFrameJobAction.Response response = (DeleteDataFrameJobAction.Response) o;
+            return super.equals(o) && acknowledged == response.acknowledged;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(super.hashCode(), acknowledged);
+        }
+    }
 }
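
The new Response carries the acknowledged flag on top of the task/node failure lists it inherits from BaseTasksResponse. A quick wire round-trip sketch of the serialization defined above (test-style scaffolding assumed, not part of the commit):

    import org.elasticsearch.common.io.stream.BytesStreamOutput;
    import org.elasticsearch.common.io.stream.StreamInput;
    import org.elasticsearch.xpack.dataframe.action.DeleteDataFrameJobAction;

    class ResponseWireExample {
        static void roundTrip() throws Exception {
            DeleteDataFrameJobAction.Response original = new DeleteDataFrameJobAction.Response(true);
            try (BytesStreamOutput out = new BytesStreamOutput()) {
                original.writeTo(out); // BaseTasksResponse failure lists first, then the boolean
                try (StreamInput in = out.bytes().streamInput()) {
                    DeleteDataFrameJobAction.Response copy = new DeleteDataFrameJobAction.Response(in);
                    assert copy.isDeleted(); // acknowledged survives the round trip
                }
            }
        }
    }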
TransportDeleteDataFrameJobAction.java
@@ -5,103 +5,95 @@
  */
 package org.elasticsearch.xpack.dataframe.action;

-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionListenerResponseHandler;
+import org.elasticsearch.action.FailedNodeException;
+import org.elasticsearch.action.TaskOperationFailure;
 import org.elasticsearch.action.support.ActionFilters;
-import org.elasticsearch.action.support.master.AcknowledgedResponse;
-import org.elasticsearch.action.support.master.TransportMasterNodeAction;
+import org.elasticsearch.action.support.tasks.TransportTasksAction;
 import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.block.ClusterBlockException;
-import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.discovery.MasterNotDiscoveredException;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
 import org.elasticsearch.persistent.PersistentTasksService;
+import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
-import org.elasticsearch.xpack.dataframe.job.DataFrameJob;
+import org.elasticsearch.xpack.core.indexing.IndexerState;
+import org.elasticsearch.xpack.dataframe.action.DeleteDataFrameJobAction.Request;
+import org.elasticsearch.xpack.dataframe.action.DeleteDataFrameJobAction.Response;
+import org.elasticsearch.xpack.dataframe.job.DataFrameJobTask;

-import java.util.Objects;
-import java.util.concurrent.TimeUnit;
+import java.io.IOException;
+import java.util.List;
-public class TransportDeleteDataFrameJobAction
-        extends TransportMasterNodeAction<DeleteDataFrameJobAction.Request, AcknowledgedResponse> {
-
-    private final PersistentTasksService persistentTasksService;
-    private static final Logger logger = LogManager.getLogger(TransportDeleteDataFrameJobAction.class);
+public class TransportDeleteDataFrameJobAction extends TransportTasksAction<DataFrameJobTask, Request, Response, Response> {

     @Inject
-    public TransportDeleteDataFrameJobAction(TransportService transportService, ThreadPool threadPool,
-            ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
-            PersistentTasksService persistentTasksService, ClusterService clusterService) {
-        super(DeleteDataFrameJobAction.NAME, transportService, clusterService, threadPool, actionFilters,
-                indexNameExpressionResolver, DeleteDataFrameJobAction.Request::new);
-        this.persistentTasksService = persistentTasksService;
+    public TransportDeleteDataFrameJobAction(TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters,
+            IndexNameExpressionResolver indexNameExpressionResolver, PersistentTasksService persistentTasksService,
+            ClusterService clusterService) {
+        super(DeleteDataFrameJobAction.NAME, clusterService, transportService, actionFilters, Request::new, Response::new,
+                ThreadPool.Names.SAME);
     }

     @Override
-    protected String executor() {
-        return ThreadPool.Names.SAME;
+    protected Response newResponse(Request request, List<Response> tasks, List<TaskOperationFailure> taskOperationFailures,
+            List<FailedNodeException> failedNodeExceptions) {
+        assert tasks.size() + taskOperationFailures.size() == 1;
+        boolean cancelled = tasks.size() > 0 && tasks.stream().allMatch(Response::isDeleted);
+
+        return new Response(cancelled, taskOperationFailures, failedNodeExceptions);
     }

     @Override
-    protected AcknowledgedResponse newResponse() {
-        return new AcknowledgedResponse();
+    protected Response readTaskResponse(StreamInput in) throws IOException {
+        Response response = new Response();
+        response.readFrom(in);
+        return response;
     }

     @Override
-    protected void masterOperation(DeleteDataFrameJobAction.Request request, ClusterState state,
-            ActionListener<AcknowledgedResponse> listener) throws Exception {
-
-        String jobId = request.getId();
-        TimeValue timeout = new TimeValue(60, TimeUnit.SECONDS); // TODO make this a config option
-
-        // Step 1. Cancel the persistent task
-        persistentTasksService.sendRemoveRequest(jobId, new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
-            @Override
-            public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
-                logger.debug("Request to cancel Task for data frame job [" + jobId + "] successful.");
-
-                // Step 2. Wait for the task to finish cancellation internally
-                persistentTasksService.waitForPersistentTaskCondition(jobId, Objects::isNull, timeout,
-                        new PersistentTasksService.WaitForPersistentTaskListener<DataFrameJob>() {
-                            @Override
-                            public void onResponse(PersistentTasksCustomMetaData.PersistentTask<DataFrameJob> task) {
-                                logger.debug("Task for data frame job [" + jobId + "] successfully canceled.");
-                                listener.onResponse(new AcknowledgedResponse(true));
-                            }
-
-                            @Override
-                            public void onFailure(Exception e) {
-                                logger.error("Error while cancelling task for data frame job [" + jobId + "]." + e);
-                                listener.onFailure(e);
-                            }
-
-                            @Override
-                            public void onTimeout(TimeValue timeout) {
-                                String msg = "Stopping of data frame job [" + jobId + "] timed out after [" + timeout + "].";
-                                logger.warn(msg);
-                                listener.onFailure(new ElasticsearchException(msg));
-                            }
-                        });
-            }
-
-            @Override
-            public void onFailure(Exception e) {
-                logger.error("Error while requesting to cancel task for data frame job [" + jobId + "]" + e);
-                listener.onFailure(e);
-            }
-        });
+    protected void taskOperation(Request request, DataFrameJobTask task, ActionListener<Response> listener) {
+        assert task.getConfig().getId().equals(request.getId());
+        IndexerState state = task.getState().getJobState();
+        if (state.equals(IndexerState.STOPPED)) {
+            task.onCancelled();
+            listener.onResponse(new Response(true));
+        } else {
+            listener.onFailure(new IllegalStateException("Could not delete job [" + request.getId()
+                    + "] because indexer state is [" + state + "]. Job must be [" + IndexerState.STOPPED + "] before deletion."));
+        }
     }

     @Override
-    protected ClusterBlockException checkBlock(DeleteDataFrameJobAction.Request request, ClusterState state) {
-        return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
+    protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
+        final ClusterState state = clusterService.state();
+        final DiscoveryNodes nodes = state.nodes();
+        if (nodes.isLocalNodeElectedMaster()) {
+            PersistentTasksCustomMetaData pTasksMeta = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
+            if (pTasksMeta != null && pTasksMeta.getTask(request.getId()) != null) {
+                super.doExecute(task, request, listener);
+            } else {
+                // If we couldn't find the job in the persistent task cluster state, it was deleted
+                // prior to this call; no need to go looking for the allocated task.
+                listener.onFailure(new ResourceNotFoundException("the task with id [" + request.getId() + "] doesn't exist"));
+            }
+        } else {
+            // Delegate the delete request to the elected master node, so it becomes the coordinating node.
+            // Non-master nodes may have a stale cluster state that still shows jobs which are cancelled
+            // on the master, which makes testing difficult.
+            if (nodes.getMasterNode() == null) {
+                listener.onFailure(new MasterNotDiscoveredException("no known master nodes"));
+            } else {
+                transportService.sendRequest(nodes.getMasterNode(), actionName, request,
+                        new ActionListenerResponseHandler<>(listener, Response::new));
+            }
+        }
     }
 }
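
Note how a delete request finds its task: TransportTasksAction filters running tasks through Request#match, and the override added in the first file compares each task's description against PERSISTENT_TASK_DESCRIPTION_PREFIX + id, so at most one DataFrameJobTask can match; the assert in newResponse (exactly one task result or failure) leans on that uniqueness. A reduced sketch of the predicate (the prefix value "data_frame_" below is a placeholder; the real constant lives in DataFrameJob):

    class MatchExample {
        static final String PREFIX = "data_frame_"; // placeholder for DataFrameJob.PERSISTENT_TASK_DESCRIPTION_PREFIX

        // Simplified stand-in for DeleteDataFrameJobAction.Request#match(Task)
        static boolean matches(String taskDescription, String jobId) {
            return taskDescription.equals(PREFIX + jobId);
        }
    }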
DataFrameJobTask.java
@@ -23,6 +23,7 @@
 import org.elasticsearch.xpack.core.indexing.IndexerState;
 import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
 import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Event;
+import org.elasticsearch.xpack.dataframe.DataFrame;
 import org.elasticsearch.xpack.dataframe.action.StartDataFrameJobAction;
 import org.elasticsearch.xpack.dataframe.action.StopDataFrameJobAction;
 import org.elasticsearch.xpack.dataframe.action.StartDataFrameJobAction.Response;
@@ -35,16 +36,18 @@ public class DataFrameJobTask extends AllocatedPersistentTask implements Schedul
     private static final Logger logger = LogManager.getLogger(DataFrameJobTask.class);

     private final DataFrameJob job;
+    private final SchedulerEngine schedulerEngine;
     private final ThreadPool threadPool;
     private final DataFrameIndexer indexer;

-    static final String SCHEDULE_NAME = "xpack/data_frame/job" + "/schedule";
+    static final String SCHEDULE_NAME = DataFrame.TASK_NAME + "/schedule";

     public DataFrameJobTask(long id, String type, String action, TaskId parentTask, DataFrameJob job,
             DataFrameJobState state, Client client, SchedulerEngine schedulerEngine, ThreadPool threadPool,
             Map<String, String> headers) {
         super(id, type, action, DataFrameJob.PERSISTENT_TASK_DESCRIPTION_PREFIX + job.getConfig().getId(), parentTask, headers);
         this.job = job;
+        this.schedulerEngine = schedulerEngine;
         this.threadPool = threadPool;
         logger.info("construct job task");
         // todo: simplistic implementation for now
@@ -86,6 +89,38 @@ public void triggered(Event event) {
         }
     }

+    /**
+     * Attempts to gracefully clean up the data frame job so it can be terminated.
+     * This removes the job from the scheduler; other cleanup operations may be
+     * added here in the future.
+     */
+    synchronized void shutdown() {
+        try {
+            logger.info("Data frame indexer [" + job.getConfig().getId() + "] received abort request, stopping indexer.");
+            schedulerEngine.remove(SCHEDULE_NAME + "_" + job.getConfig().getId());
+            schedulerEngine.unregister(this);
+        } catch (Exception e) {
+            markAsFailed(e);
+            return;
+        }
+        markAsCompleted();
+    }
+
+    /**
+     * Called when the persistent task signals that the allocated task should be terminated.
+     * Termination in the task framework is essentially voluntary, as the allocated task can
+     * only be shut down from the inside.
+     */
+    @Override
+    public synchronized void onCancelled() {
+        logger.info("Received cancellation request for data frame job [" + job.getConfig().getId()
+                + "], state: [" + indexer.getState() + "]");
+        if (indexer.abort()) {
+            // there is no background job running, we can shut down safely
+            shutdown();
+        }
+    }
+
     protected class ClientDataFrameIndexer extends DataFrameIndexer {
         private final Client client;

@@ -138,6 +173,7 @@ protected void onFinish() {
         @Override
         protected void onAbort() {
             logger.info("Data frame job [" + job.getConfig().getId() + "] received abort request, stopping indexer");
+            shutdown();
         }
     }
 }
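
Cancellation now has two paths, depending on what the indexer is doing when the delete arrives. A condensed reading of the flow added above (comments are interpretation, not code from the commit; "indexer" stands in for the task's DataFrameIndexer):

    public synchronized void onCancelled() {
        if (indexer.abort()) {
            // Path 1: no search/index cycle in flight, abort() succeeds immediately,
            // so we deregister from the scheduler and mark the persistent task completed.
            shutdown();
        }
        // Path 2: a cycle was running, abort() returned false, and the indexer invokes
        // onAbort() when that cycle finishes, which (as of this commit) also calls shutdown().
    }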