Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add data frame feature #38934

Merged
merged 98 commits into from
Feb 18, 2019
Merged
Changes from 1 commit
Commits
Show all changes
98 commits
Select commit Hold shift + click to select a range
5d89d3c
feature index prototype
Jul 24, 2018
9c59ae1
1st version of a feature index builder
Aug 2, 2018
73ea5aa
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Aug 3, 2018
d78d131
do not reuse ML feature causing a conflict
Aug 3, 2018
fd708ed
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Aug 10, 2018
1a00847
temporary merge PR 32743 (#32776)
Aug 10, 2018
e6d58c5
[ML-Dataframe] Feature/dataframe basictests (#32783)
Aug 14, 2018
a0b72c9
Revert "temporary merge PR 32743 (#32776)"
Sep 5, 2018
192e6de
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Sep 5, 2018
b2195fb
adapt to upstream changes
Sep 5, 2018
4946d33
[ML-Dataframe] Use AsyncTwoPhaseIndexer (#33504)
Sep 10, 2018
b13cca3
[ML-Dataframe] add stop and delete endpoints (#33597)
Sep 12, 2018
9ff27a7
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Sep 12, 2018
ac211fa
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Sep 12, 2018
f3de9fb
adapt to upstream changes
Sep 12, 2018
10e01ae
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Sep 21, 2018
e5c2bf1
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Sep 25, 2018
28e426a
[ML-Dataframe] add basic configuration (#33813)
Sep 25, 2018
8eb7976
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Oct 9, 2018
b1802f1
adapt to upstream changes
Oct 10, 2018
f477852
[ML-Dataframe] Feature/fib reenable agg config tests (#34453)
Oct 16, 2018
89a19d8
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Oct 17, 2018
97782f5
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Oct 18, 2018
ad8cc92
[ML-Dataframe] Feature/fib multi aggs and sources (#34525)
Oct 19, 2018
0310769
fix NPE and creating XContent from search phase
Oct 25, 2018
6a2f761
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Oct 26, 2018
7d34ec1
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Nov 13, 2018
6bf69d0
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Nov 13, 2018
2ff80fd
adapt to upstream changes
Nov 13, 2018
341091b
fix imports
Nov 13, 2018
7036ffa
[ML-Dataframe] add validation and mapping detection (#34844)
Nov 15, 2018
016dea1
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Nov 15, 2018
6b66fe9
[ML-Dataframe] re-factor code to use LogManager and own loggers (#35631)
Nov 16, 2018
22cdb96
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Nov 19, 2018
47b12b7
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Nov 20, 2018
2b005b0
[ML-Dataframe] change licensing check and plugin name/description (#3…
Nov 20, 2018
70ca2da
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Nov 22, 2018
2da7e30
remove settings parameter after upstream re-factoring (#35831)
Nov 22, 2018
43536a0
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Nov 24, 2018
926abc2
[ML-Dataframe] Feature/fib get jobs (#35825)
Nov 25, 2018
e4a1efe
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Nov 26, 2018
f5362b0
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Nov 27, 2018
130a4e5
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Nov 28, 2018
7a78e1f
[ML-DataFrame] add a stats endpoint (#35911)
Nov 28, 2018
c1bcd36
dedup ID fields (#35995)
Nov 28, 2018
12318a5
[ML-DataFrame] Rename plugin and endpoints (#36048)
Nov 29, 2018
116bfef
add a client helper origin for data frame (#36057)
Nov 30, 2018
9f6b85d
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Dec 4, 2018
dd08f7b
adapt to upstream changes
Dec 4, 2018
7cd9208
[ML-DataFrame] cancel indexer on job deletion and remove task, allow …
Dec 5, 2018
9d5ab7d
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Dec 7, 2018
3f7cb62
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Dec 10, 2018
5f03768
repair merge conflicts
Dec 10, 2018
cf92a1a
[ML-DataFrame] fix state persistence and load on startup (#36375)
Dec 10, 2018
fa85f40
[ML-DataFrame] add generation to dataframe state (#36434)
Dec 11, 2018
4f90174
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Dec 12, 2018
2f98c1d
[ML-Dataframe] add integration tests (#34554)
Dec 12, 2018
239eb27
move common fields and string into a utility class (#36557)
Dec 13, 2018
7bbbdee
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Dec 18, 2018
972c063
[ML-DataFrame] add a wait_for_completion option to the stop data fra…
Dec 19, 2018
7010273
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Dec 19, 2018
b65a799
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Jan 2, 2019
a072c0d
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Jan 8, 2019
c7e6480
fix merge conflict
Jan 8, 2019
cc3a9e1
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Jan 9, 2019
4aec7f7
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Jan 10, 2019
ce82256
[ML-DataFrame] Feature/fib jobconfigmanager (#37068)
Jan 10, 2019
89209c1
return job counts and statistics for usage endpoint (#37346)
Jan 14, 2019
d48cbde
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Jan 15, 2019
48ebf09
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Jan 15, 2019
8d487bc
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Jan 16, 2019
c82f7f3
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Jan 17, 2019
13ef708
[ML-DataFrame] rename jobs to transforms (#37518)
Jan 17, 2019
4582609
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Jan 21, 2019
ec6684c
fix merge conflict
Jan 21, 2019
94eba06
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Jan 23, 2019
fac99c7
[ML-DataFrame] implement new data frame transforms configuration (#37…
Jan 23, 2019
c1e0851
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Jan 24, 2019
84c1ba2
[ML-DataFrame] add query support (#37827)
Jan 28, 2019
a16d21c
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Jan 29, 2019
719d3fb
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Jan 31, 2019
1c04676
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Feb 1, 2019
d23ac76
[ML-DataFrame] Feature/fib lenient aggs (#38122)
Feb 1, 2019
cdad0d2
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Feb 5, 2019
094bfa8
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Feb 6, 2019
3de0c53
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Feb 6, 2019
4bb0fc6
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Feb 8, 2019
417a7db
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Feb 8, 2019
cd7292c
add support for max, value_count, cardinality and sum (#38569)
Feb 9, 2019
cedd78c
[ML-DataFrame] Add support for (date) histogram pivots (#38725)
benwtrent Feb 11, 2019
124b170
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Feb 11, 2019
dd1943c
[ML-DataFrame] allow aggs as abbreviation for aggregations (#38706)
Feb 12, 2019
3122884
[ML-DataFrame] set meta data on data frame index (#38766)
Feb 14, 2019
2dabf4a
default to match_all if query is not given (#38865)
Feb 14, 2019
5758a11
[ML-DataFrame] remove array arguments for group_by (#38895)
Feb 15, 2019
8a0c3fd
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Feb 15, 2019
614cb0f
remove superflous import
Feb 15, 2019
54e6d10
Merge branch 'master' of github.com:elastic/elasticsearch into featur…
Feb 18, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[ML-DataFrame] cancel indexer on job deletion and remove task, allow …
…only stopped jo… (#36204)

cancel indexer on job deletion and remove task, allow only stopped jobs to be deleted
Hendrik Muhs authored Dec 5, 2018
commit 7cd92084507fd5dd552679029ec621b9424a0a06
Original file line number Diff line number Diff line change
@@ -6,22 +6,29 @@
package org.elasticsearch.xpack.dataframe.action;

import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.dataframe.job.DataFrameJob;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

public class DeleteDataFrameJobAction extends Action<AcknowledgedResponse> {
public class DeleteDataFrameJobAction extends Action<DeleteDataFrameJobAction.Response> {

public static final DeleteDataFrameJobAction INSTANCE = new DeleteDataFrameJobAction();
public static final String NAME = "cluster:admin/data_frame/delete";
@@ -31,11 +38,11 @@ private DeleteDataFrameJobAction() {
}

@Override
public AcknowledgedResponse newResponse() {
return new AcknowledgedResponse();
public Response newResponse() {
return new Response();
}

public static class Request extends AcknowledgedRequest<Request> implements ToXContent {
public static class Request extends BaseTasksRequest<Request> implements ToXContentFragment {
private String id;

public Request(String id) {
@@ -54,6 +61,11 @@ public String getId() {
return id;
}

@Override
public boolean match(Task task) {
return task.getDescription().equals(DataFrameJob.PERSISTENT_TASK_DESCRIPTION_PREFIX + id);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@@ -90,10 +102,73 @@ public boolean equals(Object obj) {
}
}

public static class RequestBuilder extends MasterNodeOperationRequestBuilder<Request, AcknowledgedResponse, RequestBuilder> {
public static class RequestBuilder extends ActionRequestBuilder<DeleteDataFrameJobAction.Request, DeleteDataFrameJobAction.Response> {

protected RequestBuilder(ElasticsearchClient client, DeleteDataFrameJobAction action) {
super(client, action, new Request());
super(client, action, new DeleteDataFrameJobAction.Request());
}
}

public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
private boolean acknowledged;
public Response(StreamInput in) throws IOException {
super(Collections.emptyList(), Collections.emptyList());
readFrom(in);
}

public Response(boolean acknowledged, List<TaskOperationFailure> taskFailures, List<FailedNodeException> nodeFailures) {
super(taskFailures, nodeFailures);
this.acknowledged = acknowledged;
}

public Response(boolean acknowledged) {
this(acknowledged, Collections.emptyList(), Collections.emptyList());
}

public Response() {
this(false, Collections.emptyList(), Collections.emptyList());
}

public boolean isDeleted() {
return acknowledged;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
acknowledged = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(acknowledged);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
{
toXContentCommon(builder, params);
builder.field("acknowledged", acknowledged);
}
builder.endObject();
return builder;
}

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
DeleteDataFrameJobAction.Response response = (DeleteDataFrameJobAction.Response) o;
return super.equals(o) && acknowledged == response.acknowledged;
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), acknowledged);
}
}
}
Original file line number Diff line number Diff line change
@@ -5,103 +5,95 @@
*/
package org.elasticsearch.xpack.dataframe.action;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.dataframe.job.DataFrameJob;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.dataframe.action.DeleteDataFrameJobAction.Request;
import org.elasticsearch.xpack.dataframe.action.DeleteDataFrameJobAction.Response;
import org.elasticsearch.xpack.dataframe.job.DataFrameJobTask;

import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.io.IOException;
import java.util.List;

public class TransportDeleteDataFrameJobAction
extends TransportMasterNodeAction<DeleteDataFrameJobAction.Request, AcknowledgedResponse> {

private final PersistentTasksService persistentTasksService;
private static final Logger logger = LogManager.getLogger(TransportDeleteDataFrameJobAction.class);
public class TransportDeleteDataFrameJobAction extends TransportTasksAction<DataFrameJobTask, Request, Response, Response> {

@Inject
public TransportDeleteDataFrameJobAction(TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
PersistentTasksService persistentTasksService, ClusterService clusterService) {
super(DeleteDataFrameJobAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, DeleteDataFrameJobAction.Request::new);
this.persistentTasksService = persistentTasksService;
public TransportDeleteDataFrameJobAction(TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, PersistentTasksService persistentTasksService,
ClusterService clusterService) {
super(DeleteDataFrameJobAction.NAME, clusterService, transportService, actionFilters, Request::new, Response::new,
ThreadPool.Names.SAME);
}

@Override
protected String executor() {
return ThreadPool.Names.SAME;
protected Response newResponse(Request request, List<Response> tasks, List<TaskOperationFailure> taskOperationFailures,
List<FailedNodeException> failedNodeExceptions) {
assert tasks.size() + taskOperationFailures.size() == 1;
boolean cancelled = tasks.size() > 0 && tasks.stream().allMatch(Response::isDeleted);

return new Response(cancelled, taskOperationFailures, failedNodeExceptions);
}

@Override
protected AcknowledgedResponse newResponse() {
return new AcknowledgedResponse();
protected Response readTaskResponse(StreamInput in) throws IOException {
Response response = new Response();
response.readFrom(in);
return response;
}

@Override
protected void masterOperation(DeleteDataFrameJobAction.Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) throws Exception {

String jobId = request.getId();
TimeValue timeout = new TimeValue(60, TimeUnit.SECONDS); // TODO make this a config option

// Step 1. Cancel the persistent task
persistentTasksService.sendRemoveRequest(jobId, new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
@Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
logger.debug("Request to cancel Task for data frame job [" + jobId + "] successful.");

// Step 2. Wait for the task to finish cancellation internally
persistentTasksService.waitForPersistentTaskCondition(jobId, Objects::isNull, timeout,
new PersistentTasksService.WaitForPersistentTaskListener<DataFrameJob>() {
@Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<DataFrameJob> task) {
logger.debug("Task for data frame job [" + jobId + "] successfully canceled.");
listener.onResponse(new AcknowledgedResponse(true));
}

@Override
public void onFailure(Exception e) {
logger.error("Error while cancelling task for data frame job [" + jobId
+ "]." + e);
listener.onFailure(e);
}

@Override
public void onTimeout(TimeValue timeout) {
String msg = "Stopping of data frame job [" + jobId + "] timed out after [" + timeout + "].";
logger.warn(msg);
listener.onFailure(new ElasticsearchException(msg));
}
});
}

@Override
public void onFailure(Exception e) {
logger.error("Error while requesting to cancel task for data frame job [" + jobId + "]" + e);
listener.onFailure(e);
}
});

protected void taskOperation(Request request, DataFrameJobTask task, ActionListener<Response> listener) {
assert task.getConfig().getId().equals(request.getId());
IndexerState state = task.getState().getJobState();
if (state.equals(IndexerState.STOPPED)) {
task.onCancelled();
listener.onResponse(new Response(true));
} else {
listener.onFailure(new IllegalStateException("Could not delete job [" + request.getId() + "] because " + "indexer state is ["
+ state + "]. Job must be [" + IndexerState.STOPPED + "] before deletion."));
}
}

@Override
protected ClusterBlockException checkBlock(DeleteDataFrameJobAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
final ClusterState state = clusterService.state();
final DiscoveryNodes nodes = state.nodes();
if (nodes.isLocalNodeElectedMaster()) {
PersistentTasksCustomMetaData pTasksMeta = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
if (pTasksMeta != null && pTasksMeta.getTask(request.getId()) != null) {
super.doExecute(task, request, listener);
} else {
// If we couldn't find the job in the persistent task CS, it means it was deleted prior to this call,
// no need to go looking for the allocated task
listener.onFailure(new ResourceNotFoundException("the task with id [" + request.getId() + "] doesn't exist"));
}
} else {
// Delegates DeleteJob to elected master node, so it becomes the coordinating node.
// Non-master nodes may have a stale cluster state that shows jobs which are cancelled
// on the master, which makes testing difficult.
if (nodes.getMasterNode() == null) {
listener.onFailure(new MasterNotDiscoveredException("no known master nodes"));
} else {
transportService.sendRequest(nodes.getMasterNode(), actionName, request,
new ActionListenerResponseHandler<>(listener, Response::new));
}
}
}
}
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Event;
import org.elasticsearch.xpack.dataframe.DataFrame;
import org.elasticsearch.xpack.dataframe.action.StartDataFrameJobAction;
import org.elasticsearch.xpack.dataframe.action.StopDataFrameJobAction;
import org.elasticsearch.xpack.dataframe.action.StartDataFrameJobAction.Response;
@@ -35,16 +36,18 @@ public class DataFrameJobTask extends AllocatedPersistentTask implements Schedul
private static final Logger logger = LogManager.getLogger(DataFrameJobTask.class);

private final DataFrameJob job;
private final SchedulerEngine schedulerEngine;
private final ThreadPool threadPool;
private final DataFrameIndexer indexer;

static final String SCHEDULE_NAME = "xpack/data_frame/job" + "/schedule";
static final String SCHEDULE_NAME = DataFrame.TASK_NAME + "/schedule";

public DataFrameJobTask(long id, String type, String action, TaskId parentTask, DataFrameJob job,
DataFrameJobState state, Client client, SchedulerEngine schedulerEngine, ThreadPool threadPool,
Map<String, String> headers) {
super(id, type, action, DataFrameJob.PERSISTENT_TASK_DESCRIPTION_PREFIX + job.getConfig().getId(), parentTask, headers);
this.job = job;
this.schedulerEngine = schedulerEngine;
this.threadPool = threadPool;
logger.info("construct job task");
// todo: simplistic implementation for now
@@ -86,6 +89,38 @@ public void triggered(Event event) {
}
}

/**
* Attempt to gracefully cleanup the data frame job so it can be terminated.
* This tries to remove the job from the scheduler, and potentially any other
* cleanup operations in the future
*/
synchronized void shutdown() {
try {
logger.info("Data frame indexer [" + job.getConfig().getId() + "] received abort request, stopping indexer.");
schedulerEngine.remove(SCHEDULE_NAME + "_" + job.getConfig().getId());
schedulerEngine.unregister(this);
} catch (Exception e) {
markAsFailed(e);
return;
}
markAsCompleted();
}

/**
* This is called when the persistent task signals that the allocated task should be terminated.
* Termination in the task framework is essentially voluntary, as the allocated task can only be
* shut down from the inside.
*/
@Override
public synchronized void onCancelled() {
logger.info(
"Received cancellation request for data frame job [" + job.getConfig().getId() + "], state: [" + indexer.getState() + "]");
if (indexer.abort()) {
// there is no background job running, we can shutdown safely
shutdown();
}
}

protected class ClientDataFrameIndexer extends DataFrameIndexer {
private final Client client;

@@ -138,6 +173,7 @@ protected void onFinish() {
@Override
protected void onAbort() {
logger.info("Data frame job [" + job.getConfig().getId() + "] received abort request, stopping indexer");
shutdown();
}
}
}
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.dataframe.DataFrame;
import org.elasticsearch.xpack.dataframe.action.DeleteDataFrameJobAction;
@@ -30,7 +31,16 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
String id = restRequest.param(DataFrameJob.ID.getPreferredName());
DeleteDataFrameJobAction.Request request = new DeleteDataFrameJobAction.Request(id);

return channel -> client.execute(DeleteDataFrameJobAction.INSTANCE, request, new RestToXContentListener<>(channel));
return channel -> client.execute(DeleteDataFrameJobAction.INSTANCE, request,
new RestToXContentListener<DeleteDataFrameJobAction.Response>(channel) {
@Override
protected RestStatus getStatus(DeleteDataFrameJobAction.Response response) {
if (response.getNodeFailures().size() > 0 || response.getTaskFailures().size() > 0) {
return RestStatus.INTERNAL_SERVER_ERROR;
}
return RestStatus.OK;
}
});
}

@Override