diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java index 61d4b52db2d6d..d8033e7c36e6c 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java @@ -108,6 +108,7 @@ import org.elasticsearch.client.ml.RevertModelSnapshotResponse; import org.elasticsearch.client.ml.SetUpgradeModeRequest; import org.elasticsearch.client.ml.StartDataFrameAnalyticsRequest; +import org.elasticsearch.client.ml.StartDataFrameAnalyticsResponse; import org.elasticsearch.client.ml.StartDatafeedRequest; import org.elasticsearch.client.ml.StartDatafeedResponse; import org.elasticsearch.client.ml.StopDataFrameAnalyticsRequest; @@ -2138,12 +2139,12 @@ public Cancellable getDataFrameAnalyticsStatsAsync(GetDataFrameAnalyticsStatsReq * @return action acknowledgement * @throws IOException when there is a serialization issue sending the request or receiving the response */ - public AcknowledgedResponse startDataFrameAnalytics(StartDataFrameAnalyticsRequest request, - RequestOptions options) throws IOException { + public StartDataFrameAnalyticsResponse startDataFrameAnalytics(StartDataFrameAnalyticsRequest request, + RequestOptions options) throws IOException { return restHighLevelClient.performRequestAndParseEntity(request, MLRequestConverters::startDataFrameAnalytics, options, - AcknowledgedResponse::fromXContent, + StartDataFrameAnalyticsResponse::fromXContent, Collections.emptySet()); } @@ -2160,11 +2161,11 @@ public AcknowledgedResponse startDataFrameAnalytics(StartDataFrameAnalyticsReque * @return cancellable that may be used to cancel the request */ public Cancellable startDataFrameAnalyticsAsync(StartDataFrameAnalyticsRequest request, RequestOptions options, - ActionListener listener) { + ActionListener listener) { return restHighLevelClient.performRequestAsyncAndParseEntity(request, MLRequestConverters::startDataFrameAnalytics, options, - AcknowledgedResponse::fromXContent, + StartDataFrameAnalyticsResponse::fromXContent, listener, Collections.emptySet()); } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/OpenJobResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/OpenJobResponse.java index 61ce6b1544ea1..ce35067679e91 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/OpenJobResponse.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/OpenJobResponse.java @@ -33,18 +33,23 @@ public class OpenJobResponse implements ToXContentObject { private static final ParseField OPENED = new ParseField("opened"); + private static final ParseField NODE = new ParseField("node"); public static final ConstructingObjectParser PARSER = - new ConstructingObjectParser<>("open_job_response", true, (a) -> new OpenJobResponse((Boolean)a[0])); + new ConstructingObjectParser<>("open_job_response", true, + (a) -> new OpenJobResponse((Boolean) a[0], (String) a[1])); static { PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), OPENED); + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), NODE); } - private boolean opened; + private final boolean opened; + private final String node; - OpenJobResponse(boolean opened) { + OpenJobResponse(boolean opened, String node) { this.opened = opened; + this.node = node; } public static 
OpenJobResponse fromXContent(XContentParser parser) throws IOException { @@ -60,6 +65,18 @@ public boolean isOpened() { return opened; } + /** + * The node that the job was assigned to + * + * @return The ID of a node if the job was assigned to a node. If an empty string is returned + * it means the job was allowed to open lazily and has not yet been assigned to a node. + * If null is returned it means the server version is too old to return node + * information. + */ + public String getNode() { + return node; + } + @Override public boolean equals(Object other) { if (this == other) { @@ -71,18 +88,22 @@ public boolean equals(Object other) { } OpenJobResponse that = (OpenJobResponse) other; - return isOpened() == that.isOpened(); + return opened == that.opened + && Objects.equals(node, that.node); } @Override public int hashCode() { - return Objects.hash(isOpened()); + return Objects.hash(opened, node); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); builder.field(OPENED.getPreferredName(), opened); + if (node != null) { + builder.field(NODE.getPreferredName(), node); + } builder.endObject(); return builder; } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/StartDataFrameAnalyticsResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/StartDataFrameAnalyticsResponse.java new file mode 100644 index 0000000000000..a36eb53186e19 --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/StartDataFrameAnalyticsResponse.java @@ -0,0 +1,97 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.client.ml; + +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Objects; + +/** + * Response indicating if the Machine Learning data frame analytics job is now started or not + */ +public class StartDataFrameAnalyticsResponse extends AcknowledgedResponse { + + private static final ParseField NODE = new ParseField("node"); + + public static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>( + "start_data_frame_analytics_response", + true, + (a) -> new StartDataFrameAnalyticsResponse((Boolean) a[0], (String) a[1])); + + static { + declareAcknowledgedField(PARSER); + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), NODE); + } + + private final String node; + + public StartDataFrameAnalyticsResponse(boolean acknowledged, String node) { + super(acknowledged); + this.node = node; + } + + public static StartDataFrameAnalyticsResponse fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + /** + * The node that the job was assigned to + * + * @return The ID of a node if the job was assigned to a node. If an empty string is returned + * it means the job was allowed to open lazily and has not yet been assigned to a node. + * If null is returned it means the server version is too old to return node + * information. + */ + public String getNode() { + return node; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + StartDataFrameAnalyticsResponse that = (StartDataFrameAnalyticsResponse) other; + return isAcknowledged() == that.isAcknowledged() + && Objects.equals(node, that.node); + } + + @Override + public int hashCode() { + return Objects.hash(isAcknowledged(), node); + } + + @Override + public void addCustomFields(XContentBuilder builder, Params params) throws IOException { + if (node != null) { + builder.field(NODE.getPreferredName(), node); + } + } +} diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/StartDatafeedResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/StartDatafeedResponse.java index aec836361154a..b1df564e9bd25 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/StartDatafeedResponse.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/StartDatafeedResponse.java @@ -33,21 +33,25 @@ public class StartDatafeedResponse implements ToXContentObject { private static final ParseField STARTED = new ParseField("started"); + private static final ParseField NODE = new ParseField("node"); public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "start_datafeed_response", true, - (a) -> new StartDatafeedResponse((Boolean)a[0])); + (a) -> new StartDatafeedResponse((Boolean) a[0], (String) a[1])); static { PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), STARTED); + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), NODE); } private final boolean started; + private final String node; - public StartDatafeedResponse(boolean started) { + public StartDatafeedResponse(boolean started, String node) { 
this.started = started; + this.node = node; } public static StartDatafeedResponse fromXContent(XContentParser parser) throws IOException { @@ -63,6 +67,18 @@ public boolean isStarted() { return started; } + /** + * The node that the datafeed was assigned to + * + * @return The ID of a node if the datafeed was assigned to a node. If an empty string is returned + * it means the datafeed was allowed to open lazily and has not yet been assigned to a node. + * If null is returned it means the server version is too old to return node + * information. + */ + public String getNode() { + return node; + } + @Override public boolean equals(Object other) { if (this == other) { @@ -74,18 +90,22 @@ public boolean equals(Object other) { } StartDatafeedResponse that = (StartDatafeedResponse) other; - return isStarted() == that.isStarted(); + return started == that.started + && Objects.equals(node, that.node); } @Override public int hashCode() { - return Objects.hash(isStarted()); + return Objects.hash(started, node); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); builder.field(STARTED.getPreferredName(), started); + if (node != null) { + builder.field(NODE.getPreferredName(), node); + } builder.endObject(); return builder; } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java index befd329161cea..c057b0864e143 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java @@ -122,6 +122,7 @@ import org.elasticsearch.client.ml.RevertModelSnapshotResponse; import org.elasticsearch.client.ml.SetUpgradeModeRequest; import org.elasticsearch.client.ml.StartDataFrameAnalyticsRequest; +import org.elasticsearch.client.ml.StartDataFrameAnalyticsResponse; import org.elasticsearch.client.ml.StartDatafeedRequest; import org.elasticsearch.client.ml.StartDatafeedResponse; import org.elasticsearch.client.ml.StopDataFrameAnalyticsRequest; @@ -232,6 +233,7 @@ import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.core.Is.is; public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase { @@ -461,7 +463,10 @@ public void testOpenJob() throws Exception { // tag::open-job-response boolean isOpened = openJobResponse.isOpened(); // <1> + String node = openJobResponse.getNode(); // <2> // end::open-job-response + + assertThat(node, notNullValue()); } { // tag::open-job-execute-listener @@ -1011,11 +1016,14 @@ public void testStartDatafeed() throws Exception { // tag::start-datafeed-execute StartDatafeedResponse response = client.machineLearning().startDatafeed(request, RequestOptions.DEFAULT); // end::start-datafeed-execute + // tag::start-datafeed-response boolean started = response.isStarted(); // <1> + String node = response.getNode(); // <2> // end::start-datafeed-response assertTrue(started); + assertThat(node, notNullValue()); } { StartDatafeedRequest request = new StartDatafeedRequest(datafeedId); @@ -3131,14 +3139,16 @@ public void testStartDataFrameAnalytics() throws Exception { // end::start-data-frame-analytics-request // 
tag::start-data-frame-analytics-execute - AcknowledgedResponse response = client.machineLearning().startDataFrameAnalytics(request, RequestOptions.DEFAULT); + StartDataFrameAnalyticsResponse response = client.machineLearning().startDataFrameAnalytics(request, RequestOptions.DEFAULT); // end::start-data-frame-analytics-execute // tag::start-data-frame-analytics-response boolean acknowledged = response.isAcknowledged(); + String node = response.getNode(); // <1> // end::start-data-frame-analytics-response assertThat(acknowledged, is(true)); + assertThat(node, notNullValue()); } assertBusy( () -> assertThat(getAnalyticsState(DF_ANALYTICS_CONFIG.getId()), equalTo(DataFrameAnalyticsState.STOPPED)), @@ -3147,9 +3157,9 @@ public void testStartDataFrameAnalytics() throws Exception { StartDataFrameAnalyticsRequest request = new StartDataFrameAnalyticsRequest("my-analytics-config"); // tag::start-data-frame-analytics-execute-listener - ActionListener listener = new ActionListener<>() { + ActionListener listener = new ActionListener<>() { @Override - public void onResponse(AcknowledgedResponse response) { + public void onResponse(StartDataFrameAnalyticsResponse response) { // <1> } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/OpenJobResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/OpenJobResponseTests.java index 7f177c6e1ef82..381480865fb1e 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/OpenJobResponseTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/OpenJobResponseTests.java @@ -27,7 +27,8 @@ public class OpenJobResponseTests extends AbstractXContentTestCase { + + @Override + protected StartDataFrameAnalyticsResponse createTestInstance() { + String node = randomFrom("", randomAlphaOfLength(10), null); + return new StartDataFrameAnalyticsResponse(randomBoolean(), node); + } + + @Override + protected StartDataFrameAnalyticsResponse doParseInstance(XContentParser parser) throws IOException { + return StartDataFrameAnalyticsResponse.fromXContent(parser); + } + + @Override + protected boolean supportsUnknownFields() { + return true; + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/StartDatafeedResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/StartDatafeedResponseTests.java index 57bc75121d43b..7688cd98c111e 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/StartDatafeedResponseTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/StartDatafeedResponseTests.java @@ -27,7 +27,8 @@ public class StartDatafeedResponseTests extends AbstractXContentTestCase `isOpened()` from the +{response}+ indicates if the job was successfully -opened or not. +<1> `isOpened()` from the +{response}+ is always `true` if the job was +opened successfully. (An exception would be thrown instead if the job +was not opened successfully.) +<2> `getNode()` returns the node that the job was assigned to. If the +job is allowed to open lazily and has not yet been assigned to a node +then an empty string is returned. If `getNode()` returns `null` then +the server is an old version that does not return node information. 
-include::../execution.asciidoc[] \ No newline at end of file +include::../execution.asciidoc[] diff --git a/docs/java-rest/high-level/ml/start-data-frame-analytics.asciidoc b/docs/java-rest/high-level/ml/start-data-frame-analytics.asciidoc index 27f9b22b7e530..4b2fa16920989 100644 --- a/docs/java-rest/high-level/ml/start-data-frame-analytics.asciidoc +++ b/docs/java-rest/high-level/ml/start-data-frame-analytics.asciidoc @@ -24,6 +24,15 @@ include-tagged::{doc-tests-file}[{api}-request] include::../execution.asciidoc[] [id="{upid}-{api}-response"] -==== Response +==== Start {dfanalytics-job} response -The returned +{response}+ object acknowledges the {dfanalytics-job} has started. \ No newline at end of file +The returned +{response}+ object acknowledges the {dfanalytics-job} has started. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests-file}[{api}-response] +-------------------------------------------------- +<1> `getNode()` returns the node that the job was assigned to. If the +job is allowed to open lazily and has not yet been assigned to a node +then an empty string is returned. If `getNode()` returns `null` then +the server is an old version that does not return node information. diff --git a/docs/java-rest/high-level/ml/start-datafeed.asciidoc b/docs/java-rest/high-level/ml/start-datafeed.asciidoc index 84eff67380d5e..0433542662a2e 100644 --- a/docs/java-rest/high-level/ml/start-datafeed.asciidoc +++ b/docs/java-rest/high-level/ml/start-datafeed.asciidoc @@ -5,13 +5,13 @@ -- [role="xpack"] [id="{upid}-{api}"] -=== Start datafeed API +=== Start {dfeed} API -Starts a {ml} datafeed in the cluster. It accepts a +{request}+ object and +Starts a {ml} {dfeed} in the cluster. It accepts a +{request}+ object and responds with a +{response}+ object. [id="{upid}-{api}-request"] -==== Start datafeed request +==== Start {dfeed} request A +{request}+ object is created referencing a non-null `datafeedId`. All other fields are optional for the request. @@ -30,14 +30,29 @@ The following arguments are optional. -------------------------------------------------- include-tagged::{doc-tests-file}[{api}-request-options] -------------------------------------------------- -<1> Set when the datafeed should end, the value is exclusive. +<1> Set when the {dfeed} should end, the value is exclusive. May be an epoch seconds, epoch millis or an ISO 8601 string. "now" is a special value that indicates the current time. -If you do not specify an end time, the datafeed runs continuously. -<2> Set when the datafeed should start, the value is inclusive. +If you do not specify an end time, the {dfeed} runs continuously. +<2> Set when the {dfeed} should start, the value is inclusive. May be an epoch seconds, epoch millis or an ISO 8601 string. -If you do not specify a start time and the datafeed is associated with a new job, +If you do not specify a start time and the {dfeed} is associated with a new job, the analysis starts from the earliest time for which data is available. <3> Set the timeout for the request +[id="{upid}-{api}-response"] +==== Start {dfeed} response + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests-file}[{api}-response] +-------------------------------------------------- +<1> `isStarted()` from the +{response}+ is always `true` if the {dfeed} was +started successfully. 
(An exception would be thrown instead if the {dfeed} +was not started successfully.) +<2> `getNode()` returns the node that the {dfeed} was assigned to. If the +{dfeed} is allowed to open lazily and has not yet been assigned to a node +then an empty string is returned. If `getNode()` returns `null` then +the server is an old version that does not return node information. + include::../execution.asciidoc[] diff --git a/docs/reference/ml/anomaly-detection/apis/open-job.asciidoc b/docs/reference/ml/anomaly-detection/apis/open-job.asciidoc index 39f3488201925..54f0e39efa9a5 100644 --- a/docs/reference/ml/anomaly-detection/apis/open-job.asciidoc +++ b/docs/reference/ml/anomaly-detection/apis/open-job.asciidoc @@ -47,6 +47,17 @@ include::{docdir}/ml/ml-shared.asciidoc[tag=job-id-anomaly-detection] (Optional, time) Controls the time to wait until a job has opened. The default value is 30 minutes. +[[ml-open-job-response-body]] +==== {api-response-body-title} + +`node`:: + (string) The ID of the node that the job was opened on. If the job is allowed to +open lazily and has not yet been assigned to a node, this value is an empty string. + +`opened`:: + (boolean) For a successful response, this value is always `true`. On failure, an + exception is returned instead. + [[ml-open-job-example]] ==== {api-examples-title} @@ -64,6 +75,7 @@ When the job opens, you receive the following results: [source,console-result] ---- { - "opened": true + "opened" : true, + "node" : "node-1" } ---- diff --git a/docs/reference/ml/anomaly-detection/apis/start-datafeed.asciidoc b/docs/reference/ml/anomaly-detection/apis/start-datafeed.asciidoc index 452944ef1e828..13c22e3c97b63 100644 --- a/docs/reference/ml/anomaly-detection/apis/start-datafeed.asciidoc +++ b/docs/reference/ml/anomaly-detection/apis/start-datafeed.asciidoc @@ -92,6 +92,18 @@ include::{docdir}/ml/ml-shared.asciidoc[tag=datafeed-id] (Optional, time) Controls the amount of time to wait until a {dfeed} starts. The default value is 20 seconds. +[[ml-start-datafeed-response-body]] +==== {api-response-body-title} + +`node`:: + (string) The ID of the node that the {dfeed} was started on. +If the {dfeed} is allowed to open lazily and has not yet been + assigned to a node, this value is an empty string. + +`started`:: + (boolean) For a successful response, this value is always `true`. On failure, an + exception is returned instead. + [[ml-start-datafeed-example]] ==== {api-examples-title} @@ -109,6 +121,7 @@ When the {dfeed} starts, you receive the following results: [source,console-result] ---- { - "started": true + "started" : true, + "node" : "node-1" } ---- diff --git a/docs/reference/ml/df-analytics/apis/start-dfanalytics.asciidoc b/docs/reference/ml/df-analytics/apis/start-dfanalytics.asciidoc index 02b62465040c7..07f1e6cf9dcbe 100644 --- a/docs/reference/ml/df-analytics/apis/start-dfanalytics.asciidoc +++ b/docs/reference/ml/df-analytics/apis/start-dfanalytics.asciidoc @@ -64,6 +64,17 @@ include::{docdir}/ml/ml-shared.asciidoc[tag=job-id-data-frame-analytics-define] (Optional, <>) include::{docdir}/ml/ml-shared.asciidoc[tag=timeout-start] +[[ml-start-dfanalytics-response-body]] +==== {api-response-body-title} + +`acknowledged`:: + (boolean) For a successful response, this value is always `true`. On failure, an + exception is returned instead. + +`node`:: + (string) The ID of the node that the job was started on. + If the job is allowed to open lazily and has not yet been assigned to a node, this value is an empty string. 
+ [[ml-start-dfanalytics-example]] ==== {api-examples-title} @@ -80,6 +91,7 @@ When the {dfanalytics-job} starts, you receive the following results: [source,console-result] ---- { - "acknowledged" : true + "acknowledged" : true, + "node" : "node-1" } ---- diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/NodeAcknowledgedResponse.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/NodeAcknowledgedResponse.java new file mode 100644 index 0000000000000..ab61a4ef9fdba --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/NodeAcknowledgedResponse.java @@ -0,0 +1,74 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.ml.action; + +import org.elasticsearch.Version; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; + +public class NodeAcknowledgedResponse extends AcknowledgedResponse { + + public static final String NODE_FIELD = "node"; + + private final String node; + + public NodeAcknowledgedResponse(boolean acknowledged, String node) { + super(acknowledged); + this.node = Objects.requireNonNull(node); + } + + public NodeAcknowledgedResponse(StreamInput in) throws IOException { + super(in); + // TODO change in backport + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + node = in.readString(); + } else { + node = ""; + } + } + + public String getNode() { + return node; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + // TODO change in backport + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeString(node); + } + } + + @Override + protected void addCustomFields(XContentBuilder builder, Params params) throws IOException { + builder.field(NODE_FIELD, node); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + NodeAcknowledgedResponse that = (NodeAcknowledgedResponse) o; + return isAcknowledged() == that.isAcknowledged() + && Objects.equals(node, that.node); + } + + @Override + public int hashCode() { + return Objects.hash(isAcknowledged(), node); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/OpenJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/OpenJobAction.java index d0b80f5bb5561..276b17d7556bd 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/OpenJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/OpenJobAction.java @@ -9,7 +9,6 @@ import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.metadata.Metadata; @@ -34,13 +33,13 @@ import java.io.IOException; import 
java.util.Objects; -public class OpenJobAction extends ActionType { +public class OpenJobAction extends ActionType { public static final OpenJobAction INSTANCE = new OpenJobAction(); public static final String NAME = "cluster:admin/xpack/ml/job/open"; private OpenJobAction() { - super(NAME, AcknowledgedResponse::new); + super(NAME, NodeAcknowledgedResponse::new); } public static class Request extends MasterNodeRequest implements ToXContentObject { @@ -257,7 +256,7 @@ static boolean match(Task task, String expectedJobId) { } } - static class RequestBuilder extends ActionRequestBuilder { + static class RequestBuilder extends ActionRequestBuilder { RequestBuilder(ElasticsearchClient client, OpenJobAction action) { super(client, action, new Request()); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDataFrameAnalyticsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDataFrameAnalyticsAction.java index f1f71c016b7ac..94d77b6b49344 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDataFrameAnalyticsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDataFrameAnalyticsAction.java @@ -9,7 +9,6 @@ import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.metadata.Metadata; @@ -37,13 +36,13 @@ import java.util.List; import java.util.Objects; -public class StartDataFrameAnalyticsAction extends ActionType { +public class StartDataFrameAnalyticsAction extends ActionType { public static final StartDataFrameAnalyticsAction INSTANCE = new StartDataFrameAnalyticsAction(); public static final String NAME = "cluster:admin/xpack/ml/data_frame/analytics/start"; private StartDataFrameAnalyticsAction() { - super(NAME, AcknowledgedResponse::new); + super(NAME, NodeAcknowledgedResponse::new); } public static class Request extends MasterNodeRequest implements ToXContentObject { @@ -143,7 +142,7 @@ public String toString() { } } - static class RequestBuilder extends ActionRequestBuilder { + static class RequestBuilder extends ActionRequestBuilder { RequestBuilder(ElasticsearchClient client, StartDataFrameAnalyticsAction action) { super(client, action, new Request()); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java index 63ab059003c8a..21fa223be63d5 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java @@ -13,7 +13,6 @@ import org.elasticsearch.action.ValidateActions; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.ParseField; @@ -40,7 +39,7 @@ import java.util.Objects; import java.util.function.LongSupplier; -public class 
StartDatafeedAction extends ActionType { +public class StartDatafeedAction extends ActionType { public static final ParseField START_TIME = new ParseField("start"); public static final ParseField END_TIME = new ParseField("end"); @@ -50,7 +49,7 @@ public class StartDatafeedAction extends ActionType { public static final String NAME = "cluster:admin/xpack/ml/datafeed/start"; private StartDatafeedAction() { - super(NAME, AcknowledgedResponse::new); + super(NAME, NodeAcknowledgedResponse::new); } public static class Request extends MasterNodeRequest implements ToXContentObject { @@ -339,7 +338,7 @@ public boolean equals(Object obj) { } } - static class RequestBuilder extends ActionRequestBuilder { + static class RequestBuilder extends ActionRequestBuilder { RequestBuilder(ElasticsearchClient client, StartDatafeedAction action) { super(client, action, new Request()); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/NodeAcknowledgedResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/NodeAcknowledgedResponseTests.java new file mode 100644 index 0000000000000..0c3b83b8d85d1 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/NodeAcknowledgedResponseTests.java @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.ml.action; + +import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.xpack.core.ml.AbstractBWCWireSerializationTestCase; + +public class NodeAcknowledgedResponseTests extends AbstractBWCWireSerializationTestCase { + + @Override + protected NodeAcknowledgedResponse createTestInstance() { + return new NodeAcknowledgedResponse(true, randomFrom(randomAlphaOfLength(10), "")); + } + + @Override + protected Writeable.Reader instanceReader() { + return NodeAcknowledgedResponse::new; + } + + @Override + protected NodeAcknowledgedResponse mutateInstance(NodeAcknowledgedResponse instance) { + if (instance.getNode().isEmpty()) { + return new NodeAcknowledgedResponse(true, randomAlphaOfLength(10)); + } else { + return new NodeAcknowledgedResponse(true, ""); + } + } + + @Override + protected NodeAcknowledgedResponse mutateInstanceForVersion(NodeAcknowledgedResponse instance, Version version) { + // TODO change in backport + if (version.onOrAfter(Version.V_8_0_0)) { + return instance; + } else { + return new NodeAcknowledgedResponse(true, ""); + } + } +} diff --git a/x-pack/plugin/ml/qa/basic-multi-node/src/test/java/org/elasticsearch/xpack/ml/integration/MlBasicMultiNodeIT.java b/x-pack/plugin/ml/qa/basic-multi-node/src/test/java/org/elasticsearch/xpack/ml/integration/MlBasicMultiNodeIT.java index c2070fb2effed..9fb9df019a284 100644 --- a/x-pack/plugin/ml/qa/basic-multi-node/src/test/java/org/elasticsearch/xpack/ml/integration/MlBasicMultiNodeIT.java +++ b/x-pack/plugin/ml/qa/basic-multi-node/src/test/java/org/elasticsearch/xpack/ml/integration/MlBasicMultiNodeIT.java @@ -24,6 +24,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.is; public class MlBasicMultiNodeIT extends ESRestTestCase { @@ -55,7 +56,7 @@ public void 
testMiniFarequote() throws Exception { Response openResponse = client().performRequest( new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open")); - assertEquals(Collections.singletonMap("opened", true), entityAsMap(openResponse)); + assertThat(entityAsMap(openResponse), hasEntry("opened", true)); Request addData = new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_data"); addData.setEntity(new NStringEntity( @@ -136,12 +137,12 @@ public void testMiniFarequoteWithDatafeeder() throws Exception { Response openResponse = client().performRequest( new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open")); - assertEquals(Collections.singletonMap("opened", true), entityAsMap(openResponse)); + assertThat(entityAsMap(openResponse), hasEntry("opened", true)); Request startRequest = new Request("POST", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_start"); startRequest.addParameter("start", "0"); Response startResponse = client().performRequest(startRequest); - assertEquals(Collections.singletonMap("started", true), entityAsMap(startResponse)); + assertThat(entityAsMap(startResponse), hasEntry("started", true)); assertBusy(() -> { try { @@ -175,7 +176,7 @@ public void testMiniFarequoteReopen() throws Exception { Response openResponse = client().performRequest( new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open")); - assertEquals(Collections.singletonMap("opened", true), entityAsMap(openResponse)); + assertThat(entityAsMap(openResponse), hasEntry("opened", true)); Request addDataRequest = new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_data"); addDataRequest.setEntity(new NStringEntity( @@ -214,7 +215,7 @@ public void testMiniFarequoteReopen() throws Exception { Request openRequest = new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open"); openRequest.addParameter("timeout", "20s"); Response openResponse2 = client().performRequest(openRequest); - assertEquals(Collections.singletonMap("opened", true), entityAsMap(openResponse2)); + assertThat(entityAsMap(openResponse2), hasEntry("opened", true)); // feed some more data points Request addDataRequest2 = new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_data"); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java index 3ff2090b381cf..67b4c1686e113 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java @@ -24,6 +24,7 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.xpack.core.ml.action.EvaluateDataFrameAction; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction; +import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; import org.elasticsearch.xpack.core.ml.dataframe.analyses.BoostedTreeParams; @@ -45,6 +46,7 @@ import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.anyOf; import static 
org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.emptyString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -308,7 +310,8 @@ public void testStopAndRestart() throws Exception { assertIsStopped(jobId); assertProgress(jobId, 0, 0, 0, 0); - startAnalytics(jobId); + NodeAcknowledgedResponse response = startAnalytics(jobId); + assertThat(response.getNode(), not(emptyString())); // Wait until state is one of REINDEXING or ANALYZING, or until it is STOPPED. assertBusy(() -> { @@ -325,7 +328,8 @@ public void testStopAndRestart() throws Exception { // Now let's start it again try { - startAnalytics(jobId); + response = startAnalytics(jobId); + assertThat(response.getNode(), not(emptyString())); } catch (Exception e) { if (e.getMessage().equals("Cannot start because the job has already finished")) { // That means the job had managed to complete diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java index 01e6a77bd73e3..373654a9c4838 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java @@ -1003,7 +1003,7 @@ public void testRealtime() throws Exception { Request startRequest = new Request("POST", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_start"); startRequest.addParameter("start", "2016-06-01T00:00:00Z"); Response response = client().performRequest(startRequest); - assertThat(EntityUtils.toString(response.getEntity()), equalTo("{\"started\":true}")); + assertThat(EntityUtils.toString(response.getEntity()), containsString("\"started\":true")); assertBusy(() -> { try { Response getJobResponse = client().performRequest(new Request("GET", @@ -1062,7 +1062,7 @@ public void testForceDeleteWhileDatafeedIsRunning() throws Exception { startRequest.addParameter("start", "2016-06-01T00:00:00Z"); Response response = client().performRequest(startRequest); assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); - assertThat(EntityUtils.toString(response.getEntity()), equalTo("{\"started\":true}")); + assertThat(EntityUtils.toString(response.getEntity()), containsString("\"started\":true")); ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(new Request("DELETE", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId))); @@ -1154,7 +1154,7 @@ private void startDatafeedAndWaitUntilStopped(String datafeedId, String authHead options.addHeader("Authorization", authHeader); request.setOptions(options); Response startDatafeedResponse = client().performRequest(request); - assertThat(EntityUtils.toString(startDatafeedResponse.getEntity()), equalTo("{\"started\":true}")); + assertThat(EntityUtils.toString(startDatafeedResponse.getEntity()), containsString("\"started\":true")); assertBusy(() -> { try { Response datafeedStatsResponse = client().performRequest(new Request("GET", diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java index 
62b05bbcd81f4..82e6083f59868 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java @@ -25,7 +25,6 @@ import org.junit.After; import java.io.IOException; -import java.util.Collections; import java.util.Locale; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; @@ -125,7 +124,7 @@ public void testUsage() throws IOException { assertEquals(2, XContentMapValues.extractValue("ml.jobs._all.count", usage)); assertEquals(2, XContentMapValues.extractValue("ml.jobs.closed.count", usage)); Response openResponse = client().performRequest(new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/job-1/_open")); - assertEquals(Collections.singletonMap("opened", true), entityAsMap(openResponse)); + assertThat(entityAsMap(openResponse), hasEntry("opened", true)); usage = entityAsMap(client().performRequest(new Request("GET", "_xpack/usage"))); assertEquals(2, XContentMapValues.extractValue("ml.jobs._all.count", usage)); assertEquals(1, XContentMapValues.extractValue("ml.jobs.closed.count", usage)); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java index 2b57961505144..fd034c59e3945 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java @@ -31,6 +31,7 @@ import org.elasticsearch.xpack.core.ml.action.ExplainDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction; +import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse; import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.StopDataFrameAnalyticsAction; @@ -125,7 +126,7 @@ protected AcknowledgedResponse deleteAnalytics(String id) { return client().execute(DeleteDataFrameAnalyticsAction.INSTANCE, request).actionGet(); } - protected AcknowledgedResponse startAnalytics(String id) { + protected NodeAcknowledgedResponse startAnalytics(String id) { StartDataFrameAnalyticsAction.Request request = new StartDataFrameAnalyticsAction.Request(id); return client().execute(StartDataFrameAnalyticsAction.INSTANCE, request).actionGet(); } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java index f9e9ab9b9e5f2..ea23dd89d26b4 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java @@ -16,6 +16,7 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; import 
org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction; +import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; import org.elasticsearch.xpack.core.ml.dataframe.analyses.BoostedTreeParams; @@ -28,11 +29,13 @@ import static org.elasticsearch.test.hamcrest.OptionalMatchers.isPresent; import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.emptyString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.not; public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase { @@ -245,7 +248,8 @@ public void testStopAndRestart() throws Exception { assertIsStopped(jobId); assertProgress(jobId, 0, 0, 0, 0); - startAnalytics(jobId); + NodeAcknowledgedResponse response = startAnalytics(jobId); + assertThat(response.getNode(), not(emptyString())); // Wait until state is one of REINDEXING or ANALYZING, or until it is STOPPED. assertBusy(() -> { @@ -262,7 +266,8 @@ public void testStopAndRestart() throws Exception { // Now let's start it again try { - startAnalytics(jobId); + response = startAnalytics(jobId); + assertThat(response.getNode(), not(emptyString())); } catch (Exception e) { if (e.getMessage().equals("Cannot start because the job has already finished")) { // That means the job had managed to complete diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java index 54b159852466c..3c11425802296 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java @@ -23,6 +23,7 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction; +import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource; @@ -36,12 +37,14 @@ import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.emptyString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.startsWith; @@ -571,7 +574,8 @@ public void testLazyAssignmentWithModelMemoryLimitTooHighForAssignment() throws assertIsStopped(id); // Due to lazy start being allowed, this should succeed even though no node currently in the 
cluster is big enough - startAnalytics(id); + NodeAcknowledgedResponse response = startAnalytics(id); + assertThat(response.getNode(), emptyString()); // Wait until state is STARTING, there is no node but there is an assignment explanation. assertBusy(() -> { @@ -620,7 +624,8 @@ public void testOutlierDetectionStopAndRestart() throws Exception { putAnalytics(config); assertIsStopped(id); - startAnalytics(id); + NodeAcknowledgedResponse response = startAnalytics(id); + assertThat(response.getNode(), not(emptyString())); // Wait until state is one of REINDEXING or ANALYZING, or until it is STOPPED. assertBusy(() -> { @@ -633,7 +638,8 @@ public void testOutlierDetectionStopAndRestart() throws Exception { // Now let's start it again try { - startAnalytics(id); + response = startAnalytics(id); + assertThat(response.getNode(), not(emptyString())); } catch (Exception e) { if (e.getMessage().equals("Cannot start because the job has already finished")) { // That means the job had managed to complete diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index b67c7f5fd4d5c..7139b75624792 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -14,7 +14,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; @@ -47,6 +46,7 @@ import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; +import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; @@ -82,7 +82,7 @@ To ensure that a subsequent close job call will see that same task status (and s In case of instability persistent tasks checks may fail and that is ok, in that case all bets are off. The open job api is a low through put api, so the fact that we redirect to elected master node shouldn't be an issue. 
*/ -public class TransportOpenJobAction extends TransportMasterNodeAction { +public class TransportOpenJobAction extends TransportMasterNodeAction { private static final Logger logger = LogManager.getLogger(TransportOpenJobAction.class); @@ -194,8 +194,8 @@ protected String executor() { } @Override - protected AcknowledgedResponse read(StreamInput in) throws IOException { - return new AcknowledgedResponse(in); + protected NodeAcknowledgedResponse read(StreamInput in) throws IOException { + return new NodeAcknowledgedResponse(in); } @Override @@ -208,7 +208,7 @@ protected ClusterBlockException checkBlock(OpenJobAction.Request request, Cluste @Override protected void masterOperation(Task task, OpenJobAction.Request request, ClusterState state, - ActionListener listener) { + ActionListener listener) { if (migrationEligibilityCheck.jobIsEligibleForMigration(request.getJobParams().getJobId(), state)) { listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("open job", request.getJobParams().getJobId())); return; @@ -218,10 +218,10 @@ protected void masterOperation(Task task, OpenJobAction.Request request, Cluster if (licenseState.isMachineLearningAllowed()) { // Clear job finished time once the job is started and respond - ActionListener clearJobFinishTime = ActionListener.wrap( + ActionListener clearJobFinishTime = ActionListener.wrap( response -> { if (response.isAcknowledged()) { - clearJobFinishedTime(jobParams.getJobId(), listener); + clearJobFinishedTime(response, jobParams.getJobId(), listener); } else { listener.onResponse(response); } @@ -274,7 +274,7 @@ public void onFailure(Exception e) { } } - private void waitForJobStarted(String taskId, OpenJobAction.JobParams jobParams, ActionListener listener) { + private void waitForJobStarted(String taskId, OpenJobAction.JobParams jobParams, ActionListener listener) { JobPredicate predicate = new JobPredicate(); persistentTasksService.waitForPersistentTaskCondition(taskId, predicate, jobParams.getTimeout(), new PersistentTasksService.WaitForPersistentTaskListener() { @@ -289,7 +289,7 @@ public void onResponse(PersistentTasksCustomMetadata.PersistentTask listener) { + private void clearJobFinishedTime(NodeAcknowledgedResponse response, String jobId, ActionListener listener) { JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build(); jobConfigProvider.updateJob(jobId, update, null, ActionListener.wrap( - job -> listener.onResponse(new AcknowledgedResponse(true)), + job -> listener.onResponse(response), e -> { logger.error("[" + jobId + "] Failed to clear finished_time", e); // Not a critical error so continue - listener.onResponse(new AcknowledgedResponse(true)); + listener.onResponse(response); } )); } private void cancelJobStart(PersistentTasksCustomMetadata.PersistentTask persistentTask, Exception exception, - ActionListener listener) { + ActionListener listener) { persistentTasksService.sendRemoveRequest(persistentTask.getId(), new ActionListener>() { @Override @@ -548,6 +548,7 @@ void closeJob(String reason) { private static class JobPredicate implements Predicate> { private volatile Exception exception; + private volatile String node = ""; private volatile boolean shouldCancel; @Override @@ -593,6 +594,7 @@ public boolean test(PersistentTasksCustomMetadata.PersistentTask persistentTa case CLOSED: return false; case OPENED: + node = persistentTask.getExecutorNode(); return true; case CLOSING: exception = ExceptionsHelper.conflictStatusException("The job has been " + JobState.CLOSED + " while waiting to be " 
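The new `getNode()` accessors on `OpenJobResponse`, `StartDatafeedResponse` and `StartDataFrameAnalyticsResponse` share one contract: a non-empty string is the ID of the node the task was assigned to, an empty string means the task was allowed to start lazily and is not yet assigned, and `null` means the server is too old to report node information. The following helper is illustrative only and not part of this change; it simply sketches how calling code might branch on those three documented cases.

```java
// Hypothetical helper, not part of this PR: interprets the node value returned
// by the new getNode() accessors, following the Javadoc introduced above.
static String describeAssignment(String node) {
    if (node == null) {
        // The server predates this change and does not report node information.
        return "node information unavailable (old server version)";
    }
    if (node.isEmpty()) {
        // Lazy start was allowed; the task has not yet been assigned to a node.
        return "not yet assigned (lazy start)";
    }
    return "assigned to node " + node;
}
```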
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java index c210b3cfb02a6..03c7ab0ce2862 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java @@ -16,7 +16,6 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.Client; import org.elasticsearch.client.ParentTaskAssigningClient; @@ -56,6 +55,7 @@ import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.ExplainDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction; +import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse; import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; @@ -94,7 +94,7 @@ * Starts the persistent task for running data frame analytics. */ public class TransportStartDataFrameAnalyticsAction - extends TransportMasterNodeAction { + extends TransportMasterNodeAction { private static final Logger logger = LogManager.getLogger(TransportStartDataFrameAnalyticsAction.class); private static final String PRIMARY_SHARDS_INACTIVE = "not all primary shards are active"; @@ -140,8 +140,8 @@ protected String executor() { } @Override - protected AcknowledgedResponse read(StreamInput in) throws IOException { - return new AcknowledgedResponse(in); + protected NodeAcknowledgedResponse read(StreamInput in) throws IOException { + return new NodeAcknowledgedResponse(in); } @Override @@ -154,7 +154,7 @@ protected ClusterBlockException checkBlock(StartDataFrameAnalyticsAction.Request @Override protected void masterOperation(Task task, StartDataFrameAnalyticsAction.Request request, ClusterState state, - ActionListener listener) { + ActionListener listener) { if (licenseState.isMachineLearningAllowed() == false) { listener.onFailure(LicenseUtils.newComplianceException(XPackField.MACHINE_LEARNING)); return; @@ -388,7 +388,7 @@ private void checkDestIndexIsEmptyIfExists(ParentTaskAssigningClient parentTaskC } private void waitForAnalyticsStarted(PersistentTasksCustomMetadata.PersistentTask task, - TimeValue timeout, ActionListener listener) { + TimeValue timeout, ActionListener listener) { AnalyticsPredicate predicate = new AnalyticsPredicate(); persistentTasksService.waitForPersistentTaskCondition(task.getId(), predicate, timeout, @@ -402,7 +402,7 @@ public void onResponse(PersistentTasksCustomMetadata.PersistentTask progre private static class AnalyticsPredicate implements Predicate> { private volatile Exception exception; + private volatile String node = ""; private volatile String assignmentExplanation; @Override @@ -491,6 +492,7 @@ public boolean test(PersistentTasksCustomMetadata.PersistentTask persistentTa case STARTED: case REINDEXING: case ANALYZING: + node = persistentTask.getExecutorNode(); return true; case STOPPING: exception = 
                     exception = ExceptionsHelper.conflictStatusException("the task has been stopped while waiting to be started");
@@ -513,7 +515,7 @@ public boolean test(PersistentTasksCustomMetadata.PersistentTask<?> persistentTa
     private void cancelAnalyticsStart(
         PersistentTasksCustomMetadata.PersistentTask<StartDataFrameAnalyticsAction.TaskParams> persistentTask, Exception exception,
-        ActionListener<AcknowledgedResponse> listener) {
+        ActionListener<NodeAcknowledgedResponse> listener) {
         persistentTasksService.sendRemoveRequest(persistentTask.getId(),
             new ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>>() {
                 @Override
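Both transport actions above follow the same pattern: the predicate that waits for the persistent task to start also records persistentTask.getExecutorNode(), and that captured value is what ultimately populates the node field of the response. The stand-alone snippet below only illustrates that capture-while-waiting idea; TaskView and the literal state string are made-up stand-ins for brevity, not types from this PR.

import java.util.function.Predicate;

// Self-contained illustration of the pattern used by JobPredicate, AnalyticsPredicate and
// DatafeedPredicate: the predicate that decides "has the task started?" also remembers which
// node the task was assigned to, so the caller can report it in the response afterwards.
public class WaitForStartedExample {

    static final class TaskView {                 // stand-in for PersistentTask<?>
        final String state;
        final String executorNode;
        TaskView(String state, String executorNode) {
            this.state = state;
            this.executorNode = executorNode;
        }
    }

    static final class StartedPredicate implements Predicate<TaskView> {
        volatile String node = "";                // stays "" until an assignment is seen

        @Override
        public boolean test(TaskView task) {
            if (task != null && "STARTED".equals(task.state)) {
                node = task.executorNode;         // capture the assignment for the response
                return true;
            }
            return false;
        }
    }

    public static void main(String[] args) {
        StartedPredicate predicate = new StartedPredicate();
        // Simulate the persistent task reaching the STARTED state on node "node-1".
        predicate.test(new TaskView("STARTED", "node-1"));
        System.out.println("started on node: " + predicate.node);   // prints node-1
    }
}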
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java
index 1747f2258a0d0..274fde654945d 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java
@@ -12,7 +12,6 @@
 import org.elasticsearch.ResourceAlreadyExistsException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
-import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterState;
@@ -43,6 +42,7 @@
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.XPackField;
 import org.elasticsearch.xpack.core.ml.MlTasks;
+import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse;
 import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator;
@@ -79,7 +79,7 @@ To ensure that a subsequent stop datafeed call will see that same task status (a
    In case of instability persistent tasks checks may fail and that is ok, in that case all bets are off.
    The start datafeed api is a low through put api, so the fact that we redirect to elected master node shouldn't be an issue.
 */
-public class TransportStartDatafeedAction extends TransportMasterNodeAction<StartDatafeedAction.Request, AcknowledgedResponse> {
+public class TransportStartDatafeedAction extends TransportMasterNodeAction<StartDatafeedAction.Request, NodeAcknowledgedResponse> {
     private static final Logger logger = LogManager.getLogger(TransportStartDatafeedAction.class);
@@ -148,13 +148,13 @@ protected String executor() {
     }
     @Override
-    protected AcknowledgedResponse read(StreamInput in) throws IOException {
-        return new AcknowledgedResponse(in);
+    protected NodeAcknowledgedResponse read(StreamInput in) throws IOException {
+        return new NodeAcknowledgedResponse(in);
     }
     @Override
     protected void masterOperation(Task task, StartDatafeedAction.Request request, ClusterState state,
-                                   ActionListener<AcknowledgedResponse> listener) {
+                                   ActionListener<NodeAcknowledgedResponse> listener) {
         StartDatafeedAction.DatafeedParams params = request.getParams();
         if (licenseState.isMachineLearningAllowed() == false) {
             listener.onFailure(LicenseUtils.newComplianceException(XPackField.MACHINE_LEARNING));
@@ -283,7 +283,7 @@ protected ClusterBlockException checkBlock(StartDatafeedAction.Request request,
     }
     private void waitForDatafeedStarted(String taskId, StartDatafeedAction.DatafeedParams params,
-                                        ActionListener<AcknowledgedResponse> listener) {
+                                        ActionListener<NodeAcknowledgedResponse> listener) {
         DatafeedPredicate predicate = new DatafeedPredicate();
         persistentTasksService.waitForPersistentTaskCondition(taskId, predicate, params.getTimeout(),
             new PersistentTasksService.WaitForPersistentTaskListener<StartDatafeedAction.DatafeedParams>() {
@@ -295,7 +295,7 @@ public void onResponse(PersistentTasksCustomMetadata.PersistentTask
                                           persistentTask,
-                                          Exception exception, ActionListener<AcknowledgedResponse> listener) {
+                                          Exception exception, ActionListener<NodeAcknowledgedResponse> listener) {
         persistentTasksService.sendRemoveRequest(persistentTask.getId(),
             new ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>>() {
                 @Override
@@ -494,6 +494,7 @@ public void isolate() {
     private static class DatafeedPredicate implements Predicate<PersistentTasksCustomMetadata.PersistentTask<?>> {
         private volatile Exception exception;
+        private volatile String node = "";
         @Override
         public boolean test(PersistentTasksCustomMetadata.PersistentTask<?> persistentTask) {
@@ -514,7 +515,11 @@ public boolean test(PersistentTasksCustomMetadata.PersistentTa
                 }
             }
             DatafeedState datafeedState = (DatafeedState) persistentTask.getState();
-            return datafeedState == DatafeedState.STARTED;
+            if (datafeedState == DatafeedState.STARTED) {
+                node = persistentTask.getExecutorNode();
+                return true;
+            }
+            return false;
         }
     }
 }
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStartDatafeedAction.java
index 23649cdf05575..d3bc49d05d6ab 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStartDatafeedAction.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStartDatafeedAction.java
@@ -5,7 +5,6 @@
  */
 package org.elasticsearch.xpack.ml.rest.datafeeds;
-import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.node.NodeClient;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -16,6 +15,7 @@
 import org.elasticsearch.rest.RestResponse;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.rest.action.RestBuilderListener;
+import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse;
 import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.ml.MachineLearning;
@@ -71,12 +71,14 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
         }
         return channel -> {
             client.execute(StartDatafeedAction.INSTANCE, jobDatafeedRequest,
-                new RestBuilderListener<AcknowledgedResponse>(channel) {
+                new RestBuilderListener<NodeAcknowledgedResponse>(channel) {
                     @Override
-                    public RestResponse buildResponse(AcknowledgedResponse r, XContentBuilder builder) throws Exception {
+                    public RestResponse buildResponse(NodeAcknowledgedResponse r, XContentBuilder builder) throws Exception {
+                        // This doesn't use the toXContent of the response object because we rename "acknowledged" to "started"
                         builder.startObject();
                         builder.field("started", r.isAcknowledged());
+                        builder.field(NodeAcknowledgedResponse.NODE_FIELD, r.getNode());
                         builder.endObject();
                         return new BytesRestResponse(RestStatus.OK, builder);
                     }
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestOpenJobAction.java
index 631c1aa526ad5..082e6d14e9d5e 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestOpenJobAction.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestOpenJobAction.java
@@ -5,7 +5,6 @@
  */
 package org.elasticsearch.xpack.ml.rest.job;
-import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.node.NodeClient;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -15,6 +14,7 @@
 import org.elasticsearch.rest.RestResponse;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.rest.action.RestBuilderListener;
+import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse;
 import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.ml.MachineLearning;
@@ -61,11 +61,13 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
             request = new OpenJobAction.Request(jobParams);
         }
         return channel -> {
-            client.execute(OpenJobAction.INSTANCE, request, new RestBuilderListener<AcknowledgedResponse>(channel) {
+            client.execute(OpenJobAction.INSTANCE, request, new RestBuilderListener<NodeAcknowledgedResponse>(channel) {
                 @Override
-                public RestResponse buildResponse(AcknowledgedResponse r, XContentBuilder builder) throws Exception {
+                public RestResponse buildResponse(NodeAcknowledgedResponse r, XContentBuilder builder) throws Exception {
+                    // This doesn't use the toXContent of the response object because we rename "acknowledged" to "opened"
                     builder.startObject();
                     builder.field("opened", r.isAcknowledged());
+                    builder.field(NodeAcknowledgedResponse.NODE_FIELD, r.getNode());
                     builder.endObject();
                     return new BytesRestResponse(RestStatus.OK, builder);
                 }
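From a caller's perspective, the effect of the two REST handler changes above is an extra "node" key next to the existing "opened" or "started" flag. The sketch below uses the low-level REST client to show where that appears; the host, job id, and the node id printed in the body are placeholders, and servers without this change simply return the flag on its own.

import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

// Sketch: open an anomaly detection job over HTTP and print the raw response body.
// With this change the body looks like {"opened":true,"node":"<node id>"} rather than just {"opened":true}.
public class OpenJobNodeFieldExample {
    public static void main(String[] args) throws Exception {
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            Request openJob = new Request("POST", "/_ml/anomaly_detectors/my-job/_open"); // "my-job" is a placeholder
            Response response = client.performRequest(openJob);
            System.out.println(EntityUtils.toString(response.getEntity()));
        }
    }
}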
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/license/MachineLearningLicensingIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/license/MachineLearningLicensingIT.java
index 04d79f1fbcbb3..e857f259bb53f 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/license/MachineLearningLicensingIT.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/license/MachineLearningLicensingIT.java
@@ -28,6 +28,7 @@
 import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
 import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
 import org.elasticsearch.xpack.core.ml.action.InternalInferModelAction;
+import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse;
 import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
 import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction;
 import org.elasticsearch.xpack.core.ml.action.PutJobAction;
@@ -113,7 +114,7 @@ public void testMachineLearningOpenJobActionRestricted() throws Exception {
         assertMLAllowed(false);
         // test that license restricted apis do not work
         ElasticsearchSecurityException e = expectThrows(ElasticsearchSecurityException.class, () -> {
-            PlainActionFuture<AcknowledgedResponse> listener = PlainActionFuture.newFuture();
+            PlainActionFuture<NodeAcknowledgedResponse> listener = PlainActionFuture.newFuture();
             client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(jobId), listener);
             listener.actionGet();
         });
@@ -133,9 +134,9 @@ public void testMachineLearningOpenJobActionRestricted() throws Exception {
         });
         // test that license restricted apis do now work
-        PlainActionFuture<AcknowledgedResponse> listener = PlainActionFuture.newFuture();
+        PlainActionFuture<NodeAcknowledgedResponse> listener = PlainActionFuture.newFuture();
         client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(jobId), listener);
-        AcknowledgedResponse response2 = listener.actionGet();
+        NodeAcknowledgedResponse response2 = listener.actionGet();
         assertNotNull(response2);
     }
@@ -196,12 +197,12 @@ public void testAutoCloseJobWithDatafeed() throws Exception {
         PutDatafeedAction.Response putDatafeedResponse = putDatafeedListener.actionGet();
         assertNotNull(putDatafeedResponse);
         // open job
-        PlainActionFuture<AcknowledgedResponse> openJobListener = PlainActionFuture.newFuture();
+        PlainActionFuture<NodeAcknowledgedResponse> openJobListener = PlainActionFuture.newFuture();
         client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(jobId), openJobListener);
-        AcknowledgedResponse openJobResponse = openJobListener.actionGet();
+        NodeAcknowledgedResponse openJobResponse = openJobListener.actionGet();
         assertNotNull(openJobResponse);
         // start datafeed
-        PlainActionFuture<AcknowledgedResponse> listener = PlainActionFuture.newFuture();
+        PlainActionFuture<NodeAcknowledgedResponse> listener = PlainActionFuture.newFuture();
         client().execute(StartDatafeedAction.INSTANCE, new StartDatafeedAction.Request(datafeedId, 0L), listener);
         listener.actionGet();
@@ -232,12 +233,12 @@ public void testAutoCloseJobWithDatafeed() throws Exception {
         assertMLAllowed(true);
         // open job
-        PlainActionFuture<AcknowledgedResponse> openJobListener2 = PlainActionFuture.newFuture();
+        PlainActionFuture<NodeAcknowledgedResponse> openJobListener2 = PlainActionFuture.newFuture();
         client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(jobId), openJobListener2);
-        AcknowledgedResponse openJobResponse3 = openJobListener2.actionGet();
+        NodeAcknowledgedResponse openJobResponse3 = openJobListener2.actionGet();
         assertNotNull(openJobResponse3);
         // start datafeed
-        PlainActionFuture<AcknowledgedResponse> listener2 = PlainActionFuture.newFuture();
+        PlainActionFuture<NodeAcknowledgedResponse> listener2 = PlainActionFuture.newFuture();
         client().execute(StartDatafeedAction.INSTANCE, new StartDatafeedAction.Request(datafeedId, 0L), listener2);
         listener2.actionGet();
@@ -291,9 +292,9 @@ public void testMachineLearningStartDatafeedActionRestricted() throws Exception
             Collections.singletonList(datafeedIndex))), putDatafeedListener);
         PutDatafeedAction.Response putDatafeedResponse = putDatafeedListener.actionGet();
         assertNotNull(putDatafeedResponse);
-        PlainActionFuture<AcknowledgedResponse> openJobListener = PlainActionFuture.newFuture();
+        PlainActionFuture<NodeAcknowledgedResponse> openJobListener = PlainActionFuture.newFuture();
         client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(jobId), openJobListener);
-        AcknowledgedResponse openJobResponse = openJobListener.actionGet();
+        NodeAcknowledgedResponse openJobResponse = openJobListener.actionGet();
         assertNotNull(openJobResponse);
         // Pick a license that does not allow machine learning
@@ -312,7 +313,7 @@ public void testMachineLearningStartDatafeedActionRestricted() throws Exception
         // test that license restricted apis do not work
         ElasticsearchSecurityException e = expectThrows(ElasticsearchSecurityException.class, () -> {
-            PlainActionFuture<AcknowledgedResponse> listener = PlainActionFuture.newFuture();
+            PlainActionFuture<NodeAcknowledgedResponse> listener = PlainActionFuture.newFuture();
             client().execute(StartDatafeedAction.INSTANCE, new StartDatafeedAction.Request(datafeedId, 0L), listener);
             listener.actionGet();
         });
@@ -326,14 +327,14 @@ public void testMachineLearningStartDatafeedActionRestricted() throws Exception
         assertMLAllowed(true);
         // test that license restricted apis do now work
         // re-open job now that the license is valid again
-        PlainActionFuture<AcknowledgedResponse> openJobListener2 = PlainActionFuture.newFuture();
+        PlainActionFuture<NodeAcknowledgedResponse> openJobListener2 = PlainActionFuture.newFuture();
         client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(jobId), openJobListener2);
-        AcknowledgedResponse openJobResponse3 = openJobListener2.actionGet();
+        NodeAcknowledgedResponse openJobResponse3 = openJobListener2.actionGet();
         assertNotNull(openJobResponse3);
-        PlainActionFuture<AcknowledgedResponse> listener = PlainActionFuture.newFuture();
+        PlainActionFuture<NodeAcknowledgedResponse> listener = PlainActionFuture.newFuture();
         client().execute(StartDatafeedAction.INSTANCE, new StartDatafeedAction.Request(datafeedId, 0L), listener);
-        AcknowledgedResponse response = listener.actionGet();
+        NodeAcknowledgedResponse response = listener.actionGet();
         assertNotNull(response);
     }
@@ -354,14 +355,14 @@ public void testMachineLearningStopDatafeedActionNotRestricted() throws Exceptio
             Collections.singletonList(datafeedIndex))), putDatafeedListener);
         PutDatafeedAction.Response putDatafeedResponse = putDatafeedListener.actionGet();
         assertNotNull(putDatafeedResponse);
-        PlainActionFuture<AcknowledgedResponse> openJobListener = PlainActionFuture.newFuture();
+        PlainActionFuture<NodeAcknowledgedResponse> openJobListener = PlainActionFuture.newFuture();
         client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(jobId), openJobListener);
-        AcknowledgedResponse openJobResponse = openJobListener.actionGet();
+        NodeAcknowledgedResponse openJobResponse = openJobListener.actionGet();
         assertNotNull(openJobResponse);
-        PlainActionFuture<AcknowledgedResponse> startDatafeedListener = PlainActionFuture.newFuture();
+        PlainActionFuture<NodeAcknowledgedResponse> startDatafeedListener = PlainActionFuture.newFuture();
         client().execute(StartDatafeedAction.INSTANCE, new StartDatafeedAction.Request(datafeedId, 0L), startDatafeedListener);
-        AcknowledgedResponse startDatafeedResponse = startDatafeedListener.actionGet();
+        NodeAcknowledgedResponse startDatafeedResponse = startDatafeedListener.actionGet();
         assertNotNull(startDatafeedResponse);
         boolean invalidLicense = randomBoolean();
@@ -402,9 +403,9 @@ public void testMachineLearningCloseJobActionNotRestricted() throws Exception {
         client().execute(PutJobAction.INSTANCE, new PutJobAction.Request(createJob(jobId)), putJobListener);
         PutJobAction.Response putJobResponse = putJobListener.actionGet();
         assertNotNull(putJobResponse);
-        PlainActionFuture<AcknowledgedResponse> openJobListener = PlainActionFuture.newFuture();
+        PlainActionFuture<NodeAcknowledgedResponse> openJobListener = PlainActionFuture.newFuture();
         client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(jobId), openJobListener);
-        AcknowledgedResponse openJobResponse = openJobListener.actionGet();
+        NodeAcknowledgedResponse openJobResponse = openJobListener.actionGet();
         assertNotNull(openJobResponse);
         boolean invalidLicense = randomBoolean();
diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml
index c566bfed6284c..af6f349930193 100644
--- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml
+++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml
@@ -143,6 +143,7 @@
       ml.open_job:
         job_id: job-model-memory-limit-as-string
   - match: { opened: true }
+  - match: { node: "" }
   - do:
       headers:
@@ -563,6 +564,7 @@
       ml.open_job:
         job_id: delete-opened-job
   - match: { opened: true }
+  - match: { node: /\S+/ }
   - do:
       catch: /Cannot delete job \[delete-opened-job\] because the job is opened/
@@ -1446,6 +1448,7 @@
       ml.open_job:
         job_id: persistent-task-allocation-allowed-test
   - match: { opened: true }
+  - match: { node: /\S+/ }
 ---
 "Test reopen job resets the finished time":
diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yml
index 8722bab94e216..e1fc8a7f520f0 100644
--- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yml
+++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yml
@@ -365,14 +365,17 @@ setup:
       ml.start_datafeed:
         datafeed_id: "start-stop-datafeed-job-foo-1-feed"
   - match: { started: true }
+  - match: { node: /\S+/ }
   - do:
       ml.start_datafeed:
         datafeed_id: "start-stop-datafeed-job-foo-2-feed"
   - match: { started: true }
+  - match: { node: /\S+/ }
   - do:
       ml.start_datafeed:
         datafeed_id: "start-stop-datafeed-job-bar-1-feed"
   - match: { started: true }
+  - match: { node: /\S+/ }
   - do:
       ml.stop_datafeed: