[ML-DataFrame] cancel indexer on job deletion and remove task, allow only stopped jo… #36204

Merged
org/elasticsearch/xpack/dataframe/action/DeleteDataFrameJobAction.java
@@ -6,22 +6,29 @@
package org.elasticsearch.xpack.dataframe.action;

import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.dataframe.job.DataFrameJob;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

public class DeleteDataFrameJobAction extends Action<AcknowledgedResponse> {
public class DeleteDataFrameJobAction extends Action<DeleteDataFrameJobAction.Response> {

public static final DeleteDataFrameJobAction INSTANCE = new DeleteDataFrameJobAction();
public static final String NAME = "cluster:admin/data_frame/delete";
@@ -31,11 +38,11 @@ private DeleteDataFrameJobAction() {
}

@Override
public AcknowledgedResponse newResponse() {
return new AcknowledgedResponse();
public Response newResponse() {
return new Response();
}

public static class Request extends AcknowledgedRequest<Request> implements ToXContent {
public static class Request extends BaseTasksRequest<Request> implements ToXContentFragment {
private String id;

public Request(String id) {
@@ -54,6 +61,11 @@ public String getId() {
return id;
}

@Override
public boolean match(Task task) {
return task.getDescription().equals(DataFrameJob.PERSISTENT_TASK_DESCRIPTION_PREFIX + id);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@@ -90,10 +102,73 @@ public boolean equals(Object obj) {
}
}

public static class RequestBuilder extends MasterNodeOperationRequestBuilder<Request, AcknowledgedResponse, RequestBuilder> {
public static class RequestBuilder extends ActionRequestBuilder<DeleteDataFrameJobAction.Request, DeleteDataFrameJobAction.Response> {

protected RequestBuilder(ElasticsearchClient client, DeleteDataFrameJobAction action) {
super(client, action, new Request());
super(client, action, new DeleteDataFrameJobAction.Request());
}
}

public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
private boolean acknowledged;

public Response(StreamInput in) throws IOException {
super(Collections.emptyList(), Collections.emptyList());
readFrom(in);
}

public Response(boolean acknowledged, List<TaskOperationFailure> taskFailures, List<FailedNodeException> nodeFailures) {
super(taskFailures, nodeFailures);
this.acknowledged = acknowledged;
}

public Response(boolean acknowledged) {
this(acknowledged, Collections.emptyList(), Collections.emptyList());
}

public Response() {
this(false, Collections.emptyList(), Collections.emptyList());
}

public boolean isDeleted() {
return acknowledged;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
acknowledged = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(acknowledged);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
{
toXContentCommon(builder, params);
builder.field("acknowledged", acknowledged);
}
builder.endObject();
return builder;
}

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
DeleteDataFrameJobAction.Response response = (DeleteDataFrameJobAction.Response) o;
return super.equals(o) && acknowledged == response.acknowledged;
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), acknowledged);
}
}
}
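
Note on usage: the action now returns a task-based Response (exposing the acknowledged flag via isDeleted()) rather than a bare AcknowledgedResponse. A minimal caller sketch, assuming a node Client is at hand; the job id "my-job" and the listener bodies are illustrative, not part of this PR:

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;

// Hypothetical caller; "my-job" is a placeholder job id.
void deleteJob(Client client) {
    DeleteDataFrameJobAction.Request request = new DeleteDataFrameJobAction.Request("my-job");
    client.execute(DeleteDataFrameJobAction.INSTANCE, request, ActionListener.wrap(
        response -> { assert response.isDeleted(); },  // true when the task acknowledged the delete
        e -> { /* job missing, or indexer not STOPPED */ }
    ));
}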
org/elasticsearch/xpack/dataframe/action/TransportDeleteDataFrameJobAction.java
@@ -5,103 +5,95 @@
*/
package org.elasticsearch.xpack.dataframe.action;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.dataframe.job.DataFrameJob;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.dataframe.action.DeleteDataFrameJobAction.Request;
import org.elasticsearch.xpack.dataframe.action.DeleteDataFrameJobAction.Response;
import org.elasticsearch.xpack.dataframe.job.DataFrameJobTask;

import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.io.IOException;
import java.util.List;

public class TransportDeleteDataFrameJobAction
extends TransportMasterNodeAction<DeleteDataFrameJobAction.Request, AcknowledgedResponse> {

private final PersistentTasksService persistentTasksService;
private static final Logger logger = LogManager.getLogger(TransportDeleteDataFrameJobAction.class);
public class TransportDeleteDataFrameJobAction extends TransportTasksAction<DataFrameJobTask, Request, Response, Response> {

@Inject
public TransportDeleteDataFrameJobAction(TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
PersistentTasksService persistentTasksService, ClusterService clusterService) {
super(DeleteDataFrameJobAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, DeleteDataFrameJobAction.Request::new);
this.persistentTasksService = persistentTasksService;
public TransportDeleteDataFrameJobAction(TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, PersistentTasksService persistentTasksService,
ClusterService clusterService) {
super(DeleteDataFrameJobAction.NAME, clusterService, transportService, actionFilters, Request::new, Response::new,
ThreadPool.Names.SAME);
}

@Override
protected String executor() {
return ThreadPool.Names.SAME;
protected Response newResponse(Request request, List<Response> tasks, List<TaskOperationFailure> taskOperationFailures,
List<FailedNodeException> failedNodeExceptions) {
assert tasks.size() + taskOperationFailures.size() == 1;
boolean cancelled = tasks.size() > 0 && tasks.stream().allMatch(Response::isDeleted);

return new Response(cancelled, taskOperationFailures, failedNodeExceptions);
}

@Override
protected AcknowledgedResponse newResponse() {
return new AcknowledgedResponse();
protected Response readTaskResponse(StreamInput in) throws IOException {
Response response = new Response();
response.readFrom(in);
return response;
}

@Override
protected void masterOperation(DeleteDataFrameJobAction.Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) throws Exception {

String jobId = request.getId();
TimeValue timeout = new TimeValue(60, TimeUnit.SECONDS); // TODO make this a config option

// Step 1. Cancel the persistent task
persistentTasksService.sendRemoveRequest(jobId, new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
@Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
logger.debug("Request to cancel Task for data frame job [" + jobId + "] successful.");

// Step 2. Wait for the task to finish cancellation internally
persistentTasksService.waitForPersistentTaskCondition(jobId, Objects::isNull, timeout,
new PersistentTasksService.WaitForPersistentTaskListener<DataFrameJob>() {
@Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<DataFrameJob> task) {
logger.debug("Task for data frame job [" + jobId + "] successfully canceled.");
listener.onResponse(new AcknowledgedResponse(true));
}

@Override
public void onFailure(Exception e) {
logger.error("Error while cancelling task for data frame job [" + jobId
+ "]." + e);
listener.onFailure(e);
}

@Override
public void onTimeout(TimeValue timeout) {
String msg = "Stopping of data frame job [" + jobId + "] timed out after [" + timeout + "].";
logger.warn(msg);
listener.onFailure(new ElasticsearchException(msg));
}
});
}

@Override
public void onFailure(Exception e) {
logger.error("Error while requesting to cancel task for data frame job [" + jobId + "]" + e);
listener.onFailure(e);
}
});

protected void taskOperation(Request request, DataFrameJobTask task, ActionListener<Response> listener) {
assert task.getConfig().getId().equals(request.getId());
IndexerState state = task.getState().getJobState();
if (state.equals(IndexerState.STOPPED)) {
task.onCancelled();
listener.onResponse(new Response(true));
} else {
listener.onFailure(new IllegalStateException("Could not delete job [" + request.getId() + "] because " + "indexer state is ["
+ state + "]. Job must be [" + IndexerState.STOPPED + "] before deletion."));
}
}

@Override
protected ClusterBlockException checkBlock(DeleteDataFrameJobAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
final ClusterState state = clusterService.state();
final DiscoveryNodes nodes = state.nodes();
if (nodes.isLocalNodeElectedMaster()) {
PersistentTasksCustomMetaData pTasksMeta = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
if (pTasksMeta != null && pTasksMeta.getTask(request.getId()) != null) {
super.doExecute(task, request, listener);
} else {
// If we couldn't find the job in the persistent task CS, it means it was deleted prior to this call,
// no need to go looking for the allocated task
listener.onFailure(new ResourceNotFoundException("the task with id [" + request.getId() + "] doesn't exist"));
}
} else {
// Delegates DeleteJob to elected master node, so it becomes the coordinating node.
// Non-master nodes may have a stale cluster state that shows jobs which are cancelled
// on the master, which makes testing difficult.
if (nodes.getMasterNode() == null) {
listener.onFailure(new MasterNotDiscoveredException("no known master nodes"));
} else {
transportService.sendRequest(nodes.getMasterNode(), actionName, request,
new ActionListenerResponseHandler<>(listener, Response::new));
}
}
}
}
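
Note on the new guard: taskOperation only deletes jobs whose indexer is STOPPED, and doExecute always routes through the elected master so the persistent-task metadata check is not stale. A hedged, test-style illustration of the failure path (the test harness, matchers, and job id are assumed, not from this PR):

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.fail;

// Deleting a job whose indexer is still running should now fail fast.
client.execute(DeleteDataFrameJobAction.INSTANCE, new DeleteDataFrameJobAction.Request("my-job"),
    ActionListener.wrap(
        r -> fail("expected an IllegalStateException while the indexer is running"),
        e -> assertThat(e.getMessage(), containsString("must be [STOPPED] before deletion"))
    ));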
org/elasticsearch/xpack/dataframe/job/DataFrameJobTask.java
@@ -23,6 +23,7 @@
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Event;
import org.elasticsearch.xpack.dataframe.DataFrame;
import org.elasticsearch.xpack.dataframe.action.StartDataFrameJobAction;
import org.elasticsearch.xpack.dataframe.action.StopDataFrameJobAction;
import org.elasticsearch.xpack.dataframe.action.StartDataFrameJobAction.Response;
@@ -35,16 +36,18 @@ public class DataFrameJobTask extends AllocatedPersistentTask implements Schedul
private static final Logger logger = LogManager.getLogger(DataFrameJobTask.class);

private final DataFrameJob job;
private final SchedulerEngine schedulerEngine;
private final ThreadPool threadPool;
private final DataFrameIndexer indexer;

static final String SCHEDULE_NAME = "xpack/data_frame/job" + "/schedule";
static final String SCHEDULE_NAME = DataFrame.TASK_NAME + "/schedule";

public DataFrameJobTask(long id, String type, String action, TaskId parentTask, DataFrameJob job,
DataFrameJobState state, Client client, SchedulerEngine schedulerEngine, ThreadPool threadPool,
Map<String, String> headers) {
super(id, type, action, DataFrameJob.PERSISTENT_TASK_DESCRIPTION_PREFIX + job.getConfig().getId(), parentTask, headers);
this.job = job;
this.schedulerEngine = schedulerEngine;
this.threadPool = threadPool;
logger.info("construct job task");
// todo: simplistic implementation for now
@@ -86,6 +89,38 @@ public void triggered(Event event) {
}
}

/**
* Attempt to gracefully clean up the data frame job so it can be terminated.
* This currently removes the job from the scheduler; other cleanup operations
* may be added in the future.
*/
synchronized void shutdown() {
try {
logger.info("Data frame indexer [" + job.getConfig().getId() + "] received abort request, stopping indexer.");
schedulerEngine.remove(SCHEDULE_NAME + "_" + job.getConfig().getId());
schedulerEngine.unregister(this);
} catch (Exception e) {
markAsFailed(e);
return;
}
markAsCompleted();
}

/**
* This is called when the persistent task signals that the allocated task should be terminated.
* Termination in the task framework is essentially voluntary, as the allocated task can only be
* shut down from the inside.
*/
@Override
public synchronized void onCancelled() {
logger.info(
"Received cancellation request for data frame job [" + job.getConfig().getId() + "], state: [" + indexer.getState() + "]");
if (indexer.abort()) {
// there is no background job running, we can shutdown safely
shutdown();
}
}

protected class ClientDataFrameIndexer extends DataFrameIndexer {
private final Client client;

@@ -138,6 +173,7 @@ protected void onFinish() {
@Override
protected void onAbort() {
logger.info("Data frame job [" + job.getConfig().getId() + "] received abort request, stopping indexer");
shutdown();
}
}
}
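
Note on cancellation: onCancelled() relies on indexer.abort() reporting whether anything is still running; when a round is in flight, shutdown() instead happens later via onAbort() once that round observes the aborted state. A minimal sketch of that assumed contract (an illustrative stand-in, not the actual DataFrameIndexer code):

import java.util.concurrent.atomic.AtomicReference;
import org.elasticsearch.xpack.core.indexing.IndexerState;

// Illustrative stand-in for the indexer's state machine.
class IndexerAbortSketch {
    private final AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STARTED);

    // Returns true when no background search/bulk is running, i.e. the caller
    // (DataFrameJobTask.onCancelled) may call shutdown() immediately. Otherwise
    // the in-flight round later sees ABORTING and triggers onAbort(), which now
    // also calls shutdown() (see the onAbort() change above).
    boolean abort() {
        IndexerState previous = state.getAndSet(IndexerState.ABORTING);
        return previous != IndexerState.INDEXING;
    }
}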