From bde4f40da6f084c9dee49969da0333efd4d66a73 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Wed, 31 Mar 2021 07:17:53 -0400 Subject: [PATCH] [ML] fixing feature reset integration tests (#71081) previously created pipelines referencing ML models were not being appropriately deleted in upstream tests. This commit ensures that machine learning removes relevant pipelines from cluster state after tests complete closes #71072 --- .../client/MachineLearningGetResultsIT.java | 2 +- .../client/MachineLearningIT.java | 2 +- .../client/MlTestStateCleaner.java | 61 ++++++++++++++++++- .../MlClientDocumentationIT.java | 2 +- .../xpack/core/ml/MlMetadata.java | 1 + .../integration/MlRestTestStateCleaner.java | 20 ++++++ .../ml/integration/TestFeatureResetIT.java | 17 ++++++ .../xpack/ml/MlMetadataTests.java | 8 +++ 8 files changed, 108 insertions(+), 5 deletions(-) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningGetResultsIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningGetResultsIT.java index db7d9632a9c61..2612b4d976b11 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningGetResultsIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningGetResultsIT.java @@ -193,7 +193,7 @@ private void addModelSnapshotIndexRequests(BulkRequest bulkRequest) { @After public void deleteJob() throws IOException { - new MlTestStateCleaner(logger, highLevelClient().machineLearning()).clearMlMetadata(); + new MlTestStateCleaner(logger, highLevelClient()).clearMlMetadata(); } public void testGetModelSnapshots() throws IOException { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java index 73c16c71e4a9f..739f4d241a858 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java @@ -225,7 +225,7 @@ public class MachineLearningIT extends ESRestHighLevelClientTestCase { @After public void cleanUp() throws IOException { - new MlTestStateCleaner(logger, highLevelClient().machineLearning()).clearMlMetadata(); + new MlTestStateCleaner(logger, highLevelClient()).clearMlMetadata(); } public void testPutJob() throws Exception { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MlTestStateCleaner.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MlTestStateCleaner.java index 4422b1cf4032e..e6ddcaef374d0 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/MlTestStateCleaner.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MlTestStateCleaner.java @@ -8,16 +8,22 @@ package org.elasticsearch.client; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.action.ingest.DeletePipelineRequest; +import org.elasticsearch.client.core.PageParams; import org.elasticsearch.client.ml.CloseJobRequest; import org.elasticsearch.client.ml.DeleteDataFrameAnalyticsRequest; import org.elasticsearch.client.ml.DeleteDatafeedRequest; import org.elasticsearch.client.ml.DeleteJobRequest; +import org.elasticsearch.client.ml.DeleteTrainedModelRequest; import org.elasticsearch.client.ml.GetDataFrameAnalyticsRequest; import org.elasticsearch.client.ml.GetDataFrameAnalyticsResponse; import org.elasticsearch.client.ml.GetDatafeedRequest; import org.elasticsearch.client.ml.GetDatafeedResponse; import org.elasticsearch.client.ml.GetJobRequest; import org.elasticsearch.client.ml.GetJobResponse; +import org.elasticsearch.client.ml.GetTrainedModelsRequest; +import org.elasticsearch.client.ml.GetTrainedModelsStatsRequest; import org.elasticsearch.client.ml.StopDataFrameAnalyticsRequest; import org.elasticsearch.client.ml.StopDatafeedRequest; import org.elasticsearch.client.ml.datafeed.DatafeedConfig; @@ -25,26 +31,77 @@ import org.elasticsearch.client.ml.job.config.Job; import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Cleans up and ML resources created during tests */ public class MlTestStateCleaner { + private static final Set NOT_DELETED_TRAINED_MODELS = Collections.singleton("lang_ident_model_1"); private final Logger logger; private final MachineLearningClient mlClient; + private final RestHighLevelClient client; - public MlTestStateCleaner(Logger logger, MachineLearningClient mlClient) { + public MlTestStateCleaner(Logger logger, RestHighLevelClient client) { this.logger = logger; - this.mlClient = mlClient; + this.mlClient = client.machineLearning(); + this.client = client; } public void clearMlMetadata() throws IOException { + deleteAllTrainedModels(); deleteAllDatafeeds(); deleteAllJobs(); deleteAllDataFrameAnalytics(); } + @SuppressWarnings("unchecked") + private void deleteAllTrainedModels() throws IOException { + Set pipelinesWithModels = mlClient.getTrainedModelsStats( + new GetTrainedModelsStatsRequest("_all").setPageParams(new PageParams(0, 10_000)), RequestOptions.DEFAULT + ).getTrainedModelStats() + .stream() + .flatMap(stats -> { + Map ingestStats = stats.getIngestStats(); + if (ingestStats == null || ingestStats.isEmpty()) { + return Stream.empty(); + } + Map pipelines = (Map)ingestStats.get("pipelines"); + if (pipelines == null || pipelines.isEmpty()) { + return Stream.empty(); + } + return pipelines.keySet().stream(); + }) + .collect(Collectors.toSet()); + for (String pipelineId : pipelinesWithModels) { + try { + client.ingest().deletePipeline(new DeletePipelineRequest(pipelineId), RequestOptions.DEFAULT); + } catch (Exception ex) { + logger.warn(() -> new ParameterizedMessage("failed to delete pipeline [{}]", pipelineId), ex); + } + } + + mlClient.getTrainedModels( + GetTrainedModelsRequest.getAllTrainedModelConfigsRequest().setPageParams(new PageParams(0, 10_000)), + RequestOptions.DEFAULT) + .getTrainedModels() + .stream() + .filter(trainedModelConfig -> NOT_DELETED_TRAINED_MODELS.contains(trainedModelConfig.getModelId()) == false) + .forEach(config -> { + try { + mlClient.deleteTrainedModel(new DeleteTrainedModelRequest(config.getModelId()), RequestOptions.DEFAULT); + } catch (IOException ex) { + throw new UncheckedIOException(ex); + } + }); + } + private void deleteAllDatafeeds() throws IOException { stopAllDatafeeds(); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java index 4a623f751ceb3..cdd457403119e 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java @@ -242,7 +242,7 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase { @After public void cleanUp() throws IOException { - new MlTestStateCleaner(logger, highLevelClient().machineLearning()).clearMlMetadata(); + new MlTestStateCleaner(logger, highLevelClient()).clearMlMetadata(); } public void testCreateJob() throws Exception { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java index ff94db4856852..07ae6504814e0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java @@ -323,6 +323,7 @@ public Builder(@Nullable MlMetadata previous) { jobs = new TreeMap<>(previous.jobs); datafeeds = new TreeMap<>(previous.datafeeds); upgradeMode = previous.upgradeMode; + resetMode = previous.resetMode; } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/integration/MlRestTestStateCleaner.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/integration/MlRestTestStateCleaner.java index 5a74298350f2c..bd69fed2febf5 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/integration/MlRestTestStateCleaner.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/integration/MlRestTestStateCleaner.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.core.ml.integration; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; @@ -18,6 +19,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; + public class MlRestTestStateCleaner { @@ -40,6 +43,23 @@ public void clearMlMetadata() throws IOException { @SuppressWarnings("unchecked") private void deleteAllTrainedModels() throws IOException { + final Request getAllTrainedModelStats = new Request("GET", "/_ml/trained_models/_stats"); + getAllTrainedModelStats.addParameter("size", "10000"); + final Response trainedModelsStatsResponse = adminClient.performRequest(getAllTrainedModelStats); + + final List> pipelines = (List>) XContentMapValues.extractValue( + "trained_model_stats.ingest.pipelines", + ESRestTestCase.entityAsMap(trainedModelsStatsResponse) + ); + Set pipelineIds = pipelines.stream().flatMap(m -> m.keySet().stream()).collect(Collectors.toSet()); + for (String pipelineId : pipelineIds) { + try { + adminClient.performRequest(new Request("DELETE", "/_ingest/pipeline/" + pipelineId)); + } catch (Exception ex) { + logger.warn(() -> new ParameterizedMessage("failed to delete pipeline [{}]", pipelineId), ex); + } + } + final Request getTrainedModels = new Request("GET", "/_ml/trained_models"); getTrainedModels.addParameter("size", "10000"); final Response trainedModelsResponse = adminClient.performRequest(getTrainedModels); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/TestFeatureResetIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/TestFeatureResetIT.java index 39d09ae73354f..875b20f4af752 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/TestFeatureResetIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/TestFeatureResetIT.java @@ -6,6 +6,7 @@ */ package org.elasticsearch.xpack.ml.integration; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.admin.cluster.snapshots.features.ResetFeatureStateAction; import org.elasticsearch.action.admin.cluster.snapshots.features.ResetFeatureStateRequest; import org.elasticsearch.action.ingest.DeletePipelineAction; @@ -28,6 +29,8 @@ import org.junit.After; import java.util.Collections; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.TimeUnit; import static org.elasticsearch.xpack.ml.inference.ingest.InferenceProcessor.Factory.countNumberInferenceProcessors; @@ -44,19 +47,31 @@ public class TestFeatureResetIT extends MlNativeAutodetectIntegTestCase { + private final Set createdPipelines = new HashSet<>(); + @After public void cleanup() throws Exception { cleanUp(); + for (String pipeline : createdPipelines) { + try { + client().execute(DeletePipelineAction.INSTANCE, new DeletePipelineRequest(pipeline)).actionGet(); + } catch (Exception ex) { + logger.warn(() -> new ParameterizedMessage("error cleaning up pipeline [{}]", pipeline), ex); + } + } } public void testMLFeatureReset() throws Exception { startRealtime("feature_reset_anomaly_job"); startDataFrameJob("feature_reset_data_frame_analytics_job"); putTrainedModelIngestPipeline("feature_reset_inference_pipeline"); + createdPipelines.add("feature_reset_inference_pipeline"); for(int i = 0; i < 100; i ++) { indexDocForInference("feature_reset_inference_pipeline"); } client().execute(DeletePipelineAction.INSTANCE, new DeletePipelineRequest("feature_reset_inference_pipeline")).actionGet(); + createdPipelines.remove("feature_reset_inference_pipeline"); + assertBusy(() -> assertThat(countNumberInferenceProcessors(client().admin().cluster().prepareState().get().getState()), equalTo(0)) ); @@ -70,6 +85,7 @@ public void testMLFeatureReset() throws Exception { public void testMLFeatureResetFailureDueToPipelines() throws Exception { putTrainedModelIngestPipeline("feature_reset_failure_inference_pipeline"); + createdPipelines.add("feature_reset_failure_inference_pipeline"); Exception ex = expectThrows(Exception.class, () -> client().execute( ResetFeatureStateAction.INSTANCE, new ResetFeatureStateRequest() @@ -81,6 +97,7 @@ public void testMLFeatureResetFailureDueToPipelines() throws Exception { ) ); client().execute(DeletePipelineAction.INSTANCE, new DeletePipelineRequest("feature_reset_failure_inference_pipeline")).actionGet(); + createdPipelines.remove("feature_reset_failure_inference_pipeline"); assertThat(isResetMode(), is(false)); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java index 0b2703033aae0..f887c6e79f1cf 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java @@ -81,6 +81,14 @@ protected NamedXContentRegistry xContentRegistry() { return new NamedXContentRegistry(searchModule.getNamedXContents()); } + public void testBuilderClone() { + for (int i = 0; i < NUMBER_OF_TEST_RUNS; i++) { + MlMetadata first = createTestInstance(); + MlMetadata cloned = MlMetadata.Builder.from(first).build(); + assertThat(cloned, equalTo(first)); + } + } + public void testPutJob() { Job job1 = buildJobBuilder("1").build(); Job job2 = buildJobBuilder("2").build();