diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/EvaluateDataFrameAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/EvaluateDataFrameAction.java index 2277cd2acb08d..d0793053b2d1f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/EvaluateDataFrameAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/EvaluateDataFrameAction.java @@ -16,6 +16,9 @@ import org.elasticsearch.common.xcontent.XContentParserUtils; import org.elasticsearch.core.Nullable; import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xcontent.ConstructingObjectParser; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContentObject; @@ -30,6 +33,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -175,6 +179,11 @@ public boolean equals(Object o) { && Objects.equals(queryProvider, that.queryProvider) && Objects.equals(evaluation, that.evaluation); } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, "evaluate_data_frame", parentTaskId, headers); + } } public static class Response extends ActionResponse implements ToXContentObject { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ExplainDataFrameAnalyticsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ExplainDataFrameAnalyticsAction.java index 801705e3b0697..04a893918a470 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ExplainDataFrameAnalyticsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ExplainDataFrameAnalyticsAction.java @@ -6,21 +6,36 @@ */ package org.elasticsearch.xpack.core.ml.action; +import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.ValidateActions; +import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xcontent.ConstructingObjectParser; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xpack.core.common.validation.SourceDestValidator; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource; import org.elasticsearch.xpack.core.ml.dataframe.explain.FieldSelection; import org.elasticsearch.xpack.core.ml.dataframe.explain.MemoryEstimation; +import org.elasticsearch.xpack.core.ml.job.messages.Messages; +import org.elasticsearch.xpack.core.ml.utils.MlStrings; import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.Objects; +import static org.elasticsearch.core.Strings.format; + public class ExplainDataFrameAnalyticsAction extends ActionType { public static final ExplainDataFrameAnalyticsAction INSTANCE = new ExplainDataFrameAnalyticsAction(); @@ -30,6 +45,118 @@ private ExplainDataFrameAnalyticsAction() { super(NAME, ExplainDataFrameAnalyticsAction.Response::new); } + public static class Request extends AcknowledgedRequest implements ToXContentObject { + public static Request parseRequest(XContentParser parser) { + DataFrameAnalyticsConfig.Builder configBuilder = DataFrameAnalyticsConfig.STRICT_PARSER.apply(parser, null); + DataFrameAnalyticsConfig config = configBuilder.buildForExplain(); + return new Request(config); + } + + private final DataFrameAnalyticsConfig config; + + public Request(StreamInput in) throws IOException { + super(in); + config = new DataFrameAnalyticsConfig(in); + } + + public Request(DataFrameAnalyticsConfig config) { + this.config = config; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + config.writeTo(out); + } + + public DataFrameAnalyticsConfig getConfig() { + return config; + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException error = null; + error = checkConfigIdIsValid(config, error); + error = SourceDestValidator.validateRequest(error, config.getDest().getIndex()); + error = checkNoIncludedAnalyzedFieldsAreExcludedBySourceFiltering(config, error); + return error; + } + + private ActionRequestValidationException checkConfigIdIsValid( + DataFrameAnalyticsConfig analyticsConfig, + ActionRequestValidationException error + ) { + if (MlStrings.isValidId(analyticsConfig.getId()) == false) { + error = ValidateActions.addValidationError( + Messages.getMessage(Messages.INVALID_ID, DataFrameAnalyticsConfig.ID, analyticsConfig.getId()), + error + ); + } + if (MlStrings.hasValidLengthForId(analyticsConfig.getId()) == false) { + error = ValidateActions.addValidationError( + Messages.getMessage( + Messages.ID_TOO_LONG, + DataFrameAnalyticsConfig.ID, + analyticsConfig.getId(), + MlStrings.ID_LENGTH_LIMIT + ), + error + ); + } + return error; + } + + private ActionRequestValidationException checkNoIncludedAnalyzedFieldsAreExcludedBySourceFiltering( + DataFrameAnalyticsConfig analyticsConfig, + ActionRequestValidationException error + ) { + if (analyticsConfig.getAnalyzedFields() == null) { + return error; + } + for (String analyzedInclude : analyticsConfig.getAnalyzedFields().includes()) { + if (analyticsConfig.getSource().isFieldExcluded(analyzedInclude)) { + return ValidateActions.addValidationError( + "field [" + + analyzedInclude + + "] is included in [" + + DataFrameAnalyticsConfig.ANALYZED_FIELDS.getPreferredName() + + "] but not in [" + + DataFrameAnalyticsConfig.SOURCE.getPreferredName() + + "." + + DataFrameAnalyticsSource._SOURCE.getPreferredName() + + "]", + error + ); + } + } + return error; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + config.toXContent(builder, params); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return Objects.equals(config, request.config); + } + + @Override + public int hashCode() { + return Objects.hash(config); + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, format("explain_data_frame_analytics[%s]", config.getId()), parentTaskId, headers); + } + } + public static class Response extends ActionResponse implements ToXContentObject { public static final ParseField TYPE = new ParseField("explain_data_frame_analytics_response"); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetCategoriesAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetCategoriesAction.java index ccb159c53e315..1847f8f74ed75 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetCategoriesAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetCategoriesAction.java @@ -11,6 +11,9 @@ import org.elasticsearch.action.ActionType; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xcontent.ObjectParser; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContentObject; @@ -24,9 +27,11 @@ import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import java.io.IOException; +import java.util.Map; import java.util.Objects; import static org.elasticsearch.action.ValidateActions.addValidationError; +import static org.elasticsearch.core.Strings.format; public class GetCategoriesAction extends ActionType { @@ -190,6 +195,11 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(jobId, categoryId, pageParams, partitionFieldValue); } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, format("get_categories[%s:%s]", jobId, categoryId), parentTaskId, headers); + } } public static class Response extends AbstractGetResourcesResponse implements ToXContentObject { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedsAction.java index f142c3c3651f8..88de64553c5f5 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedsAction.java @@ -11,6 +11,9 @@ import org.elasticsearch.action.support.master.MasterNodeReadRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xpack.core.action.AbstractGetResourcesResponse; import org.elasticsearch.xpack.core.action.util.QueryPage; @@ -18,8 +21,11 @@ import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import java.io.IOException; +import java.util.Map; import java.util.Objects; +import static org.elasticsearch.core.Strings.format; + public class GetDatafeedsAction extends ActionType { public static final GetDatafeedsAction INSTANCE = new GetDatafeedsAction(); @@ -93,6 +99,11 @@ public boolean equals(Object obj) { Request other = (Request) obj; return Objects.equals(datafeedId, other.datafeedId) && Objects.equals(allowNoMatch, other.allowNoMatch); } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, format("get_datafeeds[%s]", datafeedId), parentTaskId, headers); + } } public static class Response extends AbstractGetResourcesResponse implements ToXContentObject { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsAction.java index 486e18158c34b..fd1df69dad73d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsAction.java @@ -11,6 +11,9 @@ import org.elasticsearch.action.support.master.MasterNodeReadRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xpack.core.action.AbstractGetResourcesResponse; import org.elasticsearch.xpack.core.action.util.QueryPage; @@ -18,8 +21,11 @@ import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import java.io.IOException; +import java.util.Map; import java.util.Objects; +import static org.elasticsearch.core.Strings.format; + public class GetJobsAction extends ActionType { public static final GetJobsAction INSTANCE = new GetJobsAction(); @@ -91,6 +97,11 @@ public boolean equals(Object obj) { Request other = (Request) obj; return Objects.equals(jobId, other.jobId) && Objects.equals(allowNoMatch, other.allowNoMatch); } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, format("get_anomaly_detection_jobs[%s]", jobId), parentTaskId, headers); + } } public static class Response extends AbstractGetResourcesResponse implements ToXContentObject { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetModelSnapshotsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetModelSnapshotsAction.java index 8ef588ffa08d0..edf0a7f7e7e65 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetModelSnapshotsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetModelSnapshotsAction.java @@ -12,6 +12,9 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.core.Nullable; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xcontent.ObjectParser; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContentObject; @@ -25,8 +28,11 @@ import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import java.io.IOException; +import java.util.Map; import java.util.Objects; +import static org.elasticsearch.core.Strings.format; + public class GetModelSnapshotsAction extends ActionType { public static final GetModelSnapshotsAction INSTANCE = new GetModelSnapshotsAction(); @@ -205,6 +211,11 @@ public boolean equals(Object obj) { && Objects.equals(sort, other.sort) && Objects.equals(desc, other.desc); } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, format("get_job_model_snapshot[%s:%s]", jobId, snapshotId), parentTaskId, headers); + } } public static class Response extends AbstractGetResourcesResponse implements ToXContentObject { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PreviewDataFrameAnalyticsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PreviewDataFrameAnalyticsAction.java index 5c6775dadbb4a..ad634ff8d4b82 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PreviewDataFrameAnalyticsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PreviewDataFrameAnalyticsAction.java @@ -12,6 +12,9 @@ import org.elasticsearch.action.ActionType; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xcontent.ConstructingObjectParser; import org.elasticsearch.xcontent.ObjectParser; import org.elasticsearch.xcontent.ParseField; @@ -26,6 +29,8 @@ import java.util.Map; import java.util.Objects; +import static org.elasticsearch.core.Strings.format; + public class PreviewDataFrameAnalyticsAction extends ActionType { public static final PreviewDataFrameAnalyticsAction INSTANCE = new PreviewDataFrameAnalyticsAction(); @@ -87,6 +92,11 @@ public int hashCode() { return Objects.hash(config); } + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, format("preview_data_frame_analytics[%s]", config.getId()), parentTaskId, headers); + } + public static class Builder { private DataFrameAnalyticsConfig config; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PreviewDatafeedAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PreviewDatafeedAction.java index 9adaf90ff7eb2..88454b1500e8c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PreviewDatafeedAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PreviewDatafeedAction.java @@ -17,6 +17,9 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.core.Nullable; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xcontent.ObjectParser; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContentObject; @@ -29,9 +32,11 @@ import java.io.IOException; import java.io.InputStream; +import java.util.Map; import java.util.Objects; import java.util.OptionalLong; +import static org.elasticsearch.core.Strings.format; import static org.elasticsearch.xpack.core.ml.action.StartDatafeedAction.DatafeedParams.parseDateOrThrow; import static org.elasticsearch.xpack.core.ml.action.StartDatafeedAction.END_TIME; import static org.elasticsearch.xpack.core.ml.action.StartDatafeedAction.START_TIME; @@ -199,6 +204,11 @@ public boolean equals(Object obj) { && Objects.equals(jobConfig, other.jobConfig); } + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, format("preview_datafeed[%s]", datafeedId), parentTaskId, headers); + } + public static class Builder { private String datafeedId; private DatafeedConfig.Builder datafeedBuilder; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PutDataFrameAnalyticsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PutDataFrameAnalyticsAction.java index dbc1f27b44ae3..ebe8f74829997 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PutDataFrameAnalyticsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PutDataFrameAnalyticsAction.java @@ -54,19 +54,7 @@ public static Request parseRequest(String id, XContentParser parser) { return new PutDataFrameAnalyticsAction.Request(config.build()); } - /** - * Parses request for use in the explain action. - * {@link Request} is reused across {@link PutDataFrameAnalyticsAction} and - * {@link ExplainDataFrameAnalyticsAction} but parsing differs - * between these two usages. - */ - public static Request parseRequestForExplain(XContentParser parser) { - DataFrameAnalyticsConfig.Builder configBuilder = DataFrameAnalyticsConfig.STRICT_PARSER.apply(parser, null); - DataFrameAnalyticsConfig config = configBuilder.buildForExplain(); - return new PutDataFrameAnalyticsAction.Request(config); - } - - private DataFrameAnalyticsConfig config; + private final DataFrameAnalyticsConfig config; public Request(StreamInput in) throws IOException { super(in); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ExplainDataFrameAnalyticsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ExplainDataFrameAnalyticsIT.java index ff5fd2e54aade..42bad85c71c4f 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ExplainDataFrameAnalyticsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ExplainDataFrameAnalyticsIT.java @@ -16,7 +16,6 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.xpack.core.ml.action.ExplainDataFrameAnalyticsAction; -import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource; @@ -209,7 +208,7 @@ public void testSimultaneousExplainSameConfig() throws IOException { List> futures = new ArrayList<>(); for (int i = 0; i < simultaneousInvocationCount; ++i) { - futures.add(client().execute(ExplainDataFrameAnalyticsAction.INSTANCE, new PutDataFrameAnalyticsAction.Request(config))); + futures.add(client().execute(ExplainDataFrameAnalyticsAction.INSTANCE, new ExplainDataFrameAnalyticsAction.Request(config))); } ExplainDataFrameAnalyticsAction.Response previous = null; diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java index 90fb4e81b7364..5d497b24d143e 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java @@ -124,7 +124,7 @@ protected GetDataFrameAnalyticsStatsAction.Response.Stats getAnalyticsStats(Stri } protected ExplainDataFrameAnalyticsAction.Response explainDataFrame(DataFrameAnalyticsConfig config) { - PutDataFrameAnalyticsAction.Request request = new PutDataFrameAnalyticsAction.Request(config); + ExplainDataFrameAnalyticsAction.Request request = new ExplainDataFrameAnalyticsAction.Request(config); return client().execute(ExplainDataFrameAnalyticsAction.INSTANCE, request).actionGet(); } diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index d99aa96656074..f31f8df53247b 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -281,7 +281,7 @@ public void testProcessResults() throws Exception { // 1. one related to creating model snapshot // 2. one for {@link Annotation} result List annotations = getAnnotations(); - assertThat("Annotations were: " + annotations.toString(), annotations, hasSize(2)); + assertThat("Annotations were: " + annotations, annotations, hasSize(2)); assertThat( annotations.stream().map(Annotation::getAnnotation).collect(toList()), containsInAnyOrder("Job model snapshot with id [" + modelSnapshot.getSnapshotId() + "] stored", annotation.getAnnotation()) @@ -664,6 +664,8 @@ private QueryPage getCategoryDefinition(Long categoryId, Str errorHolder.set(e); latch.countDown(); }, + null, + null, client() ); latch.await(); diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java index 477d827b3417a..6d1155b1b8b64 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java @@ -94,7 +94,7 @@ public void testCrud() throws InterruptedException { // Read datafeed config AtomicReference configBuilderHolder = new AtomicReference<>(); blockingCall( - actionListener -> datafeedConfigProvider.getDatafeedConfig(datafeedId, actionListener), + actionListener -> datafeedConfigProvider.getDatafeedConfig(datafeedId, null, actionListener), configBuilderHolder, exceptionHolder ); @@ -130,7 +130,7 @@ public void testCrud() throws InterruptedException { // Read the updated config configBuilderHolder.set(null); blockingCall( - actionListener -> datafeedConfigProvider.getDatafeedConfig(datafeedId, actionListener), + actionListener -> datafeedConfigProvider.getDatafeedConfig(datafeedId, null, actionListener), configBuilderHolder, exceptionHolder ); @@ -153,7 +153,7 @@ public void testGetDatafeedConfig_missing() throws InterruptedException { AtomicReference exceptionHolder = new AtomicReference<>(); AtomicReference configBuilderHolder = new AtomicReference<>(); blockingCall( - actionListener -> datafeedConfigProvider.getDatafeedConfig("missing", actionListener), + actionListener -> datafeedConfigProvider.getDatafeedConfig("missing", null, actionListener), configBuilderHolder, exceptionHolder ); @@ -487,14 +487,14 @@ public void testFindDatafeedsForJobIds() throws Exception { AtomicReference exceptionHolder = new AtomicReference<>(); blockingCall( - actionListener -> datafeedConfigProvider.findDatafeedsByJobIds(Collections.singletonList("new-job"), actionListener), + actionListener -> datafeedConfigProvider.findDatafeedsByJobIds(Collections.singletonList("new-job"), null, actionListener), datafeedMapHolder, exceptionHolder ); assertThat(datafeedMapHolder.get(), anEmptyMap()); blockingCall( - actionListener -> datafeedConfigProvider.findDatafeedsByJobIds(Collections.singletonList("j2"), actionListener), + actionListener -> datafeedConfigProvider.findDatafeedsByJobIds(Collections.singletonList("j2"), null, actionListener), datafeedMapHolder, exceptionHolder ); @@ -502,7 +502,7 @@ public void testFindDatafeedsForJobIds() throws Exception { assertThat(datafeedMapHolder.get().get("j2").getId(), equalTo("foo-2")); blockingCall( - actionListener -> datafeedConfigProvider.findDatafeedsByJobIds(Arrays.asList("j3", "j1"), actionListener), + actionListener -> datafeedConfigProvider.findDatafeedsByJobIds(Arrays.asList("j3", "j1"), null, actionListener), datafeedMapHolder, exceptionHolder ); @@ -527,7 +527,7 @@ public void testFindDatafeedsForJobIds_ManyJobs() throws Exception { AtomicReference exceptionHolder = new AtomicReference<>(); blockingCall( - actionListener -> datafeedConfigProvider.findDatafeedsByJobIds(jobIds, actionListener), + actionListener -> datafeedConfigProvider.findDatafeedsByJobIds(jobIds, null, actionListener), datafeedMapHolder, exceptionHolder ); @@ -546,7 +546,7 @@ public void testHeadersAreOverwritten() throws Exception { AtomicReference exceptionHolder = new AtomicReference<>(); AtomicReference configBuilderHolder = new AtomicReference<>(); blockingCall( - actionListener -> datafeedConfigProvider.getDatafeedConfig(dfId, actionListener), + actionListener -> datafeedConfigProvider.getDatafeedConfig(dfId, null, actionListener), configBuilderHolder, exceptionHolder ); diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java index 3099b1e0d8c50..2355904fb6910 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java @@ -73,7 +73,7 @@ public void testGetMissingJob() throws InterruptedException { AtomicReference jobHolder = new AtomicReference<>(); AtomicReference exceptionHolder = new AtomicReference<>(); - blockingCall(actionListener -> jobConfigProvider.getJob("missing", actionListener), jobHolder, exceptionHolder); + blockingCall(actionListener -> jobConfigProvider.getJob("missing", null, actionListener), jobHolder, exceptionHolder); assertNull(jobHolder.get()); assertNotNull(exceptionHolder.get()); @@ -86,7 +86,7 @@ public void testCheckJobExists() throws InterruptedException { boolean throwIfMissing = randomBoolean(); blockingCall( - actionListener -> jobConfigProvider.jobExists("missing", throwIfMissing, actionListener), + actionListener -> jobConfigProvider.jobExists("missing", throwIfMissing, null, actionListener), jobExistsHolder, exceptionHolder ); @@ -108,7 +108,7 @@ public void testCheckJobExists() throws InterruptedException { exceptionHolder.set(null); blockingCall( - actionListener -> jobConfigProvider.jobExists("existing-job", throwIfMissing, actionListener), + actionListener -> jobConfigProvider.jobExists("existing-job", throwIfMissing, null, actionListener), jobExistsHolder, exceptionHolder ); @@ -152,7 +152,7 @@ public void testCrud() throws InterruptedException { // Read Job AtomicReference getJobResponseHolder = new AtomicReference<>(); - blockingCall(actionListener -> jobConfigProvider.getJob(jobId, actionListener), getJobResponseHolder, exceptionHolder); + blockingCall(actionListener -> jobConfigProvider.getJob(jobId, null, actionListener), getJobResponseHolder, exceptionHolder); assertNull(exceptionHolder.get()); assertEquals(newJob, getJobResponseHolder.get().build()); @@ -170,7 +170,7 @@ public void testCrud() throws InterruptedException { assertEquals("This job has been updated", updateJobResponseHolder.get().getDescription()); getJobResponseHolder.set(null); - blockingCall(actionListener -> jobConfigProvider.getJob(jobId, actionListener), getJobResponseHolder, exceptionHolder); + blockingCall(actionListener -> jobConfigProvider.getJob(jobId, null, actionListener), getJobResponseHolder, exceptionHolder); assertNull(exceptionHolder.get()); assertEquals("This job has been updated", getJobResponseHolder.get().build().getDescription()); @@ -182,7 +182,7 @@ public void testCrud() throws InterruptedException { // Read deleted job getJobResponseHolder.set(null); - blockingCall(actionListener -> jobConfigProvider.getJob(jobId, actionListener), getJobResponseHolder, exceptionHolder); + blockingCall(actionListener -> jobConfigProvider.getJob(jobId, null, actionListener), getJobResponseHolder, exceptionHolder); assertNull(getJobResponseHolder.get()); assertEquals(ResourceNotFoundException.class, exceptionHolder.get().getClass()); diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java index 306a383d2ea0c..84677093ae588 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java @@ -799,7 +799,7 @@ public void testGetSnapshots() { .get(); PlainActionFuture> future = new PlainActionFuture<>(); - jobProvider.modelSnapshots(jobId, 0, 4, "9", "15", "", false, "snap_2,snap_1", future::onResponse, future::onFailure); + jobProvider.modelSnapshots(jobId, 0, 4, "9", "15", "", false, "snap_2,snap_1", null, future::onResponse, future::onFailure); List snapshots = future.actionGet().results(); assertThat(snapshots.get(0).getSnapshotId(), equalTo("snap_2")); assertNull(snapshots.get(0).getQuantiles()); @@ -807,7 +807,7 @@ public void testGetSnapshots() { assertNull(snapshots.get(1).getQuantiles()); future = new PlainActionFuture<>(); - jobProvider.modelSnapshots(jobId, 0, 4, "9", "15", "", false, "snap_*", future::onResponse, future::onFailure); + jobProvider.modelSnapshots(jobId, 0, 4, "9", "15", "", false, "snap_*", null, future::onResponse, future::onFailure); snapshots = future.actionGet().results(); assertThat(snapshots.get(0).getSnapshotId(), equalTo("snap_2")); assertThat(snapshots.get(1).getSnapshotId(), equalTo("snap_1")); @@ -815,21 +815,21 @@ public void testGetSnapshots() { assertNull(snapshots.get(1).getQuantiles()); future = new PlainActionFuture<>(); - jobProvider.modelSnapshots(jobId, 0, 4, "9", "15", "", false, "snap_*,other_snap", future::onResponse, future::onFailure); + jobProvider.modelSnapshots(jobId, 0, 4, "9", "15", "", false, "snap_*,other_snap", null, future::onResponse, future::onFailure); snapshots = future.actionGet().results(); assertThat(snapshots.get(0).getSnapshotId(), equalTo("snap_2")); assertThat(snapshots.get(1).getSnapshotId(), equalTo("snap_1")); assertThat(snapshots.get(2).getSnapshotId(), equalTo("other_snap")); future = new PlainActionFuture<>(); - jobProvider.modelSnapshots(jobId, 0, 4, "9", "15", "", false, "*", future::onResponse, future::onFailure); + jobProvider.modelSnapshots(jobId, 0, 4, "9", "15", "", false, "*", null, future::onResponse, future::onFailure); snapshots = future.actionGet().results(); assertThat(snapshots.get(0).getSnapshotId(), equalTo("snap_2")); assertThat(snapshots.get(1).getSnapshotId(), equalTo("snap_1")); assertThat(snapshots.get(2).getSnapshotId(), equalTo("other_snap")); future = new PlainActionFuture<>(); - jobProvider.modelSnapshots("*", 0, 5, null, null, "min_version", false, null, future::onResponse, future::onFailure); + jobProvider.modelSnapshots("*", 0, 5, null, null, "min_version", false, null, null, future::onResponse, future::onFailure); snapshots = future.actionGet().results(); assertThat(snapshots.get(0).getSnapshotId(), equalTo("11")); assertThat(snapshots.get(1).getSnapshotId(), equalTo("snap_1")); diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlAutoUpdateServiceIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlAutoUpdateServiceIT.java index a0f72d738642e..39d7a2577aeb8 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlAutoUpdateServiceIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlAutoUpdateServiceIT.java @@ -80,7 +80,7 @@ public void testAutomaticModelUpdate() throws Exception { AtomicReference exceptionHolder = new AtomicReference<>(); blockingCall( - listener -> datafeedConfigProvider.getDatafeedConfig("farequote-datafeed-with-old-agg", listener), + listener -> datafeedConfigProvider.getDatafeedConfig("farequote-datafeed-with-old-agg", null, listener), getConfigHolder, exceptionHolder ); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java index 6236bffe9b9a1..5050c681eca71 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java @@ -204,7 +204,7 @@ protected void masterOperation( // First check that the job exists, because we don't want to audit // the beginning of its deletion if it didn't exist in the first place - jobConfigProvider.jobExists(request.getJobId(), true, jobExistsListener); + jobConfigProvider.jobExists(request.getJobId(), true, null, jobExistsListener); } private void notifyListeners(String jobId, @Nullable AcknowledgedResponse ack, @Nullable Exception error) { @@ -380,6 +380,6 @@ private void cancelResetTaskIfExists(String jobId, ActionListener liste } }, listener::onFailure); - jobConfigProvider.getJob(jobId, jobListener); + jobConfigProvider.getJob(jobId, null, jobListener); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteModelSnapshotAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteModelSnapshotAction.java index c392b070e096c..20aacd72c904f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteModelSnapshotAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteModelSnapshotAction.java @@ -58,7 +58,7 @@ public TransportDeleteModelSnapshotAction( @Override protected void doExecute(Task task, DeleteModelSnapshotAction.Request request, ActionListener listener) { // Verify the snapshot exists - jobResultsProvider.modelSnapshots(request.getJobId(), 0, 1, null, null, null, true, request.getSnapshotId(), page -> { + jobResultsProvider.modelSnapshots(request.getJobId(), 0, 1, null, null, null, true, request.getSnapshotId(), null, page -> { List deleteCandidates = page.results(); if (deleteCandidates.size() > 1) { logger.warn( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportEvaluateDataFrameAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportEvaluateDataFrameAction.java index cbe2529aaa36b..1e12eb776635b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportEvaluateDataFrameAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportEvaluateDataFrameAction.java @@ -12,11 +12,13 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.client.internal.ParentTaskAssigningClient; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.XPackSettings; @@ -40,6 +42,7 @@ public class TransportEvaluateDataFrameAction extends HandledTransportAction< private final Client client; private final AtomicReference maxBuckets = new AtomicReference<>(); private final SecurityContext securityContext; + private final ClusterService clusterService; @Inject public TransportEvaluateDataFrameAction( @@ -58,6 +61,7 @@ public TransportEvaluateDataFrameAction( : null; this.maxBuckets.set(MAX_BUCKET_SETTING.get(clusterService.getSettings())); clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_BUCKET_SETTING, this::setMaxBuckets); + this.clusterService = clusterService; } private void setMaxBuckets(int maxBuckets) { @@ -70,6 +74,7 @@ protected void doExecute( EvaluateDataFrameAction.Request request, ActionListener listener ) { + TaskId parentTaskId = new TaskId(clusterService.getNodeName(), task.getId()); ActionListener> resultsListener = ActionListener.wrap(unused -> { EvaluateDataFrameAction.Response response = new EvaluateDataFrameAction.Response( request.getEvaluation().getName(), @@ -80,7 +85,13 @@ protected void doExecute( // Create an immutable collection of parameters to be used by evaluation metrics. EvaluationParameters parameters = new EvaluationParameters(maxBuckets.get()); - EvaluationExecutor evaluationExecutor = new EvaluationExecutor(threadPool, client, parameters, request, securityContext); + EvaluationExecutor evaluationExecutor = new EvaluationExecutor( + threadPool, + new ParentTaskAssigningClient(client, parentTaskId), + parameters, + request, + securityContext + ); evaluationExecutor.execute(resultsListener); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportExplainDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportExplainDataFrameAnalyticsAction.java index e4dfd64fd8dea..ea0f37896b3c4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportExplainDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportExplainDataFrameAnalyticsAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ClientHelper; @@ -32,7 +33,6 @@ import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.ml.MachineLearningField; import org.elasticsearch.xpack.core.ml.action.ExplainDataFrameAnalyticsAction; -import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.dataframe.explain.FieldSelection; import org.elasticsearch.xpack.core.ml.dataframe.explain.MemoryEstimation; @@ -56,7 +56,7 @@ * Redirects to a different node if the current node is *not* an ML node. */ public class TransportExplainDataFrameAnalyticsAction extends HandledTransportAction< - PutDataFrameAnalyticsAction.Request, + ExplainDataFrameAnalyticsAction.Request, ExplainDataFrameAnalyticsAction.Response> { private static final Logger logger = LogManager.getLogger(TransportExplainDataFrameAnalyticsAction.class); @@ -80,7 +80,7 @@ public TransportExplainDataFrameAnalyticsAction( Settings settings, ThreadPool threadPool ) { - super(ExplainDataFrameAnalyticsAction.NAME, transportService, actionFilters, PutDataFrameAnalyticsAction.Request::new); + super(ExplainDataFrameAnalyticsAction.NAME, transportService, actionFilters, ExplainDataFrameAnalyticsAction.Request::new); this.transportService = transportService; this.clusterService = Objects.requireNonNull(clusterService); this.client = Objects.requireNonNull(client); @@ -96,7 +96,7 @@ public TransportExplainDataFrameAnalyticsAction( @Override protected void doExecute( Task task, - PutDataFrameAnalyticsAction.Request request, + ExplainDataFrameAnalyticsAction.Request request, ActionListener listener ) { if (MachineLearningField.ML_API_FEATURE.check(licenseState) == false) { @@ -122,12 +122,12 @@ protected void doExecute( private void explain( Task task, - PutDataFrameAnalyticsAction.Request request, + ExplainDataFrameAnalyticsAction.Request request, ActionListener listener ) { - + TaskId parentTaskId = new TaskId(clusterService.getNodeName(), task.getId()); final ExtractedFieldsDetectorFactory extractedFieldsDetectorFactory = new ExtractedFieldsDetectorFactory( - new ParentTaskAssigningClient(client, task.getParentTaskId()) + new ParentTaskAssigningClient(client, parentTaskId) ); if (XPackSettings.SECURITY_ENABLED.get(settings)) { useSecondaryAuthIfAvailable(this.securityContext, () -> { @@ -139,7 +139,7 @@ private void explain( extractedFieldsDetectorFactory.createFromSource( config, ActionListener.wrap( - extractedFieldsDetector -> explain(task, config, extractedFieldsDetector, listener), + extractedFieldsDetector -> explain(parentTaskId, config, extractedFieldsDetector, listener), listener::onFailure ) ); @@ -148,7 +148,7 @@ private void explain( extractedFieldsDetectorFactory.createFromSource( request.getConfig(), ActionListener.wrap( - extractedFieldsDetector -> explain(task, request.getConfig(), extractedFieldsDetector, listener), + extractedFieldsDetector -> explain(parentTaskId, request.getConfig(), extractedFieldsDetector, listener), listener::onFailure ) ); @@ -156,7 +156,7 @@ private void explain( } private void explain( - Task task, + TaskId parentTaskId, DataFrameAnalyticsConfig config, ExtractedFieldsDetector extractedFieldsDetector, ActionListener listener @@ -177,7 +177,7 @@ private void explain( listener::onFailure ); - estimateMemoryUsage(task, config, fieldExtraction.v1(), memoryEstimationListener); + estimateMemoryUsage(parentTaskId, config, fieldExtraction.v1(), memoryEstimationListener); } /** @@ -186,14 +186,14 @@ private void explain( * only available on nodes where the ML plugin is enabled. */ private void estimateMemoryUsage( - Task task, + TaskId parentTaskId, DataFrameAnalyticsConfig config, ExtractedFields extractedFields, ActionListener listener ) { - final String estimateMemoryTaskId = "memory_usage_estimation_" + task.getId(); + final String estimateMemoryTaskId = "memory_usage_estimation_" + parentTaskId.getId(); DataFrameDataExtractorFactory extractorFactory = DataFrameDataExtractorFactory.createForSourceIndices( - new ParentTaskAssigningClient(client, task.getParentTaskId()), + new ParentTaskAssigningClient(client, parentTaskId), estimateMemoryTaskId, config, extractedFields @@ -216,7 +216,7 @@ private void estimateMemoryUsage( * estimation process on, and redirect the request to this node. */ private void redirectToSuitableNode( - PutDataFrameAnalyticsAction.Request request, + ExplainDataFrameAnalyticsAction.Request request, ActionListener listener ) { Optional node = findSuitableNode(clusterService.state()); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetBucketsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetBucketsAction.java index 73dc26adcff02..3ab72691e8266 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetBucketsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetBucketsAction.java @@ -40,7 +40,7 @@ public TransportGetBucketsAction( @Override protected void doExecute(Task task, GetBucketsAction.Request request, ActionListener listener) { - jobManager.jobExists(request.getJobId(), ActionListener.wrap(ok -> { + jobManager.jobExists(request.getJobId(), null, ActionListener.wrap(ok -> { BucketsQueryBuilder query = new BucketsQueryBuilder().expand(request.isExpand()) .includeInterim(request.isExcludeInterim() == false) .start(request.getStart()) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCalendarEventsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCalendarEventsAction.java index 116bde09cab6f..44bd186a7d543 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCalendarEventsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCalendarEventsAction.java @@ -59,13 +59,13 @@ protected void doExecute( .calendarIds(calendarId); ActionListener> eventsListener = ActionListener.wrap( - events -> { listener.onResponse(new GetCalendarEventsAction.Response(events)); }, + events -> listener.onResponse(new GetCalendarEventsAction.Response(events)), listener::onFailure ); if (request.getJobId() != null) { - jobConfigProvider.getJob(request.getJobId(), ActionListener.wrap(jobBuilder -> { + jobConfigProvider.getJob(request.getJobId(), null, ActionListener.wrap(jobBuilder -> { Job job = jobBuilder.build(); jobResultsProvider.scheduledEventsForJob(request.getJobId(), job.getGroups(), query, eventsListener); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCategoriesAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCategoriesAction.java index 68e8594b64b65..08b220ae513a8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCategoriesAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCategoriesAction.java @@ -10,8 +10,12 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.client.internal.ParentTaskAssigningClient; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ml.action.GetCategoriesAction; import org.elasticsearch.xpack.ml.job.JobManager; @@ -22,6 +26,7 @@ public class TransportGetCategoriesAction extends HandledTransportAction listener) { - jobManager.jobExists(request.getJobId(), ActionListener.wrap(jobExists -> { + TaskId parentTaskId = new TaskId(clusterService.getNodeName(), task.getId()); + jobManager.jobExists(request.getJobId(), parentTaskId, ActionListener.wrap(jobExists -> { Integer from = request.getPageParams() != null ? request.getPageParams().getFrom() : null; Integer size = request.getPageParams() != null ? request.getPageParams().getSize() : null; jobResultsProvider.categoryDefinitions( @@ -51,7 +59,9 @@ protected void doExecute(Task task, GetCategoriesAction.Request request, ActionL size, r -> listener.onResponse(new GetCategoriesAction.Response(r)), listener::onFailure, - client + (CancellableTask) task, + parentTaskId, + new ParentTaskAssigningClient(client, parentTaskId) ); }, listener::onFailure)); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsAction.java index 79259a705754a..e218e0ae31a04 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsAction.java @@ -18,6 +18,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsAction; @@ -60,10 +61,12 @@ protected void masterOperation( ClusterState state, ActionListener listener ) { + TaskId parentTaskId = new TaskId(clusterService.getNodeName(), task.getId()); logger.debug("Get datafeed '{}'", request.getDatafeedId()); datafeedManager.getDatafeeds( request, + parentTaskId, ActionListener.wrap(datafeeds -> listener.onResponse(new GetDatafeedsAction.Response(datafeeds)), listener::onFailure) ); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetInfluencersAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetInfluencersAction.java index eedae7c258480..4bdb227ddeaec 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetInfluencersAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetInfluencersAction.java @@ -40,7 +40,7 @@ public TransportGetInfluencersAction( @Override protected void doExecute(Task task, GetInfluencersAction.Request request, ActionListener listener) { - jobManager.jobExists(request.getJobId(), ActionListener.wrap(jobExists -> { + jobManager.jobExists(request.getJobId(), null, ActionListener.wrap(jobExists -> { InfluencersQueryBuilder.InfluencersQuery query = new InfluencersQueryBuilder().includeInterim( request.isExcludeInterim() == false ) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsAction.java index da432651df4d8..2133177902d3a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsAction.java @@ -18,6 +18,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.action.util.QueryPage; @@ -68,13 +69,16 @@ protected void masterOperation( ClusterState state, ActionListener listener ) { + TaskId parentTaskId = new TaskId(clusterService.getNodeName(), task.getId()); logger.debug("Get job '{}'", request.getJobId()); jobManager.expandJobBuilders( request.getJobId(), request.allowNoMatch(), + parentTaskId, ActionListener.wrap( jobs -> datafeedManager.getDatafeedsByJobIds( jobs.stream().map(Job.Builder::getId).collect(Collectors.toSet()), + parentTaskId, ActionListener.wrap( dfsByJobId -> listener.onResponse(new GetJobsAction.Response(new QueryPage<>(jobs.stream().map(jb -> { Optional.ofNullable(dfsByJobId.get(jb.getId())).ifPresent(jb::setDatafeed); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetModelSnapshotsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetModelSnapshotsAction.java index 9e714f256c584..f5d240d6d0ef3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetModelSnapshotsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetModelSnapshotsAction.java @@ -11,9 +11,11 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction; import org.elasticsearch.xpack.ml.job.JobManager; @@ -29,17 +31,20 @@ public class TransportGetModelSnapshotsAction extends HandledTransportAction< private final JobResultsProvider jobResultsProvider; private final JobManager jobManager; + private final ClusterService clusterService; @Inject public TransportGetModelSnapshotsAction( TransportService transportService, ActionFilters actionFilters, JobResultsProvider jobResultsProvider, - JobManager jobManager + JobManager jobManager, + ClusterService clusterService ) { super(GetModelSnapshotsAction.NAME, transportService, actionFilters, GetModelSnapshotsAction.Request::new); this.jobResultsProvider = jobResultsProvider; this.jobManager = jobManager; + this.clusterService = clusterService; } @Override @@ -48,6 +53,7 @@ protected void doExecute( GetModelSnapshotsAction.Request request, ActionListener listener ) { + TaskId parentTaskId = new TaskId(clusterService.getNodeName(), task.getId()); logger.debug( () -> format( "Get model snapshots for job %s snapshot ID %s. from = %s, size = %s start = '%s', end='%s', sort=%s descending=%s", @@ -63,13 +69,21 @@ protected void doExecute( ); if (Strings.isAllOrWildcard(request.getJobId())) { - getModelSnapshots(request, listener); + getModelSnapshots(request, parentTaskId, listener); return; } - jobManager.jobExists(request.getJobId(), ActionListener.wrap(ok -> getModelSnapshots(request, listener), listener::onFailure)); + jobManager.jobExists( + request.getJobId(), + parentTaskId, + ActionListener.wrap(ok -> getModelSnapshots(request, parentTaskId, listener), listener::onFailure) + ); } - private void getModelSnapshots(GetModelSnapshotsAction.Request request, ActionListener listener) { + private void getModelSnapshots( + GetModelSnapshotsAction.Request request, + TaskId parentTaskId, + ActionListener listener + ) { jobResultsProvider.modelSnapshots( request.getJobId(), request.getPageParams().getFrom(), @@ -79,6 +93,7 @@ private void getModelSnapshots(GetModelSnapshotsAction.Request request, ActionLi request.getSort(), request.getDescOrder(), request.getSnapshotId(), + parentTaskId, page -> listener.onResponse(new GetModelSnapshotsAction.Response(page)), listener::onFailure ); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetRecordsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetRecordsAction.java index ebc51460517d3..1076a8cfe6233 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetRecordsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetRecordsAction.java @@ -41,7 +41,7 @@ public TransportGetRecordsAction( @Override protected void doExecute(Task task, GetRecordsAction.Request request, ActionListener listener) { - jobManager.jobExists(request.getJobId(), ActionListener.wrap(jobExists -> { + jobManager.jobExists(request.getJobId(), null, ActionListener.wrap(jobExists -> { RecordsQueryBuilder query = new RecordsQueryBuilder().includeInterim(request.isExcludeInterim() == false) .epochStart(request.getStart()) .epochEnd(request.getEnd()) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index fc394602f42db..eadea7f083592 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -216,7 +216,7 @@ public void onFailure(Exception e) { }, listener::onFailure); // Get the job config - jobConfigProvider.getJob(jobParams.getJobId(), ActionListener.wrap(builder -> { + jobConfigProvider.getJob(jobParams.getJobId(), null, ActionListener.wrap(builder -> { jobParams.setJob(builder.build()); getJobHandler.onResponse(null); }, listener::onFailure)); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDataFrameAnalyticsAction.java index 80cd62b4831e4..beb3f07318978 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDataFrameAnalyticsAction.java @@ -17,6 +17,7 @@ import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ClientHelper; @@ -101,13 +102,14 @@ protected void doExecute(Task task, Request request, ActionListener li } void preview(Task task, DataFrameAnalyticsConfig config, ActionListener listener) { + final TaskId parentTaskId = new TaskId(clusterService.getNodeName(), task.getId()); final ExtractedFieldsDetectorFactory extractedFieldsDetectorFactory = new ExtractedFieldsDetectorFactory( - new ParentTaskAssigningClient(client, task.getParentTaskId()) + new ParentTaskAssigningClient(client, parentTaskId) ); extractedFieldsDetectorFactory.createFromSource(config, ActionListener.wrap(extractedFieldsDetector -> { DataFrameDataExtractor extractor = DataFrameDataExtractorFactory.createForSourceIndices( client, - task.getParentTaskId().toString(), + parentTaskId.toString(), config, extractedFieldsDetector.detect().v1() ).newExtractor(false); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java index 36b5dee2ea351..121791c5cfc33 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java @@ -23,6 +23,7 @@ import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xcontent.NamedXContentRegistry; @@ -91,15 +92,17 @@ public TransportPreviewDatafeedAction( @Override protected void doExecute(Task task, PreviewDatafeedAction.Request request, ActionListener listener) { + TaskId parentTaskId = new TaskId(clusterService.getNodeName(), task.getId()); ActionListener datafeedConfigActionListener = ActionListener.wrap(datafeedConfig -> { if (request.getJobConfig() != null) { - previewDatafeed(task, datafeedConfig, request.getJobConfig().build(new Date()), request, listener); + previewDatafeed(parentTaskId, datafeedConfig, request.getJobConfig().build(new Date()), request, listener); return; } jobConfigProvider.getJob( datafeedConfig.getJobId(), + parentTaskId, ActionListener.wrap( - jobBuilder -> previewDatafeed(task, datafeedConfig, jobBuilder.build(), request, listener), + jobBuilder -> previewDatafeed(parentTaskId, datafeedConfig, jobBuilder.build(), request, listener), listener::onFailure ) ); @@ -109,13 +112,14 @@ protected void doExecute(Task task, PreviewDatafeedAction.Request request, Actio } else { datafeedConfigProvider.getDatafeedConfig( request.getDatafeedId(), + parentTaskId, ActionListener.wrap(builder -> datafeedConfigActionListener.onResponse(builder.build()), listener::onFailure) ); } } private void previewDatafeed( - Task task, + TaskId parentTaskId, DatafeedConfig datafeedConfig, Job job, PreviewDatafeedAction.Request request, @@ -131,7 +135,7 @@ private void previewDatafeed( // requesting the preview doesn't have permission to search the relevant indices. DatafeedConfig previewDatafeedConfig = previewDatafeedBuilder.build(); DataExtractorFactory.create( - new ParentTaskAssigningClient(client, clusterService.localNode(), task), + new ParentTaskAssigningClient(client, parentTaskId), previewDatafeedConfig, job, xContentRegistry, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportResetJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportResetJobAction.java index 5c15bdce85311..5f176c2568ed8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportResetJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportResetJobAction.java @@ -135,7 +135,7 @@ protected void masterOperation( } }, listener::onFailure); - jobConfigProvider.getJob(request.getJobId(), jobListener); + jobConfigProvider.getJob(request.getJobId(), null, jobListener); } private void waitExistingResetTaskToComplete( @@ -190,7 +190,7 @@ private void resetIfJobIsStillBlockedOnReset(Task task, ResetJobAction.Request r }, listener::onFailure); // Get job again to check if it is still blocked - jobConfigProvider.getJob(request.getJobId(), jobListener); + jobConfigProvider.getJob(request.getJobId(), null, jobListener); } private void resetJob( @@ -236,7 +236,7 @@ private void resetJob( listener.onResponse(AcknowledgedResponse.of(false)); return; } - jobConfigProvider.getJob(jobId, ActionListener.wrap(jobBuilder -> { + jobConfigProvider.getJob(jobId, null, ActionListener.wrap(jobBuilder -> { if (task.isCancelled()) { listener.onResponse(AcknowledgedResponse.of(false)); return; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java index 295a7c885f9c5..2f6b13934a63e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java @@ -176,7 +176,7 @@ protected void masterOperation( // 2. Verify the job exists ActionListener createStateIndexListener = ActionListener.wrap( - r -> jobManager.jobExists(jobId, jobExistsListener), + r -> jobManager.jobExists(jobId, null, jobExistsListener), listener::onFailure ); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java index 7907f42cdf302..eebbcaf0b6ebe 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java @@ -54,7 +54,6 @@ import org.elasticsearch.xpack.core.ml.action.ExplainDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction; import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse; -import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction.TaskParams; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; @@ -250,7 +249,7 @@ private void estimateMemoryUsageAndUpdateMemoryTracker(StartContext startContext ); }, listener::onFailure); - PutDataFrameAnalyticsAction.Request explainRequest = new PutDataFrameAnalyticsAction.Request(startContext.config); + ExplainDataFrameAnalyticsAction.Request explainRequest = new ExplainDataFrameAnalyticsAction.Request(startContext.config); ClientHelper.executeAsyncWithOrigin( client, ClientHelper.ML_ORIGIN, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index 6c8d5ab8d6e01..bf659620bf815 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -304,13 +304,13 @@ public void onFailure(Exception e) { return; } } - jobConfigProvider.getJob(datafeedConfig.getJobId(), jobListener); + jobConfigProvider.getJob(datafeedConfig.getJobId(), null, jobListener); } catch (Exception e) { listener.onFailure(e); } }, listener::onFailure); - datafeedConfigProvider.getDatafeedConfig(params.getDatafeedId(), datafeedListener); + datafeedConfigProvider.getDatafeedConfig(params.getDatafeedId(), null, datafeedListener); } static void checkRemoteClusterVersions( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpgradeJobModelSnapshotAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpgradeJobModelSnapshotAction.java index 0dcd13b2bad0b..3233637653359 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpgradeJobModelSnapshotAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpgradeJobModelSnapshotAction.java @@ -228,6 +228,7 @@ protected void masterOperation(Task task, Request request, ClusterState state, A // Get the job config to verify it exists jobConfigProvider.getJob( request.getJobId(), + null, ActionListener.wrap(builder -> getJobHandler.onResponse(builder.build()), listener::onFailure) ); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedContextProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedContextProvider.java index e7afd943e9e16..defbc01a89d48 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedContextProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedContextProvider.java @@ -56,9 +56,9 @@ public void buildDatafeedContext(String datafeedId, ActionListener datafeedListener = ActionListener.wrap(datafeedConfigBuilder -> { DatafeedConfig datafeedConfig = datafeedConfigBuilder.build(); context.setDatafeedConfig(datafeedConfig); - jobConfigProvider.getJob(datafeedConfig.getJobId(), jobConfigListener); + jobConfigProvider.getJob(datafeedConfig.getJobId(), null, jobConfigListener); }, listener::onFailure); - datafeedConfigProvider.getDatafeedConfig(datafeedId, datafeedListener); + datafeedConfigProvider.getDatafeedConfig(datafeedId, null, datafeedListener); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index 8cb22a33fa926..4c6b9a78e3068 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -17,10 +17,12 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.CheckedConsumer; +import org.elasticsearch.core.Nullable; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.license.RemoteClusterLicenseChecker; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xcontent.XContentBuilder; @@ -154,27 +156,34 @@ public void putDatafeed( } } - public void getDatafeeds(GetDatafeedsAction.Request request, ActionListener> listener) { - + public void getDatafeeds( + GetDatafeedsAction.Request request, + @Nullable TaskId parentTaskId, + ActionListener> listener + ) { datafeedConfigProvider.expandDatafeedConfigs( request.getDatafeedId(), request.allowNoMatch(), - null, - ActionListener.wrap(datafeedBuilders -> - - // Build datafeeds - listener.onResponse( - new QueryPage<>( - datafeedBuilders.stream().map(DatafeedConfig.Builder::build).collect(Collectors.toList()), - datafeedBuilders.size(), - DatafeedConfig.RESULTS_FIELD - ) - ), listener::onFailure) + parentTaskId, + ActionListener.wrap( + datafeedBuilders -> listener.onResponse( + new QueryPage<>( + datafeedBuilders.stream().map(DatafeedConfig.Builder::build).collect(Collectors.toList()), + datafeedBuilders.size(), + DatafeedConfig.RESULTS_FIELD + ) + ), + listener::onFailure + ) ); } - public void getDatafeedsByJobIds(Set jobIds, ActionListener> listener) { - datafeedConfigProvider.findDatafeedsByJobIds(jobIds, listener); + public void getDatafeedsByJobIds( + Set jobIds, + @Nullable TaskId parentTaskId, + ActionListener> listener + ) { + datafeedConfigProvider.findDatafeedsByJobIds(jobIds, parentTaskId, listener); } public void updateDatafeed( @@ -236,7 +245,7 @@ public void deleteDatafeed(DeleteDatafeedAction.Request request, ClusterState st String datafeedId = request.getDatafeedId(); - datafeedConfigProvider.getDatafeedConfig(datafeedId, ActionListener.wrap(datafeedConfigBuilder -> { + datafeedConfigProvider.getDatafeedConfig(datafeedId, null, ActionListener.wrap(datafeedConfigBuilder -> { String jobId = datafeedConfigBuilder.build().getJobId(); JobDataDeleter jobDataDeleter = new JobDataDeleter(client, jobId); jobDataDeleter.deleteDatafeedTimingStats( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java index 1ce5c19499bac..25572bf131f2e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java @@ -164,10 +164,18 @@ public void putDatafeedConfig( * error. * * @param datafeedId The datafeed ID + * @param parentTaskId the parent task ID if available * @param datafeedConfigListener The config listener */ - public void getDatafeedConfig(String datafeedId, ActionListener datafeedConfigListener) { + public void getDatafeedConfig( + String datafeedId, + @Nullable TaskId parentTaskId, + ActionListener datafeedConfigListener + ) { GetRequest getRequest = new GetRequest(MlConfigIndex.indexName(), DatafeedConfig.documentId(datafeedId)); + if (parentTaskId != null) { + getRequest.setParentTask(parentTaskId); + } executeAsyncWithOrigin(client, ML_ORIGIN, GetAction.INSTANCE, getRequest, new ActionListener() { @Override public void onResponse(GetResponse getResponse) { @@ -231,11 +239,18 @@ public void findDatafeedIdsForJobIds(Collection jobIds, ActionListener jobIds, ActionListener> listener) { + public void findDatafeedsByJobIds( + Collection jobIds, + @Nullable TaskId parentTaskId, + ActionListener> listener + ) { SearchRequest searchRequest = client.prepareSearch(MlConfigIndex.indexName()) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setSource(new SearchSourceBuilder().query(buildDatafeedJobIdsQuery(jobIds)).size(jobIds.size())) .request(); + if (parentTaskId != null) { + searchRequest.setParentTask(parentTaskId); + } executeAsyncWithOrigin( client.threadPool().getThreadContext(), diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 3b7a955c3748a..42734e0e5d8f1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -23,8 +23,10 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.core.CheckedConsumer; +import org.elasticsearch.core.Nullable; import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xpack.core.action.util.QueryPage; @@ -128,8 +130,8 @@ public JobManager( this.maxModelMemoryLimitSupplier = Objects.requireNonNull(maxModelMemoryLimitSupplier); } - public void jobExists(String jobId, ActionListener listener) { - jobConfigProvider.jobExists(jobId, true, listener); + public void jobExists(String jobId, @Nullable TaskId parentTaskId, ActionListener listener) { + jobConfigProvider.jobExists(jobId, true, parentTaskId, listener); } /** @@ -142,6 +144,7 @@ public void jobExists(String jobId, ActionListener listener) { public void getJob(String jobId, ActionListener jobListener) { jobConfigProvider.getJob( jobId, + null, ActionListener.wrap( r -> jobListener.onResponse(r.build()), // TODO JIndex we shouldn't be building the job here jobListener::onFailure @@ -155,10 +158,16 @@ public void getJob(String jobId, ActionListener jobListener) { * * @param expression the jobId or an expression matching jobIds * @param allowNoMatch if {@code false}, an error is thrown when no job matches the {@code jobId} + * @param parentTaskId The parent task ID if available * @param jobsListener The jobs listener */ - public void expandJobBuilders(String expression, boolean allowNoMatch, ActionListener> jobsListener) { - jobConfigProvider.expandJobs(expression, allowNoMatch, false, null, jobsListener); + public void expandJobBuilders( + String expression, + boolean allowNoMatch, + @Nullable TaskId parentTaskId, + ActionListener> jobsListener + ) { + jobConfigProvider.expandJobs(expression, allowNoMatch, false, parentTaskId, jobsListener); } /** @@ -173,6 +182,7 @@ public void expandJobs(String expression, boolean allowNoMatch, ActionListener jobsListener.onResponse( new QueryPage<>( @@ -316,7 +326,7 @@ public void onFailure(Exception e) { actionListener::onFailure ); - jobConfigProvider.jobExists(job.getId(), false, ActionListener.wrap(jobExists -> { + jobConfigProvider.jobExists(job.getId(), false, null, ActionListener.wrap(jobExists -> { if (jobExists) { actionListener.onFailure(ExceptionsHelper.jobAlreadyExists(job.getId())); } else { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java index 10e6c43c4033a..44e8353f6143f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java @@ -150,11 +150,14 @@ public void putJob(Job job, ActionListener listener) { * error. * * @param jobId The job ID + * @param parentTaskId the parent task ID if available * @param jobListener Job listener */ - public void getJob(String jobId, ActionListener jobListener) { + public void getJob(String jobId, @Nullable TaskId parentTaskId, ActionListener jobListener) { GetRequest getRequest = new GetRequest(MlConfigIndex.indexName(), Job.documentId(jobId)); - + if (parentTaskId != null) { + getRequest.setParentTask(parentTaskId); + } executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, getRequest, new ActionListener() { @Override public void onResponse(GetResponse getResponse) { @@ -353,11 +356,15 @@ private void indexUpdatedJob(Job updatedJob, long seqNo, long primaryTerm, Actio * @param jobId The jobId to check * @param errorIfMissing If true and the job is missing the listener fails with * a ResourceNotFoundException else false is returned. + * @param parentTaskId The parent task ID if available * @param listener Exists listener */ - public void jobExists(String jobId, boolean errorIfMissing, ActionListener listener) { + public void jobExists(String jobId, boolean errorIfMissing, @Nullable TaskId parentTaskId, ActionListener listener) { GetRequest getRequest = new GetRequest(MlConfigIndex.indexName(), Job.documentId(jobId)); getRequest.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE); + if (parentTaskId != null) { + getRequest.setParentTask(parentTaskId); + } executeAsyncWithOrigin(client, ML_ORIGIN, GetAction.INSTANCE, getRequest, ActionListener.wrap(getResponse -> { if (getResponse.isExists() == false) { @@ -726,7 +733,7 @@ public void findJobsWithCustomRules(ActionListener> listener) { * @param listener Validation listener */ public void validateDatafeedJob(DatafeedConfig config, ActionListener listener) { - getJob(config.getJobId(), ActionListener.wrap(jobBuilder -> { + getJob(config.getJobId(), null, ActionListener.wrap(jobBuilder -> { try { DatafeedJobValidator.validate(config, jobBuilder.build(), xContentRegistry); listener.onResponse(Boolean.TRUE); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java index 925790d39dac5..40dcfcb902197 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java @@ -408,7 +408,7 @@ public void deleteJobDocuments( // Step 5. Get the job as the initial result index name is required ActionListener deleteAnnotationsHandler = ActionListener.wrap( - response -> jobConfigProvider.getJob(jobId, getJobHandler), + response -> jobConfigProvider.getJob(jobId, null, getJobHandler), failureHandler ); @@ -505,9 +505,7 @@ private IndicesAliasesRequest buildRemoveAliasesRequest(GetAliasesResponse getAl return aliases.isEmpty() ? null : new IndicesAliasesRequest().addAliasAction( - IndicesAliasesRequest.AliasActions.remove() - .aliases(aliases.toArray(new String[aliases.size()])) - .indices(indices.toArray(new String[indices.size()])) + IndicesAliasesRequest.AliasActions.remove().aliases(aliases.toArray(new String[0])).indices(indices.toArray(new String[0])) ); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java index a3eda544a5d64..40ffb7d2ea3e3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java @@ -80,6 +80,8 @@ import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; @@ -148,6 +150,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; +import static org.elasticsearch.core.Strings.format; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; import static org.elasticsearch.xpack.core.ml.job.messages.Messages.JOB_FORECAST_NATIVE_PROCESS_KILLED; @@ -988,6 +991,11 @@ public void bucketRecords( * @param augment Should the category definition be augmented with a Grok pattern? * @param from Skip the first N categories. This parameter is for paging * @param size Take only this number of categories + * @param handler Consumer of the results + * @param errorHandler Consumer of failures + * @param parentTask Cancellable parent task if available + * @param parentTaskId Parent task ID if available + * @param client client with which to make search requests */ public void categoryDefinitions( String jobId, @@ -998,6 +1006,8 @@ public void categoryDefinitions( Integer size, Consumer> handler, Consumer errorHandler, + @Nullable CancellableTask parentTask, + @Nullable TaskId parentTaskId, Client client ) { if (categoryId != null && (from != null || size != null)) { @@ -1015,6 +1025,9 @@ public void categoryDefinitions( SearchRequest searchRequest = new SearchRequest(indexName); searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(searchRequest.indicesOptions())); + if (parentTaskId != null) { + searchRequest.setParentTask(parentTaskId); + } QueryBuilder categoryIdQuery; SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); if (categoryId != null) { @@ -1059,6 +1072,13 @@ public void categoryDefinitions( ) ) { CategoryDefinition categoryDefinition = CategoryDefinition.LENIENT_PARSER.apply(parser, null); + // Check if parent task is cancelled as augmentation of many categories is a non-trivial task + if (parentTask != null && parentTask.isCancelled()) { + errorHandler.accept( + new TaskCancelledException(format("task cancelled with reason [%s]", parentTask.getReasonCancelled())) + ); + return; + } if (augment) { augmentWithGrokPattern(categoryDefinition); } @@ -1266,7 +1286,7 @@ public void modelSnapshots( Consumer> handler, Consumer errorHandler ) { - modelSnapshots(jobId, from, size, null, true, QueryBuilders.matchAllQuery(), handler, errorHandler); + modelSnapshots(jobId, from, size, null, true, QueryBuilders.matchAllQuery(), null, handler, errorHandler); } /** @@ -1282,6 +1302,9 @@ public void modelSnapshots( * @param sortField optional sort field name (may be null) * @param sortDescending Sort in descending order * @param snapshotId optional snapshot ID to match (null for all) + * @param parentTaskId the parent task ID if available + * @param handler consumer for the found model snapshot objects + * @param errorHandler consumer for any errors that occur */ public void modelSnapshots( String jobId, @@ -1292,6 +1315,7 @@ public void modelSnapshots( String sortField, boolean sortDescending, String snapshotId, + @Nullable TaskId parentTaskId, Consumer> handler, Consumer errorHandler ) { @@ -1300,7 +1324,7 @@ public void modelSnapshots( .timeRange(Result.TIMESTAMP.getPreferredName(), startEpochMs, endEpochMs) .build(); - modelSnapshots(jobId, from, size, sortField, sortDescending, qb, handler, errorHandler); + modelSnapshots(jobId, from, size, sortField, sortDescending, qb, parentTaskId, handler, errorHandler); } private void modelSnapshots( @@ -1310,6 +1334,7 @@ private void modelSnapshots( String sortField, boolean sortDescending, QueryBuilder qb, + @Nullable TaskId parentTaskId, Consumer> handler, Consumer errorHandler ) { @@ -1346,6 +1371,9 @@ private void modelSnapshots( .trackTotalHits(true) .fetchSource(REMOVE_QUANTILES_FROM_SOURCE); SearchRequest searchRequest = new SearchRequest(indexName); + if (parentTaskId != null) { + searchRequest.setParentTask(parentTaskId); + } searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(searchRequest.indicesOptions())).source(sourceBuilder); executeAsyncWithOrigin( client.threadPool().getThreadContext(), diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java index ba85c816a622c..1e4cedb200763 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java @@ -190,6 +190,7 @@ protected void removeDataBefore( ModelSnapshot.TIMESTAMP.getPreferredName(), false, null, + null, snapshotsListener::onResponse, snapshotsListener::onFailure ); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestGetDatafeedsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestGetDatafeedsAction.java index b8aca06b18179..06787984268d8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestGetDatafeedsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestGetDatafeedsAction.java @@ -10,6 +10,7 @@ import org.elasticsearch.core.RestApiVersion; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestCancellableNodeClient; import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsAction; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsAction.Request; @@ -60,7 +61,11 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient (r, s) -> r.paramAsBoolean(s, request.allowNoMatch()), request::setAllowNoMatch ); - return channel -> client.execute(GetDatafeedsAction.INSTANCE, request, new RestToXContentListener<>(channel)); + return channel -> new RestCancellableNodeClient(client, restRequest.getHttpChannel()).execute( + GetDatafeedsAction.INSTANCE, + request, + new RestToXContentListener<>(channel) + ); } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestPreviewDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestPreviewDatafeedAction.java index bd1382cae695c..2f7f4e3a47ff4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestPreviewDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestPreviewDatafeedAction.java @@ -10,6 +10,7 @@ import org.elasticsearch.core.RestApiVersion; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestCancellableNodeClient; import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.xpack.core.ml.action.PreviewDatafeedAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; @@ -58,6 +59,10 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient restRequest.param(DatafeedConfig.ID.getPreferredName(), null) ).setStart(startTime).setEnd(endTime).build() : new PreviewDatafeedAction.Request(restRequest.param(DatafeedConfig.ID.getPreferredName()), startTime, endTime); - return channel -> client.execute(PreviewDatafeedAction.INSTANCE, request, new RestToXContentListener<>(channel)); + return channel -> new RestCancellableNodeClient(client, restRequest.getHttpChannel()).execute( + PreviewDatafeedAction.INSTANCE, + request, + new RestToXContentListener<>(channel) + ); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestEvaluateDataFrameAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestEvaluateDataFrameAction.java index f9dfc9dbc21cf..980733679f740 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestEvaluateDataFrameAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestEvaluateDataFrameAction.java @@ -9,6 +9,7 @@ import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestCancellableNodeClient; import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.xpack.core.ml.action.EvaluateDataFrameAction; @@ -33,6 +34,10 @@ public String getName() { @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { EvaluateDataFrameAction.Request request = EvaluateDataFrameAction.Request.parseRequest(restRequest.contentOrSourceParamParser()); - return channel -> client.execute(EvaluateDataFrameAction.INSTANCE, request, new RestToXContentListener<>(channel)); + return channel -> new RestCancellableNodeClient(client, restRequest.getHttpChannel()).execute( + EvaluateDataFrameAction.INSTANCE, + request, + new RestToXContentListener<>(channel) + ); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestExplainDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestExplainDataFrameAnalyticsAction.java index 925210722e850..f0629b52cf701 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestExplainDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestExplainDataFrameAnalyticsAction.java @@ -11,10 +11,10 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestCancellableNodeClient; import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.xpack.core.ml.action.ExplainDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsAction; -import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; @@ -62,19 +62,20 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient } // We need to consume the body before returning - PutDataFrameAnalyticsAction.Request explainRequestFromBody = Strings.isNullOrEmpty(jobId) - ? PutDataFrameAnalyticsAction.Request.parseRequestForExplain(restRequest.contentOrSourceParamParser()) + ExplainDataFrameAnalyticsAction.Request explainRequestFromBody = Strings.isNullOrEmpty(jobId) + ? ExplainDataFrameAnalyticsAction.Request.parseRequest(restRequest.contentOrSourceParamParser()) : null; return channel -> { RestToXContentListener listener = new RestToXContentListener<>(channel); + RestCancellableNodeClient cancellableClient = new RestCancellableNodeClient(client, restRequest.getHttpChannel()); if (explainRequestFromBody != null) { - client.execute(ExplainDataFrameAnalyticsAction.INSTANCE, explainRequestFromBody, listener); + cancellableClient.execute(ExplainDataFrameAnalyticsAction.INSTANCE, explainRequestFromBody, listener); } else { GetDataFrameAnalyticsAction.Request getRequest = new GetDataFrameAnalyticsAction.Request(jobId); getRequest.setAllowNoResources(false); - client.execute(GetDataFrameAnalyticsAction.INSTANCE, getRequest, ActionListener.wrap(getResponse -> { + cancellableClient.execute(GetDataFrameAnalyticsAction.INSTANCE, getRequest, ActionListener.wrap(getResponse -> { List jobs = getResponse.getResources().results(); if (jobs.size() > 1) { listener.onFailure( @@ -84,8 +85,8 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient ) ); } else { - PutDataFrameAnalyticsAction.Request explainRequest = new PutDataFrameAnalyticsAction.Request(jobs.get(0)); - client.execute(ExplainDataFrameAnalyticsAction.INSTANCE, explainRequest, listener); + ExplainDataFrameAnalyticsAction.Request explainRequest = new ExplainDataFrameAnalyticsAction.Request(jobs.get(0)); + cancellableClient.execute(ExplainDataFrameAnalyticsAction.INSTANCE, explainRequest, listener); } }, listener::onFailure)); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestPreviewDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestPreviewDataFrameAnalyticsAction.java index 1a20f38613115..3070b509d8d79 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestPreviewDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestPreviewDataFrameAnalyticsAction.java @@ -11,6 +11,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestCancellableNodeClient; import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.PreviewDataFrameAnalyticsAction; @@ -65,13 +66,14 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient return channel -> { RestToXContentListener listener = new RestToXContentListener<>(channel); + RestCancellableNodeClient cancellableClient = new RestCancellableNodeClient(client, restRequest.getHttpChannel()); if (requestBuilder.getConfig() != null) { - client.execute(PreviewDataFrameAnalyticsAction.INSTANCE, requestBuilder.build(), listener); + cancellableClient.execute(PreviewDataFrameAnalyticsAction.INSTANCE, requestBuilder.build(), listener); } else { GetDataFrameAnalyticsAction.Request getRequest = new GetDataFrameAnalyticsAction.Request(jobId); getRequest.setAllowNoResources(false); - client.execute(GetDataFrameAnalyticsAction.INSTANCE, getRequest, ActionListener.wrap(getResponse -> { + cancellableClient.execute(GetDataFrameAnalyticsAction.INSTANCE, getRequest, ActionListener.wrap(getResponse -> { List jobs = getResponse.getResources().results(); if (jobs.size() > 1) { listener.onFailure( @@ -81,7 +83,11 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient ) ); } else { - client.execute(PreviewDataFrameAnalyticsAction.INSTANCE, requestBuilder.setConfig(jobs.get(0)).build(), listener); + cancellableClient.execute( + PreviewDataFrameAnalyticsAction.INSTANCE, + requestBuilder.setConfig(jobs.get(0)).build(), + listener + ); } }, listener::onFailure)); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestGetJobsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestGetJobsAction.java index 0d027e1e018ea..9e4659db55d91 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestGetJobsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestGetJobsAction.java @@ -12,6 +12,7 @@ import org.elasticsearch.core.RestApiVersion; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestCancellableNodeClient; import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.xpack.core.ml.action.GetJobsAction; import org.elasticsearch.xpack.core.ml.action.GetJobsAction.Request; @@ -63,7 +64,11 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient (r, s) -> r.paramAsBoolean(s, request.allowNoMatch()), request::setAllowNoMatch ); - return channel -> client.execute(GetJobsAction.INSTANCE, request, new RestToXContentListener<>(channel)); + return channel -> new RestCancellableNodeClient(client, restRequest.getHttpChannel()).execute( + GetJobsAction.INSTANCE, + request, + new RestToXContentListener<>(channel) + ); } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/modelsnapshots/RestGetModelSnapshotsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/modelsnapshots/RestGetModelSnapshotsAction.java index fe7d202f14435..edad75caf1d6b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/modelsnapshots/RestGetModelSnapshotsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/modelsnapshots/RestGetModelSnapshotsAction.java @@ -11,6 +11,7 @@ import org.elasticsearch.core.RestApiVersion; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestCancellableNodeClient; import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xpack.core.action.util.PageParams; @@ -97,6 +98,10 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient ); } - return channel -> client.execute(GetModelSnapshotsAction.INSTANCE, getModelSnapshots, new RestToXContentListener<>(channel)); + return channel -> new RestCancellableNodeClient(client, restRequest.getHttpChannel()).execute( + GetModelSnapshotsAction.INSTANCE, + getModelSnapshots, + new RestToXContentListener<>(channel) + ); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/results/RestGetCategoriesAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/results/RestGetCategoriesAction.java index 89c138870fd9d..38ad11703c017 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/results/RestGetCategoriesAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/results/RestGetCategoriesAction.java @@ -10,6 +10,7 @@ import org.elasticsearch.core.RestApiVersion; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestCancellableNodeClient; import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xpack.core.action.util.PageParams; @@ -92,7 +93,11 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient request.setPartitionFieldValue(restRequest.param(Request.PARTITION_FIELD_VALUE.getPreferredName())); } - return channel -> client.execute(GetCategoriesAction.INSTANCE, request, new RestToXContentListener<>(channel)); + return channel -> new RestCancellableNodeClient(client, restRequest.getHttpChannel()).execute( + GetCategoriesAction.INSTANCE, + request, + new RestToXContentListener<>(channel) + ); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java index fc1db21a52325..d9bcf0ee0381a 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java @@ -412,7 +412,19 @@ public void testCategoryDefinitions() throws IOException { JobResultsProvider provider = createProvider(client); SetOnce> holder = new SetOnce<>(); - provider.categoryDefinitions(jobId, null, null, false, from, size, holder::set, e -> { throw new RuntimeException(e); }, client); + provider.categoryDefinitions( + jobId, + null, + null, + false, + from, + size, + holder::set, + e -> { throw new RuntimeException(e); }, + null, + null, + client + ); QueryPage categoryDefinitions = holder.get(); assertEquals(1L, categoryDefinitions.count()); assertEquals(terms, categoryDefinitions.results().get(0).getTerms()); @@ -441,6 +453,8 @@ public void testCategoryDefinition() throws IOException { null, holder::set, e -> { throw new RuntimeException(e); }, + null, + null, client ); QueryPage categoryDefinitions = holder.get(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java index 589514555baf9..5be897df538ef 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java @@ -8,7 +8,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchAction; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.internal.Client; @@ -337,13 +336,12 @@ private void givenClientRequests( ) { doAnswer(new Answer() { - AtomicInteger callCount = new AtomicInteger(); + final AtomicInteger callCount = new AtomicInteger(); @Override public Void answer(InvocationOnMock invocationOnMock) { ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; - SearchRequest searchRequest = (SearchRequest) invocationOnMock.getArguments()[1]; // Only the last search request should fail if (shouldSearchRequestsSucceed || callCount.get() < (searchResponses.size() + snapshots.size())) { SearchResponse response = searchResponses.get(callCount.getAndIncrement()); @@ -368,13 +366,13 @@ public Void answer(InvocationOnMock invocationOnMock) { for (Map.Entry> snapshot : snapshots.entrySet()) { doAnswer(new Answer() { - AtomicInteger callCount = new AtomicInteger(); + final AtomicInteger callCount = new AtomicInteger(); @Override - public Void answer(InvocationOnMock invocationOnMock) throws Throwable { + public Void answer(InvocationOnMock invocationOnMock) { capturedJobIds.add((String) invocationOnMock.getArguments()[0]); - Consumer> listener = (Consumer>) invocationOnMock.getArguments()[8]; - Consumer failure = (Consumer) invocationOnMock.getArguments()[9]; + Consumer> listener = (Consumer>) invocationOnMock.getArguments()[9]; + Consumer failure = (Consumer) invocationOnMock.getArguments()[10]; if (shouldSearchRequestsSucceed || callCount.get() < snapshots.size()) { callCount.incrementAndGet(); listener.accept(new QueryPage<>(snapshot.getValue(), 10, new ParseField("snapshots"))); @@ -384,7 +382,7 @@ public Void answer(InvocationOnMock invocationOnMock) throws Throwable { return null; } }).when(resultsProvider) - .modelSnapshots(eq(snapshot.getKey()), anyInt(), anyInt(), any(), any(), any(), anyBoolean(), any(), any(), any()); + .modelSnapshots(eq(snapshot.getKey()), anyInt(), anyInt(), any(), any(), any(), anyBoolean(), any(), any(), any(), any()); } } diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/90_ml_data_frame_analytics_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/90_ml_data_frame_analytics_crud.yml index 03bf508a6e9f6..8f8281a1175a2 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/90_ml_data_frame_analytics_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/90_ml_data_frame_analytics_crud.yml @@ -239,3 +239,27 @@ ml.delete_data_frame_analytics: id: mixed_cluster_job_to_delete - match: { acknowledged: true } +--- +"Explain data frame analytics": + - do: + ml.explain_data_frame_analytics: + body: > + { + "source": { + "index": "bwc_ml_outlier_detection_job_source" + }, + "dest": { + "index": "mixed_cluster_job_to_delete_results" + }, + "analysis": {"outlier_detection":{}} + } + - is_true: memory_estimation.expected_memory_without_disk + - is_true: memory_estimation.expected_memory_with_disk + - is_true: field_selection + + - do: + ml.explain_data_frame_analytics: + id: "old_cluster_outlier_detection_job" + - is_true: memory_estimation.expected_memory_without_disk + - is_true: memory_estimation.expected_memory_with_disk + - is_true: field_selection