From 9b7670d022e5878240f25d175d7d9d5fa44177fd Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Mon, 29 Mar 2021 14:51:34 -0400 Subject: [PATCH 1/4] [ML] complete ml plugin feature state clean up integration --- .../common/notifications/AbstractAuditor.java | 6 +- .../xpack/core/ml/MlMetadata.java | 58 +++++- .../xpack/core/ml/action/CloseJobAction.java | 18 +- .../core/ml/action/SetResetModeAction.java | 101 +++++++++++ .../action/StopDataFrameAnalyticsAction.java | 9 +- .../core/ml/action/StopDatafeedAction.java | 9 +- .../SetResetModeActionRequestTests.java | 35 ++++ .../ml/integration/ClassificationIT.java | 28 +-- ...NativeDataFrameAnalyticsIntegTestCase.java | 4 +- .../ml/integration/TestFeatureResetIT.java | 167 ++++++++++++++++++ .../xpack/ml/MachineLearning.java | 125 ++++++++++--- .../xpack/ml/MlDailyMaintenanceService.java | 4 + .../action/TransportSetResetModeAction.java | 113 ++++++++++++ .../inference/ingest/InferenceProcessor.java | 14 +- .../ml/notifications/AbstractMlAuditor.java | 58 ++++++ .../AnomalyDetectionAuditor.java | 13 +- .../DataFrameAnalyticsAuditor.java | 13 +- .../ml/notifications/InferenceAuditor.java | 11 +- .../xpack/ml/LocalStateMachineLearning.java | 22 ++- .../xpack/ml/MlMetadataTests.java | 12 +- 20 files changed, 718 insertions(+), 102 deletions(-) create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/SetResetModeAction.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/SetResetModeActionRequestTests.java create mode 100644 x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/TestFeatureResetIT.java create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetResetModeAction.java create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/AbstractMlAuditor.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditor.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditor.java index 16de5d6fcadfc..c89a5be82793e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditor.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditor.java @@ -184,7 +184,11 @@ private XContentBuilder toXContentBuilder(ToXContent toXContent) { } } - private void writeBacklog() { + protected void clearBacklog() { + backlog = null; + } + + protected void writeBacklog() { assert backlog != null; if (backlog == null) { logger.error("Message back log has already been written"); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java index 3000942c2c81a..c17d38bf351f7 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java @@ -50,8 +50,14 @@ public class MlMetadata implements Metadata.Custom { private static final ParseField JOBS_FIELD = new ParseField("jobs"); private static final ParseField DATAFEEDS_FIELD = new ParseField("datafeeds"); public static final ParseField UPGRADE_MODE = new ParseField("upgrade_mode"); - - public static final MlMetadata EMPTY_METADATA = new MlMetadata(Collections.emptySortedMap(), Collections.emptySortedMap(), false); + public static final ParseField RESET_MODE = new ParseField("reset_mode"); + + public static final MlMetadata EMPTY_METADATA = new MlMetadata( + Collections.emptySortedMap(), + Collections.emptySortedMap(), + false, + false + ); // This parser follows the pattern that metadata is parsed leniently (to allow for enhancements) public static final ObjectParser LENIENT_PARSER = new ObjectParser<>("ml_metadata", true, Builder::new); @@ -60,19 +66,21 @@ public class MlMetadata implements Metadata.Custom { LENIENT_PARSER.declareObjectArray(Builder::putDatafeeds, (p, c) -> DatafeedConfig.LENIENT_PARSER.apply(p, c).build(), DATAFEEDS_FIELD); LENIENT_PARSER.declareBoolean(Builder::isUpgradeMode, UPGRADE_MODE); - + LENIENT_PARSER.declareBoolean(Builder::isResetMode, RESET_MODE); } private final SortedMap jobs; private final SortedMap datafeeds; private final boolean upgradeMode; + private final boolean resetMode; private final GroupOrJobLookup groupOrJobLookup; - private MlMetadata(SortedMap jobs, SortedMap datafeeds, boolean upgradeMode) { + private MlMetadata(SortedMap jobs, SortedMap datafeeds, boolean upgradeMode, boolean resetMode) { this.jobs = Collections.unmodifiableSortedMap(jobs); this.datafeeds = Collections.unmodifiableSortedMap(datafeeds); this.groupOrJobLookup = new GroupOrJobLookup(jobs.values()); this.upgradeMode = upgradeMode; + this.resetMode = resetMode; } public Map getJobs() { @@ -104,6 +112,10 @@ public boolean isUpgradeMode() { return upgradeMode; } + public boolean isResetMode() { + return resetMode; + } + @Override public Version getMinimalSupportedVersion() { return Version.CURRENT.minimumIndexCompatibilityVersion(); @@ -139,6 +151,11 @@ public MlMetadata(StreamInput in) throws IOException { this.datafeeds = datafeeds; this.groupOrJobLookup = new GroupOrJobLookup(jobs.values()); this.upgradeMode = in.readBoolean(); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + this.resetMode = in.readBoolean(); + } else { + this.resetMode = false; + } } @Override @@ -146,6 +163,9 @@ public void writeTo(StreamOutput out) throws IOException { writeMap(jobs, out); writeMap(datafeeds, out); out.writeBoolean(upgradeMode); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeBoolean(resetMode); + } } private static void writeMap(Map map, StreamOutput out) throws IOException { @@ -163,6 +183,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws mapValuesToXContent(JOBS_FIELD, jobs, builder, extendedParams); mapValuesToXContent(DATAFEEDS_FIELD, datafeeds, builder, extendedParams); builder.field(UPGRADE_MODE.getPreferredName(), upgradeMode); + builder.field(RESET_MODE.getPreferredName(), resetMode); return builder; } @@ -184,11 +205,13 @@ public static class MlMetadataDiff implements NamedDiff { final Diff> jobs; final Diff> datafeeds; final boolean upgradeMode; + final boolean resetMode; MlMetadataDiff(MlMetadata before, MlMetadata after) { this.jobs = DiffableUtils.diff(before.jobs, after.jobs, DiffableUtils.getStringKeySerializer()); this.datafeeds = DiffableUtils.diff(before.datafeeds, after.datafeeds, DiffableUtils.getStringKeySerializer()); this.upgradeMode = after.upgradeMode; + this.resetMode = after.resetMode; } public MlMetadataDiff(StreamInput in) throws IOException { @@ -197,6 +220,11 @@ public MlMetadataDiff(StreamInput in) throws IOException { this.datafeeds = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), DatafeedConfig::new, MlMetadataDiff::readDatafeedDiffFrom); upgradeMode = in.readBoolean(); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + resetMode = in.readBoolean(); + } else { + resetMode = false; + } } /** @@ -208,7 +236,7 @@ public MlMetadataDiff(StreamInput in) throws IOException { public Metadata.Custom apply(Metadata.Custom part) { TreeMap newJobs = new TreeMap<>(jobs.apply(((MlMetadata) part).jobs)); TreeMap newDatafeeds = new TreeMap<>(datafeeds.apply(((MlMetadata) part).datafeeds)); - return new MlMetadata(newJobs, newDatafeeds, upgradeMode); + return new MlMetadata(newJobs, newDatafeeds, upgradeMode, resetMode); } @Override @@ -216,6 +244,9 @@ public void writeTo(StreamOutput out) throws IOException { jobs.writeTo(out); datafeeds.writeTo(out); out.writeBoolean(upgradeMode); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeBoolean(resetMode); + } } @Override @@ -241,7 +272,8 @@ public boolean equals(Object o) { MlMetadata that = (MlMetadata) o; return Objects.equals(jobs, that.jobs) && Objects.equals(datafeeds, that.datafeeds) && - Objects.equals(upgradeMode, that.upgradeMode); + upgradeMode == that.upgradeMode && + resetMode == that.resetMode; } @Override @@ -251,7 +283,7 @@ public final String toString() { @Override public int hashCode() { - return Objects.hash(jobs, datafeeds, upgradeMode); + return Objects.hash(jobs, datafeeds, upgradeMode, resetMode); } public static class Builder { @@ -259,6 +291,11 @@ public static class Builder { private TreeMap jobs; private TreeMap datafeeds; private boolean upgradeMode; + private boolean resetMode; + + public static Builder from(@Nullable MlMetadata previous) { + return new Builder(previous); + } public Builder() { jobs = new TreeMap<>(); @@ -340,8 +377,13 @@ public Builder isUpgradeMode(boolean upgradeMode) { return this; } + public Builder isResetMode(boolean resetMode) { + this.resetMode = resetMode; + return this; + } + public MlMetadata build() { - return new MlMetadata(jobs, datafeeds, upgradeMode); + return new MlMetadata(jobs, datafeeds, upgradeMode, resetMode); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/CloseJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/CloseJobAction.java index 76f7965a87e06..94388e037cfc5 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/CloseJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/CloseJobAction.java @@ -104,44 +104,50 @@ public String getJobId() { return jobId; } - public void setJobId(String jobId) { + public Request setJobId(String jobId) { this.jobId = jobId; + return this; } public TimeValue getCloseTimeout() { return timeout; } - public void setCloseTimeout(TimeValue timeout) { + public Request setCloseTimeout(TimeValue timeout) { this.timeout = timeout; + return this; } public boolean isForce() { return force; } - public void setForce(boolean force) { + public Request setForce(boolean force) { this.force = force; + return this; } public boolean allowNoMatch() { return allowNoMatch; } - public void setAllowNoMatch(boolean allowNoMatch) { + public Request setAllowNoMatch(boolean allowNoMatch) { this.allowNoMatch = allowNoMatch; + return this; } public boolean isLocal() { return local; } - public void setLocal(boolean local) { + public Request setLocal(boolean local) { this.local = local; + return this; } public String[] getOpenJobIds() { return openJobIds; } - public void setOpenJobIds(String [] openJobIds) { + public Request setOpenJobIds(String[] openJobIds) { this.openJobIds = openJobIds; + return this; } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/SetResetModeAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/SetResetModeAction.java new file mode 100644 index 0000000000000..e5629bb550837 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/SetResetModeAction.java @@ -0,0 +1,101 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.core.ml.action; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; + +public class SetResetModeAction extends ActionType { + + public static final SetResetModeAction INSTANCE = new SetResetModeAction(); + public static final String NAME = "cluster:admin/xpack/ml/reset_mode"; + + private SetResetModeAction() { + super(NAME, AcknowledgedResponse::readFrom); + } + + public static class Request extends AcknowledgedRequest implements ToXContentObject { + + public static Request enabled() { + return new Request(true); + } + + public static Request disabled() { + return new Request(false); + } + + private final boolean enabled; + + private static final ParseField ENABLED = new ParseField("enabled"); + public static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>(NAME, a -> new Request((Boolean)a[0])); + + static { + PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), ENABLED); + } + + Request(boolean enabled) { + this.enabled = enabled; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.enabled = in.readBoolean(); + } + + public boolean isEnabled() { + return enabled; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(enabled); + } + + @Override + public int hashCode() { + return Objects.hash(enabled); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || obj.getClass() != getClass()) { + return false; + } + Request other = (Request) obj; + return Objects.equals(enabled, other.enabled); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(ENABLED.getPreferredName(), enabled); + builder.endObject(); + return builder; + } + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StopDataFrameAnalyticsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StopDataFrameAnalyticsAction.java index 2d9ec8a38a899..849c066a3f68f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StopDataFrameAnalyticsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StopDataFrameAnalyticsAction.java @@ -93,8 +93,9 @@ public Request() { setTimeout(DEFAULT_TIMEOUT); } - public final void setId(String id) { + public final Request setId(String id) { this.id = ExceptionsHelper.requireNonNull(id, DataFrameAnalyticsConfig.ID); + return this; } public String getId() { @@ -105,16 +106,18 @@ public boolean allowNoMatch() { return allowNoMatch; } - public void setAllowNoMatch(boolean allowNoMatch) { + public Request setAllowNoMatch(boolean allowNoMatch) { this.allowNoMatch = allowNoMatch; + return this; } public boolean isForce() { return force; } - public void setForce(boolean force) { + public Request setForce(boolean force) { this.force = force; + return this; } @Nullable diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StopDatafeedAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StopDatafeedAction.java index 578bbe39bc38c..410193266956c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StopDatafeedAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StopDatafeedAction.java @@ -104,24 +104,27 @@ public TimeValue getStopTimeout() { return stopTimeout; } - public void setStopTimeout(TimeValue stopTimeout) { + public Request setStopTimeout(TimeValue stopTimeout) { this.stopTimeout = ExceptionsHelper.requireNonNull(stopTimeout, TIMEOUT.getPreferredName()); + return this; } public boolean isForce() { return force; } - public void setForce(boolean force) { + public Request setForce(boolean force) { this.force = force; + return this; } public boolean allowNoMatch() { return allowNoMatch; } - public void setAllowNoMatch(boolean allowNoMatch) { + public Request setAllowNoMatch(boolean allowNoMatch) { this.allowNoMatch = allowNoMatch; + return this; } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/SetResetModeActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/SetResetModeActionRequestTests.java new file mode 100644 index 0000000000000..f1dbf1e970ef9 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/SetResetModeActionRequestTests.java @@ -0,0 +1,35 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.core.ml.action; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; +import org.elasticsearch.xpack.core.ml.action.SetResetModeAction.Request; + +public class SetResetModeActionRequestTests extends AbstractSerializingTestCase { + + @Override + protected Request createTestInstance() { + return new Request(randomBoolean()); + } + + @Override + protected boolean supportsUnknownFields() { + return false; + } + + @Override + protected Writeable.Reader instanceReader() { + return Request::new; + } + + @Override + protected Request doParseInstance(XContentParser parser) { + return Request.PARSER.apply(parser, null); + } +} diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java index e842bef886f72..93fd5226b519c 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java @@ -84,18 +84,18 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase { - private static final String BOOLEAN_FIELD = "boolean-field"; - private static final String NUMERICAL_FIELD = "numerical-field"; - private static final String DISCRETE_NUMERICAL_FIELD = "discrete-numerical-field"; - private static final String TEXT_FIELD = "text-field"; - private static final String KEYWORD_FIELD = "keyword-field"; - private static final String NESTED_FIELD = "outer-field.inner-field"; - private static final String ALIAS_TO_KEYWORD_FIELD = "alias-to-keyword-field"; - private static final String ALIAS_TO_NESTED_FIELD = "alias-to-nested-field"; - private static final List BOOLEAN_FIELD_VALUES = List.of(false, true); - private static final List NUMERICAL_FIELD_VALUES = List.of(1.0, 2.0); - private static final List DISCRETE_NUMERICAL_FIELD_VALUES = List.of(10, 20); - private static final List KEYWORD_FIELD_VALUES = List.of("cat", "dog"); + static final String BOOLEAN_FIELD = "boolean-field"; + static final String NUMERICAL_FIELD = "numerical-field"; + static final String DISCRETE_NUMERICAL_FIELD = "discrete-numerical-field"; + static final String TEXT_FIELD = "text-field"; + static final String KEYWORD_FIELD = "keyword-field"; + static final String NESTED_FIELD = "outer-field.inner-field"; + static final String ALIAS_TO_KEYWORD_FIELD = "alias-to-keyword-field"; + static final String ALIAS_TO_NESTED_FIELD = "alias-to-nested-field"; + static final List BOOLEAN_FIELD_VALUES = List.of(false, true); + static final List NUMERICAL_FIELD_VALUES = List.of(1.0, 2.0); + static final List DISCRETE_NUMERICAL_FIELD_VALUES = List.of(10, 20); + static final List KEYWORD_FIELD_VALUES = List.of("cat", "dog"); private String jobId; private String sourceIndex; @@ -957,7 +957,7 @@ private void initialize(String jobId, boolean isDatastream) { } } - private static void createIndex(String index, boolean isDatastream) { + static void createIndex(String index, boolean isDatastream) { String mapping = "{\n" + " \"properties\": {\n" + " \"@timestamp\": {\n" + @@ -1009,7 +1009,7 @@ private static void createIndex(String index, boolean isDatastream) { } } - private static void indexData(String sourceIndex, int numTrainingRows, int numNonTrainingRows, String dependentVariable) { + static void indexData(String sourceIndex, int numTrainingRows, int numNonTrainingRows, String dependentVariable) { BulkRequestBuilder bulkRequestBuilder = client().prepareBulk() .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); for (int i = 0; i < numTrainingRows; i++) { diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java index ad20418def610..c7db6c962e8c1 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java @@ -200,8 +200,8 @@ protected PreviewDataFrameAnalyticsAction.Response previewDataFrame(String id) { ).actionGet(); } - protected static DataFrameAnalyticsConfig buildAnalytics(String id, String sourceIndex, String destIndex, - @Nullable String resultsField, DataFrameAnalysis analysis) throws Exception { + static DataFrameAnalyticsConfig buildAnalytics(String id, String sourceIndex, String destIndex, + @Nullable String resultsField, DataFrameAnalysis analysis) throws Exception { return buildAnalytics(id, sourceIndex, destIndex, resultsField, analysis, QueryBuilders.matchAllQuery()); } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/TestFeatureResetIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/TestFeatureResetIT.java new file mode 100644 index 0000000000000..999bb9b652a74 --- /dev/null +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/TestFeatureResetIT.java @@ -0,0 +1,167 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.ml.integration; + +import org.elasticsearch.action.admin.cluster.snapshots.features.ResetFeatureStateAction; +import org.elasticsearch.action.admin.cluster.snapshots.features.ResetFeatureStateRequest; +import org.elasticsearch.action.ingest.DeletePipelineAction; +import org.elasticsearch.action.ingest.DeletePipelineRequest; +import org.elasticsearch.action.ingest.PutPipelineAction; +import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction; +import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; +import org.elasticsearch.xpack.core.ml.dataframe.analyses.BoostedTreeParams; +import org.elasticsearch.xpack.core.ml.dataframe.analyses.Classification; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.config.JobState; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; +import org.junit.After; + +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.xpack.ml.integration.ClassificationIT.KEYWORD_FIELD; +import static org.elasticsearch.xpack.ml.integration.MlNativeDataFrameAnalyticsIntegTestCase.buildAnalytics; +import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed; +import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob; +import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.getDataCounts; +import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.indexDocs; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.emptyArray; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class TestFeatureResetIT extends MlNativeAutodetectIntegTestCase { + + @After + public void cleanup() throws Exception { + cleanUp(); + } + + public void testMLFeatureReset() throws Exception { + startRealtime("feature_reset_anomaly_job"); + startDataFrameJob("feature_reset_data_frame_analytics_job"); + putTrainedModelIngestPipeline("feature_reset_inference_pipeline"); + for(int i = 0; i < 100; i ++) { + indexDocForInference("feature_reset_inference_pipeline"); + } + client().execute(DeletePipelineAction.INSTANCE, new DeletePipelineRequest("feature_reset_inference_pipeline")).actionGet(); + client().execute( + ResetFeatureStateAction.INSTANCE, + new ResetFeatureStateRequest() + ).actionGet(); + assertBusy(() -> assertThat(client().admin().indices().prepareGetIndex().addIndices(".ml*").get().indices(), emptyArray())); + } + + public void testMLFeatureResetFailureDueToPipelines() throws Exception { + putTrainedModelIngestPipeline("feature_reset_failure_inference_pipeline"); + Exception ex = expectThrows(Exception.class, () -> client().execute( + ResetFeatureStateAction.INSTANCE, + new ResetFeatureStateRequest() + ).actionGet()); + assertThat( + ex.getMessage(), + containsString( + "Unable to reset component as there are ingest pipelines still referencing trained machine learning models" + ) + ); + client().execute(DeletePipelineAction.INSTANCE, new DeletePipelineRequest("feature_reset_failure_inference_pipeline")).actionGet(); + } + + private void startDataFrameJob(String jobId) throws Exception { + String sourceIndex = jobId + "-src"; + String destIndex = jobId + "-dest"; + ClassificationIT.createIndex(sourceIndex, false); + ClassificationIT.indexData(sourceIndex, 300, 50, KEYWORD_FIELD); + + DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, + new Classification( + KEYWORD_FIELD, + BoostedTreeParams.builder().setNumTopFeatureImportanceValues(1).build(), + null, + null, + null, + null, + null, + null, + null)); + PutDataFrameAnalyticsAction.Request request = new PutDataFrameAnalyticsAction.Request(config); + client().execute(PutDataFrameAnalyticsAction.INSTANCE, request).actionGet(); + + client().execute(StartDataFrameAnalyticsAction.INSTANCE, new StartDataFrameAnalyticsAction.Request(jobId)); + } + + private void startRealtime(String jobId) throws Exception { + client().admin().indices().prepareCreate("data") + .setMapping("time", "type=date") + .get(); + long numDocs1 = randomIntBetween(32, 2048); + long now = System.currentTimeMillis(); + long lastWeek = now - 604800000; + indexDocs(logger, "data", numDocs1, lastWeek, now); + + Job.Builder job = createScheduledJob(jobId); + registerJob(job); + putJob(job); + openJob(job.getId()); + assertBusy(() -> assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED)); + + DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), Collections.singletonList("data")); + registerDatafeed(datafeedConfig); + putDatafeed(datafeedConfig); + startDatafeed(datafeedConfig.getId(), 0L, null); + assertBusy(() -> { + DataCounts dataCounts = getDataCounts(job.getId()); + assertThat(dataCounts.getProcessedRecordCount(), is(equalTo(numDocs1))); + assertThat(dataCounts.getOutOfOrderTimeStampCount(), is(equalTo(0L))); + }); + + long numDocs2 = randomIntBetween(2, 64); + now = System.currentTimeMillis(); + indexDocs(logger, "data", numDocs2, now + 5000, now + 6000); + assertBusy(() -> { + DataCounts dataCounts = getDataCounts(job.getId()); + assertThat(dataCounts.getProcessedRecordCount(), is(equalTo(numDocs1 + numDocs2))); + assertThat(dataCounts.getOutOfOrderTimeStampCount(), is(equalTo(0L))); + }, 30, TimeUnit.SECONDS); + } + + private void putTrainedModelIngestPipeline(String pipelineId) throws Exception { + client().execute( + PutPipelineAction.INSTANCE, + new PutPipelineRequest( + pipelineId, + new BytesArray( + "{\n" + + " \"processors\": [\n" + + " {\n" + + " \"inference\": {\n" + + " \"inference_config\": {\"classification\":{}},\n" + + " \"model_id\": \"lang_ident_model_1\",\n" + + " \"field_map\": {}\n" + + " }\n" + + " }\n" + + " ]\n" + + " }" + ), + XContentType.JSON + ) + ).actionGet(); + } + + private void indexDocForInference(String pipelineId) { + client().prepareIndex("foo") + .setPipeline(pipelineId) + .setSource("{\"text\": \"this is some plain text.\"}", XContentType.JSON) + .get(); + } + +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 9b36d75daedee..39f71d794fb2e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -13,8 +13,10 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.cluster.snapshots.features.ResetFeatureStateResponse; import org.elasticsearch.action.support.ActionFilter; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.client.node.NodeClient; @@ -136,6 +138,7 @@ import org.elasticsearch.xpack.core.ml.action.PutTrainedModelAction; import org.elasticsearch.xpack.core.ml.action.PutTrainedModelAliasAction; import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction; +import org.elasticsearch.xpack.core.ml.action.SetResetModeAction; import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction; import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; @@ -216,6 +219,7 @@ import org.elasticsearch.xpack.ml.action.TransportPutTrainedModelAction; import org.elasticsearch.xpack.ml.action.TransportPutTrainedModelAliasAction; import org.elasticsearch.xpack.ml.action.TransportRevertModelSnapshotAction; +import org.elasticsearch.xpack.ml.action.TransportSetResetModeAction; import org.elasticsearch.xpack.ml.action.TransportSetUpgradeModeAction; import org.elasticsearch.xpack.ml.action.TransportStartDataFrameAnalyticsAction; import org.elasticsearch.xpack.ml.action.TransportStartDatafeedAction; @@ -376,6 +380,7 @@ import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; import static org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX; import static org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX; +import static org.elasticsearch.xpack.ml.inference.ingest.InferenceProcessor.Factory.countNumberInferenceProcessors; public class MachineLearning extends Plugin implements SystemIndexPlugin, AnalysisPlugin, @@ -1039,6 +1044,7 @@ public List getRestHandlers(Settings settings, RestController restC new ActionHandler<>(PutTrainedModelAliasAction.INSTANCE, TransportPutTrainedModelAliasAction.class), new ActionHandler<>(DeleteTrainedModelAliasAction.INSTANCE, TransportDeleteTrainedModelAliasAction.class), new ActionHandler<>(PreviewDataFrameAnalyticsAction.INSTANCE, TransportPreviewDataFrameAnalyticsAction.class), + new ActionHandler<>(SetResetModeAction.INSTANCE, TransportSetResetModeAction.class), usageAction, infoAction); } @@ -1249,28 +1255,80 @@ public String getFeatureDescription() { return "Provides anomaly detection and forecasting functionality"; } - @Override public void cleanUpFeature( + @Override + public void cleanUpFeature( ClusterService clusterService, Client client, - ActionListener listener) { + ActionListener finalListener) { + logger.info("Starting machine learning cleanup"); + + ActionListener unsetResetModeListener = ActionListener.wrap( + success -> client.execute(SetResetModeAction.INSTANCE, SetResetModeAction.Request.disabled(), ActionListener.wrap( + resetSuccess -> finalListener.onResponse(success), + resetFailure -> { + logger.warn("failed to disable reset mode after state clean up success", resetFailure); + finalListener.onResponse(success); + }) + ), + failure -> client.execute(SetResetModeAction.INSTANCE, SetResetModeAction.Request.disabled(), ActionListener.wrap( + resetSuccess -> finalListener.onFailure(failure), + resetFailure -> { + logger.warn("failed to disable reset mode after state clean up failure", resetFailure); + finalListener.onFailure(failure); + }) + ) + ); Map results = new ConcurrentHashMap<>(); + ActionListener afterWaitingForTasks = ActionListener.wrap( + listTasksResponse -> { + listTasksResponse.rethrowFailures("Waiting for indexing requests for .ml-* indices"); + if (results.values().stream().allMatch(b -> b)) { + // Call into the original listener to clean up the indices + SystemIndexPlugin.super.cleanUpFeature(clusterService, client, unsetResetModeListener); + } else { + final List failedComponents = results.entrySet().stream() + .filter(result -> result.getValue() == false) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + unsetResetModeListener.onFailure(new RuntimeException("Some components failed to reset: " + failedComponents)); + } + }, + unsetResetModeListener::onFailure + ); + ActionListener afterDataframesStopped = ActionListener.wrap(dataFrameStopResponse -> { // Handle the response results.put("data_frame/analytics", dataFrameStopResponse.isStopped()); - if (results.values().stream().allMatch(b -> b)) { - // Call into the original listener to clean up the indices - SystemIndexPlugin.super.cleanUpFeature(clusterService, client, listener); + client.admin() + .cluster() + .prepareListTasks() + .setActions("xpack/ml/*") + .setWaitForCompletion(true) + .execute(ActionListener.wrap( + listMlTasks -> { + listMlTasks.rethrowFailures("Waiting for machine learning tasks"); + client.admin() + .cluster() + .prepareListTasks() + .setActions("indices:data/write/bulk") + .setDetailed(true) + .setWaitForCompletion(true) + .setDescriptions("*.ml-*") + .execute(afterWaitingForTasks); + }, + unsetResetModeListener::onFailure + )); } else { final List failedComponents = results.entrySet().stream() .filter(result -> result.getValue() == false) .map(Map.Entry::getKey) .collect(Collectors.toList()); - listener.onFailure(new RuntimeException("Some components failed to reset: " + failedComponents)); + unsetResetModeListener.onFailure(new RuntimeException("Some components failed to reset: " + failedComponents)); } - }, listener::onFailure); + }, unsetResetModeListener::onFailure); ActionListener afterAnomalyDetectionClosed = ActionListener.wrap(closeJobResponse -> { @@ -1278,11 +1336,11 @@ public String getFeatureDescription() { results.put("anomaly_detectors", closeJobResponse.isClosed()); // Stop data frame analytics - StopDataFrameAnalyticsAction.Request stopDataFramesReq = new StopDataFrameAnalyticsAction.Request("_all"); - stopDataFramesReq.setForce(true); - stopDataFramesReq.setAllowNoMatch(true); + StopDataFrameAnalyticsAction.Request stopDataFramesReq = new StopDataFrameAnalyticsAction.Request("_all") + .setForce(true) + .setAllowNoMatch(true); client.execute(StopDataFrameAnalyticsAction.INSTANCE, stopDataFramesReq, afterDataframesStopped); - }, listener::onFailure); + }, unsetResetModeListener::onFailure); // Close anomaly detection jobs ActionListener afterDataFeedsStopped = ActionListener.wrap(datafeedResponse -> { @@ -1290,19 +1348,44 @@ public String getFeatureDescription() { results.put("datafeeds", datafeedResponse.isStopped()); // Close anomaly detection jobs - CloseJobAction.Request closeJobsRequest = new CloseJobAction.Request(); - closeJobsRequest.setForce(true); - closeJobsRequest.setAllowNoMatch(true); - closeJobsRequest.setJobId("_all"); + CloseJobAction.Request closeJobsRequest = new CloseJobAction.Request() + .setForce(true) + .setAllowNoMatch(true) + .setJobId("_all"); client.execute(CloseJobAction.INSTANCE, closeJobsRequest, afterAnomalyDetectionClosed); - }, listener::onFailure); + }, unsetResetModeListener::onFailure); // Stop data feeds - StopDatafeedAction.Request stopDatafeedsReq = new StopDatafeedAction.Request("_all"); - stopDatafeedsReq.setAllowNoMatch(true); - stopDatafeedsReq.setForce(true); - client.execute(StopDatafeedAction.INSTANCE, stopDatafeedsReq, - afterDataFeedsStopped); + ActionListener pipelineValidation = ActionListener.wrap( + acknowledgedResponse -> { + StopDatafeedAction.Request stopDatafeedsReq = new StopDatafeedAction.Request("_all") + .setAllowNoMatch(true) + .setForce(true); + client.execute(StopDatafeedAction.INSTANCE, stopDatafeedsReq, + afterDataFeedsStopped); + }, + unsetResetModeListener::onFailure + ); + + // validate no pipelines are using machine learning models + ActionListener afterResetModeSet = ActionListener.wrap( + acknowledgedResponse -> { + int numberInferenceProcessors = countNumberInferenceProcessors(clusterService.state()); + if (numberInferenceProcessors > 0) { + unsetResetModeListener.onFailure( + new RuntimeException( + "Unable to reset component as there are ingest pipelines still referencing trained machine learning models" + ) + ); + return; + } + pipelineValidation.onResponse(AcknowledgedResponse.of(true)); + }, + finalListener::onFailure + ); + + // Indicate that a reset is now in progress + client.execute(SetResetModeAction.INSTANCE, SetResetModeAction.Request.enabled(), afterResetModeSet); } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java index 1e0be7a65c6e2..623875ff0fded 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java @@ -151,6 +151,10 @@ private void triggerTasks() { LOGGER.warn("skipping scheduled [ML] maintenance tasks because upgrade mode is enabled"); return; } + if (MlMetadata.getMlMetadata(clusterService.state()).isResetMode()) { + LOGGER.warn("skipping scheduled [ML] maintenance tasks because reset mode is enabled"); + return; + } LOGGER.info("triggering scheduled [ML] maintenance tasks"); // Step 3: Log any error that could have happened diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetResetModeAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetResetModeAction.java new file mode 100644 index 0000000000000..07417dfdd59c2 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetResetModeAction.java @@ -0,0 +1,113 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.ml.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.action.SetResetModeAction; + + +public class TransportSetResetModeAction extends AcknowledgedTransportMasterNodeAction { + + private static final Logger logger = LogManager.getLogger(TransportSetResetModeAction.class); + private final ClusterService clusterService; + + @Inject + public TransportSetResetModeAction(TransportService transportService, ThreadPool threadPool, ClusterService clusterService, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + super(SetResetModeAction.NAME, transportService, clusterService, threadPool, actionFilters, SetResetModeAction.Request::new, + indexNameExpressionResolver, ThreadPool.Names.SAME); + this.clusterService = clusterService; + } + + @Override + protected void masterOperation(Task task, + SetResetModeAction.Request request, + ClusterState state, + ActionListener listener) throws Exception { + + // Noop, nothing for us to do, simply return fast to the caller + if (request.isEnabled() == MlMetadata.getMlMetadata(state).isResetMode()) { + logger.debug("Reset mode noop"); + listener.onResponse(AcknowledgedResponse.TRUE); + return; + } + + logger.debug( + () -> new ParameterizedMessage( + "Starting to set [reset_mode] to [{}] from [{}]", request.isEnabled(), MlMetadata.getMlMetadata(state).isResetMode() + ) + ); + + ActionListener wrappedListener = ActionListener.wrap( + r -> { + logger.debug("Completed reset mode request"); + listener.onResponse(r); + }, + e -> { + logger.debug("Completed reset mode request but with failure", e); + listener.onFailure(e); + } + ); + + ActionListener clusterStateUpdateListener = ActionListener.wrap( + acknowledgedResponse -> { + if (acknowledgedResponse.isAcknowledged() == false) { + logger.info("Cluster state update is NOT acknowledged"); + wrappedListener.onFailure(new ElasticsearchTimeoutException("Unknown error occurred while updating cluster state")); + return; + } + wrappedListener.onResponse(acknowledgedResponse); + }, + wrappedListener::onFailure + ); + + clusterService.submitStateUpdateTask("ml-set-reset-mode", + new AckedClusterStateUpdateTask(request, clusterStateUpdateListener) { + + @Override + protected AcknowledgedResponse newResponse(boolean acknowledged) { + logger.trace("Cluster update response built: " + acknowledged); + return AcknowledgedResponse.of(acknowledged); + } + + @Override + public ClusterState execute(ClusterState currentState) { + logger.trace("Executing cluster state update"); + MlMetadata.Builder builder = MlMetadata.Builder + .from(currentState.metadata().custom(MlMetadata.TYPE)) + .isResetMode(request.isEnabled()); + ClusterState.Builder newState = ClusterState.builder(currentState); + newState.metadata(Metadata.builder(currentState.getMetadata()).putCustom(MlMetadata.TYPE, builder.build()).build()); + return newState.build(); + } + }); + } + + @Override + protected ClusterBlockException checkBlock(SetResetModeAction.Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/ingest/InferenceProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/ingest/InferenceProcessor.java index 1e766d71e8daa..4cf4d453d0391 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/ingest/InferenceProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/ingest/InferenceProcessor.java @@ -198,15 +198,17 @@ public Factory(Client client, ClusterService clusterService, Settings settings) @Override public void accept(ClusterState state) { minNodeVersion = state.nodes().getMinNodeVersion(); + currentInferenceProcessors = countNumberInferenceProcessors(state); + } + + public static int countNumberInferenceProcessors(ClusterState state) { Metadata metadata = state.getMetadata(); if (metadata == null) { - currentInferenceProcessors = 0; - return; + return 0; } IngestMetadata ingestMetadata = metadata.custom(IngestMetadata.TYPE); if (ingestMetadata == null) { - currentInferenceProcessors = 0; - return; + return 0; } int count = 0; @@ -219,14 +221,14 @@ public void accept(ClusterState state) { count += numInferenceProcessors(entry.getKey(), entry.getValue()); } } - // We cannot throw any exception here. It might break other pipelines. + // We cannot throw any exception here. It might break other pipelines. } catch (Exception ex) { logger.debug( () -> new ParameterizedMessage("failed gathering processors for pipeline [{}]", configuration.getId()), ex); } } - currentInferenceProcessors = count; + return count; } @SuppressWarnings("unchecked") diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/AbstractMlAuditor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/AbstractMlAuditor.java new file mode 100644 index 0000000000000..3072c02786462 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/AbstractMlAuditor.java @@ -0,0 +1,58 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.ml.notifications; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage; +import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessageFactory; +import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor; +import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.notifications.NotificationsIndex; +import org.elasticsearch.xpack.ml.MlIndexTemplateRegistry; + +import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; + +abstract class AbstractMlAuditor extends AbstractAuditor { + + private static final Logger logger = LogManager.getLogger(AbstractMlAuditor.class); + private volatile boolean isResetMode; + + protected AbstractMlAuditor(Client client, AbstractAuditMessageFactory messageFactory, ClusterService clusterService) { + super( + new OriginSettingClient(client, ML_ORIGIN), + NotificationsIndex.NOTIFICATIONS_INDEX, + MlIndexTemplateRegistry.NOTIFICATIONS_TEMPLATE, + clusterService.getNodeName(), + messageFactory, + clusterService + ); + clusterService.addListener(event -> { + if (event.metadataChanged()) { + setResetMode(MlMetadata.getMlMetadata(event.state()).isResetMode()); + } + }); + } + + private void setResetMode(boolean value) { + isResetMode = value; + } + + @Override + protected void writeBacklog() { + if (isResetMode) { + logger.trace("Skipped writing the audit message backlog as reset_mode is enabled"); + clearBacklog(); + } else { + super.writeBacklog(); + } + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/AnomalyDetectionAuditor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/AnomalyDetectionAuditor.java index c2981335fe391..5c3079dd6c5f3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/AnomalyDetectionAuditor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/AnomalyDetectionAuditor.java @@ -7,21 +7,12 @@ package org.elasticsearch.xpack.ml.notifications; import org.elasticsearch.client.Client; -import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor; -import org.elasticsearch.xpack.core.ml.notifications.NotificationsIndex; import org.elasticsearch.xpack.core.ml.notifications.AnomalyDetectionAuditMessage; -import org.elasticsearch.xpack.ml.MlIndexTemplateRegistry; -import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; - -public class AnomalyDetectionAuditor extends AbstractAuditor { +public class AnomalyDetectionAuditor extends AbstractMlAuditor { public AnomalyDetectionAuditor(Client client, ClusterService clusterService) { - super(new OriginSettingClient(client, ML_ORIGIN), NotificationsIndex.NOTIFICATIONS_INDEX, - MlIndexTemplateRegistry.NOTIFICATIONS_TEMPLATE, - clusterService.getNodeName(), - AnomalyDetectionAuditMessage::new, clusterService); + super(client, AnomalyDetectionAuditMessage::new, clusterService); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/DataFrameAnalyticsAuditor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/DataFrameAnalyticsAuditor.java index de776154c6881..608f7876672d6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/DataFrameAnalyticsAuditor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/DataFrameAnalyticsAuditor.java @@ -7,21 +7,12 @@ package org.elasticsearch.xpack.ml.notifications; import org.elasticsearch.client.Client; -import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor; -import org.elasticsearch.xpack.core.ml.notifications.NotificationsIndex; import org.elasticsearch.xpack.core.ml.notifications.DataFrameAnalyticsAuditMessage; -import org.elasticsearch.xpack.ml.MlIndexTemplateRegistry; -import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; - -public class DataFrameAnalyticsAuditor extends AbstractAuditor { +public class DataFrameAnalyticsAuditor extends AbstractMlAuditor { public DataFrameAnalyticsAuditor(Client client, ClusterService clusterService) { - super(new OriginSettingClient(client, ML_ORIGIN), NotificationsIndex.NOTIFICATIONS_INDEX, - MlIndexTemplateRegistry.NOTIFICATIONS_TEMPLATE, - clusterService.getNodeName(), - DataFrameAnalyticsAuditMessage::new, clusterService); + super(client, DataFrameAnalyticsAuditMessage::new, clusterService); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/InferenceAuditor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/InferenceAuditor.java index 56851d1764526..fdcb86fd76400 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/InferenceAuditor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/InferenceAuditor.java @@ -7,19 +7,12 @@ package org.elasticsearch.xpack.ml.notifications; import org.elasticsearch.client.Client; -import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor; -import org.elasticsearch.xpack.core.ml.notifications.NotificationsIndex; import org.elasticsearch.xpack.core.ml.notifications.InferenceAuditMessage; -import org.elasticsearch.xpack.ml.MlIndexTemplateRegistry; -import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; - -public class InferenceAuditor extends AbstractAuditor { +public class InferenceAuditor extends AbstractMlAuditor { public InferenceAuditor(Client client, ClusterService clusterService) { - super(new OriginSettingClient(client, ML_ORIGIN), NotificationsIndex.NOTIFICATIONS_INDEX, - MlIndexTemplateRegistry.NOTIFICATIONS_TEMPLATE, clusterService.getNodeName(), InferenceAuditMessage::new, clusterService); + super(client, InferenceAuditMessage::new, clusterService); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/LocalStateMachineLearning.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/LocalStateMachineLearning.java index 0a56b653c02f5..750f36863a141 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/LocalStateMachineLearning.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/LocalStateMachineLearning.java @@ -9,8 +9,11 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.admin.cluster.snapshots.features.ResetFeatureStateResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.breaker.NoopCircuitBreaker; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -35,18 +38,18 @@ public class LocalStateMachineLearning extends LocalStateCompositeXPackPlugin { - public LocalStateMachineLearning(final Settings settings, final Path configPath) throws Exception { + private final MachineLearning mlPlugin; + public LocalStateMachineLearning(final Settings settings, final Path configPath) { super(settings, configPath); LocalStateMachineLearning thisVar = this; - MachineLearning plugin = new MachineLearning(settings, configPath){ - + mlPlugin = new MachineLearning(settings, configPath){ @Override protected XPackLicenseState getLicenseState() { return thisVar.getLicenseState(); } }; - plugin.setCircuitBreaker(new NoopCircuitBreaker(TRAINED_MODEL_CIRCUIT_BREAKER_NAME)); - plugins.add(plugin); + mlPlugin.setCircuitBreaker(new NoopCircuitBreaker(TRAINED_MODEL_CIRCUIT_BREAKER_NAME)); + plugins.add(mlPlugin); plugins.add(new Monitoring(settings) { @Override protected SSLService getSslService() { @@ -73,6 +76,15 @@ protected XPackLicenseState getLicenseState() { plugins.add(new MockedRollupPlugin()); } + @Override + public void cleanUpFeature( + ClusterService clusterService, + Client client, + ActionListener finalListener) { + mlPlugin.cleanUpFeature(clusterService, client, finalListener); + } + + /** * This is only required as we now have to have the GetRollupIndexCapsAction as a valid action in our node. * The MachineLearningLicenseTests attempt to create a datafeed referencing this LocalStateMachineLearning object. diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java index 46c595c5c0d8e..8b9e1237ca789 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java @@ -56,7 +56,7 @@ protected MlMetadata createTestInstance() { builder.putJob(job, false); } } - return builder.build(); + return builder.isResetMode(randomBoolean()).isUpgradeMode(randomBoolean()).build(); } @Override @@ -146,6 +146,8 @@ private static MlMetadata.Builder newMlMetadataWithJobs(String... jobIds) { protected MlMetadata mutateInstance(MlMetadata instance) { Map jobs = instance.getJobs(); Map datafeeds = instance.getDatafeeds(); + boolean isUpgrade = instance.isUpgradeMode(); + boolean isReset = instance.isResetMode(); MlMetadata.Builder metadataBuilder = new MlMetadata.Builder(); for (Map.Entry entry : jobs.entrySet()) { @@ -155,7 +157,7 @@ protected MlMetadata mutateInstance(MlMetadata instance) { metadataBuilder.putDatafeed(entry.getValue(), Collections.emptyMap(), xContentRegistry()); } - switch (between(0, 1)) { + switch (between(0, 3)) { case 0: metadataBuilder.putJob(JobTests.createRandomizedJob(), true); break; @@ -175,6 +177,12 @@ protected MlMetadata mutateInstance(MlMetadata instance) { metadataBuilder.putJob(randomJob, false); metadataBuilder.putDatafeed(datafeedConfig, Collections.emptyMap(), xContentRegistry()); break; + case 2: + metadataBuilder.isUpgradeMode(isUpgrade == false); + break; + case 3: + metadataBuilder.isResetMode(isReset == false); + break; default: throw new AssertionError("Illegal randomisation branch"); } From 212baeca3ba81b899a3d4e06886d8ec30719e89f Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Tue, 30 Mar 2021 07:13:27 -0400 Subject: [PATCH 2/4] addressing pr comments and fixing operator action test --- .../xpack/ml/action/TransportSetResetModeAction.java | 2 +- .../org/elasticsearch/xpack/security/operator/Constants.java | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetResetModeAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetResetModeAction.java index 07417dfdd59c2..e001b31caf9c3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetResetModeAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetResetModeAction.java @@ -89,7 +89,7 @@ protected void masterOperation(Task task, @Override protected AcknowledgedResponse newResponse(boolean acknowledged) { - logger.trace("Cluster update response built: " + acknowledged); + logger.trace(() -> new ParameterizedMessage("Cluster update response built: {}", acknowledged)); return AcknowledgedResponse.of(acknowledged); } diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index c02d4aef1b465..d96a951bef203 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -156,6 +156,7 @@ public class Constants { "cluster:admin/xpack/ml/job/update", "cluster:admin/xpack/ml/job/validate", "cluster:admin/xpack/ml/job/validate/detector", + "cluster:admin/xpack/ml/reset_mode", "cluster:admin/xpack/ml/upgrade_mode", "cluster:admin/xpack/monitoring/bulk", "cluster:admin/xpack/monitoring/migrate/alerts", From 078e41e11acffe740b32c3fba5c31fd4ccf11ef7 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Tue, 30 Mar 2021 10:11:46 -0400 Subject: [PATCH 3/4] addressing PR comments --- .../core/ml/action/SetResetModeAction.java | 2 +- .../ml/integration/TestFeatureResetIT.java | 9 +++++++ .../xpack/ml/MachineLearning.java | 25 +++++++++++++------ .../xpack/ml/MlDailyMaintenanceService.java | 2 +- .../xpack/security/operator/Constants.java | 2 +- 5 files changed, 30 insertions(+), 10 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/SetResetModeAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/SetResetModeAction.java index e5629bb550837..ade9e2337d4a6 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/SetResetModeAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/SetResetModeAction.java @@ -23,7 +23,7 @@ public class SetResetModeAction extends ActionType { public static final SetResetModeAction INSTANCE = new SetResetModeAction(); - public static final String NAME = "cluster:admin/xpack/ml/reset_mode"; + public static final String NAME = "cluster:internal/xpack/ml/reset_mode"; private SetResetModeAction() { super(NAME, AcknowledgedResponse::readFrom); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/TestFeatureResetIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/TestFeatureResetIT.java index 999bb9b652a74..283b7e787aeae 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/TestFeatureResetIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/TestFeatureResetIT.java @@ -12,8 +12,10 @@ import org.elasticsearch.action.ingest.DeletePipelineRequest; import org.elasticsearch.action.ingest.PutPipelineAction; import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; @@ -59,6 +61,7 @@ public void testMLFeatureReset() throws Exception { new ResetFeatureStateRequest() ).actionGet(); assertBusy(() -> assertThat(client().admin().indices().prepareGetIndex().addIndices(".ml*").get().indices(), emptyArray())); + assertThat(isResetMode(), is(false)); } public void testMLFeatureResetFailureDueToPipelines() throws Exception { @@ -74,6 +77,12 @@ public void testMLFeatureResetFailureDueToPipelines() throws Exception { ) ); client().execute(DeletePipelineAction.INSTANCE, new DeletePipelineRequest("feature_reset_failure_inference_pipeline")).actionGet(); + assertThat(isResetMode(), is(false)); + } + + private boolean isResetMode() { + ClusterState state = client().admin().cluster().prepareState().get().getState(); + return MlMetadata.getMlMetadata(state).isResetMode(); } private void startDataFrameJob(String jobId) throws Exception { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 39f71d794fb2e..0f7421549a1af 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -165,6 +165,7 @@ import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.snapshot.upgrade.SnapshotUpgradeTaskState; import org.elasticsearch.xpack.core.ml.notifications.NotificationsIndex; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.template.TemplateUtils; import org.elasticsearch.xpack.ml.action.TransportCloseJobAction; import org.elasticsearch.xpack.ml.action.TransportDeleteCalendarAction; @@ -1260,20 +1261,25 @@ public void cleanUpFeature( ClusterService clusterService, Client client, ActionListener finalListener) { - logger.info("Starting machine learning cleanup"); + logger.info("Starting machine learning feature reset"); ActionListener unsetResetModeListener = ActionListener.wrap( success -> client.execute(SetResetModeAction.INSTANCE, SetResetModeAction.Request.disabled(), ActionListener.wrap( resetSuccess -> finalListener.onResponse(success), resetFailure -> { - logger.warn("failed to disable reset mode after state clean up success", resetFailure); - finalListener.onResponse(success); + logger.error("failed to disable reset mode after state otherwise successful machine learning reset", resetFailure); + finalListener.onFailure( + ExceptionsHelper.serverError( + "failed to disable reset mode after state otherwise successful machine learning reset", + resetFailure + ) + ); }) ), failure -> client.execute(SetResetModeAction.INSTANCE, SetResetModeAction.Request.disabled(), ActionListener.wrap( resetSuccess -> finalListener.onFailure(failure), resetFailure -> { - logger.warn("failed to disable reset mode after state clean up failure", resetFailure); + logger.error("failed to disable reset mode after state clean up failure", resetFailure); finalListener.onFailure(failure); }) ) @@ -1292,7 +1298,9 @@ public void cleanUpFeature( .filter(result -> result.getValue() == false) .map(Map.Entry::getKey) .collect(Collectors.toList()); - unsetResetModeListener.onFailure(new RuntimeException("Some components failed to reset: " + failedComponents)); + unsetResetModeListener.onFailure( + new RuntimeException("Some machine learning components failed to reset: " + failedComponents) + ); } }, unsetResetModeListener::onFailure @@ -1326,7 +1334,9 @@ public void cleanUpFeature( .filter(result -> result.getValue() == false) .map(Map.Entry::getKey) .collect(Collectors.toList()); - unsetResetModeListener.onFailure(new RuntimeException("Some components failed to reset: " + failedComponents)); + unsetResetModeListener.onFailure( + new RuntimeException("Some machine learning components failed to reset: " + failedComponents) + ); } }, unsetResetModeListener::onFailure); @@ -1374,7 +1384,8 @@ public void cleanUpFeature( if (numberInferenceProcessors > 0) { unsetResetModeListener.onFailure( new RuntimeException( - "Unable to reset component as there are ingest pipelines still referencing trained machine learning models" + "Unable to reset machine learning feature as there are ingest pipelines " + + "still referencing trained machine learning models" ) ); return; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java index 623875ff0fded..7c699af3945ba 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java @@ -152,7 +152,7 @@ private void triggerTasks() { return; } if (MlMetadata.getMlMetadata(clusterService.state()).isResetMode()) { - LOGGER.warn("skipping scheduled [ML] maintenance tasks because reset mode is enabled"); + LOGGER.warn("skipping scheduled [ML] maintenance tasks because machine learning feature reset is in progress"); return; } LOGGER.info("triggering scheduled [ML] maintenance tasks"); diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index d96a951bef203..4b5240a52db61 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -156,7 +156,6 @@ public class Constants { "cluster:admin/xpack/ml/job/update", "cluster:admin/xpack/ml/job/validate", "cluster:admin/xpack/ml/job/validate/detector", - "cluster:admin/xpack/ml/reset_mode", "cluster:admin/xpack/ml/upgrade_mode", "cluster:admin/xpack/monitoring/bulk", "cluster:admin/xpack/monitoring/migrate/alerts", @@ -214,6 +213,7 @@ public class Constants { "cluster:internal/xpack/ml/job/finalize_job_execution", "cluster:internal/xpack/ml/job/kill/process", "cluster:internal/xpack/ml/job/update/process", + "cluster:internal/xpack/ml/reset_mode", "cluster:monitor/allocation/explain", "cluster:monitor/async_search/status", "cluster:monitor/ccr/follow_info", From 7dd2ba96a173e36d72aeadaba02d4cc9d27afbeb Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Tue, 30 Mar 2021 12:10:15 -0400 Subject: [PATCH 4/4] fixing test --- .../xpack/ml/integration/TestFeatureResetIT.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/TestFeatureResetIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/TestFeatureResetIT.java index 283b7e787aeae..044673235c434 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/TestFeatureResetIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/TestFeatureResetIT.java @@ -30,6 +30,7 @@ import java.util.Collections; import java.util.concurrent.TimeUnit; +import static org.elasticsearch.xpack.ml.inference.ingest.InferenceProcessor.Factory.countNumberInferenceProcessors; import static org.elasticsearch.xpack.ml.integration.ClassificationIT.KEYWORD_FIELD; import static org.elasticsearch.xpack.ml.integration.MlNativeDataFrameAnalyticsIntegTestCase.buildAnalytics; import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed; @@ -56,6 +57,9 @@ public void testMLFeatureReset() throws Exception { indexDocForInference("feature_reset_inference_pipeline"); } client().execute(DeletePipelineAction.INSTANCE, new DeletePipelineRequest("feature_reset_inference_pipeline")).actionGet(); + assertBusy(() -> + assertThat(countNumberInferenceProcessors(client().admin().cluster().prepareState().get().getState()), equalTo(0)) + ); client().execute( ResetFeatureStateAction.INSTANCE, new ResetFeatureStateRequest() @@ -73,7 +77,7 @@ public void testMLFeatureResetFailureDueToPipelines() throws Exception { assertThat( ex.getMessage(), containsString( - "Unable to reset component as there are ingest pipelines still referencing trained machine learning models" + "Unable to reset machine learning feature as there are ingest pipelines still referencing trained machine learning models" ) ); client().execute(DeletePipelineAction.INSTANCE, new DeletePipelineRequest("feature_reset_failure_inference_pipeline")).actionGet();