From c4364584aa3c518f8f78fc6f03d9204b6e147091 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Tue, 13 Apr 2021 16:04:47 +0100 Subject: [PATCH] [ML] Use feature reset API in ML REST test cleanup (#71552) Now that we have a feature reset API, we should use this for cleaning up in between tests instead of running lots of bespoke cleanup code. During testing of this change we found we need to delete custom cluster state as part of the reset process, so this PR also implements that. Additionally we no longer assign persistent tasks during feature reset. --- .../client/MlTestStateCleaner.java | 126 +----------------- .../action/SetResetModeActionRequest.java | 29 +++- .../elasticsearch/xpack/core/ml/MlTasks.java | 3 + .../SetResetModeActionRequestTests.java | 3 +- .../integration/MlRestTestStateCleaner.java | 124 +---------------- .../qa/native-multi-node-tests/build.gradle | 1 + .../ml/integration/MlNativeIntegTestCase.java | 3 + .../ml/integration/AnnotationIndexIT.java | 2 +- .../xpack/ml/MachineLearning.java | 4 +- .../xpack/ml/MlUpgradeModeActionFilter.java | 65 +++++++-- .../action/TransportSetResetModeAction.java | 19 ++- .../ml/datafeed/DatafeedNodeSelector.java | 4 + .../AbstractJobPersistentTasksExecutor.java | 6 +- .../ml/MlUpgradeModeActionFilterTests.java | 39 ++++-- .../datafeed/DatafeedNodeSelectorTests.java | 20 +++ .../OpenJobPersistentTasksExecutorTests.java | 18 +++ .../integration/TestFeatureResetIT.java | 5 +- .../xpack/transform/Transform.java | 4 +- .../transform/action/TransformNodes.java | 11 +- .../TransportSetTransformResetModeAction.java | 18 ++- .../TransformPersistentTasksExecutor.java | 5 + 21 files changed, 220 insertions(+), 289 deletions(-) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MlTestStateCleaner.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MlTestStateCleaner.java index e6ddcaef374d0..f7e42db2ee810 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/MlTestStateCleaner.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MlTestStateCleaner.java @@ -11,59 +11,37 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ingest.DeletePipelineRequest; import org.elasticsearch.client.core.PageParams; -import org.elasticsearch.client.ml.CloseJobRequest; -import org.elasticsearch.client.ml.DeleteDataFrameAnalyticsRequest; -import org.elasticsearch.client.ml.DeleteDatafeedRequest; -import org.elasticsearch.client.ml.DeleteJobRequest; -import org.elasticsearch.client.ml.DeleteTrainedModelRequest; -import org.elasticsearch.client.ml.GetDataFrameAnalyticsRequest; -import org.elasticsearch.client.ml.GetDataFrameAnalyticsResponse; -import org.elasticsearch.client.ml.GetDatafeedRequest; -import org.elasticsearch.client.ml.GetDatafeedResponse; -import org.elasticsearch.client.ml.GetJobRequest; -import org.elasticsearch.client.ml.GetJobResponse; -import org.elasticsearch.client.ml.GetTrainedModelsRequest; +import org.elasticsearch.client.feature.ResetFeaturesRequest; import org.elasticsearch.client.ml.GetTrainedModelsStatsRequest; -import org.elasticsearch.client.ml.StopDataFrameAnalyticsRequest; -import org.elasticsearch.client.ml.StopDatafeedRequest; -import org.elasticsearch.client.ml.datafeed.DatafeedConfig; -import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsConfig; -import org.elasticsearch.client.ml.job.config.Job; import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.Collections; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; /** - * Cleans up and ML resources created during tests + * Cleans up ML resources created during tests */ public class MlTestStateCleaner { - private static final Set NOT_DELETED_TRAINED_MODELS = Collections.singleton("lang_ident_model_1"); private final Logger logger; - private final MachineLearningClient mlClient; private final RestHighLevelClient client; public MlTestStateCleaner(Logger logger, RestHighLevelClient client) { this.logger = logger; - this.mlClient = client.machineLearning(); this.client = client; } public void clearMlMetadata() throws IOException { - deleteAllTrainedModels(); - deleteAllDatafeeds(); - deleteAllJobs(); - deleteAllDataFrameAnalytics(); + deleteAllTrainedModelIngestPipelines(); + // This resets all features, not just ML, but they should have been getting reset between tests anyway so it shouldn't matter + client.features().resetFeatures(new ResetFeaturesRequest(), RequestOptions.DEFAULT); } @SuppressWarnings("unchecked") - private void deleteAllTrainedModels() throws IOException { - Set pipelinesWithModels = mlClient.getTrainedModelsStats( + private void deleteAllTrainedModelIngestPipelines() throws IOException { + Set pipelinesWithModels = client.machineLearning().getTrainedModelsStats( new GetTrainedModelsStatsRequest("_all").setPageParams(new PageParams(0, 10_000)), RequestOptions.DEFAULT ).getTrainedModelStats() .stream() @@ -86,95 +64,5 @@ private void deleteAllTrainedModels() throws IOException { logger.warn(() -> new ParameterizedMessage("failed to delete pipeline [{}]", pipelineId), ex); } } - - mlClient.getTrainedModels( - GetTrainedModelsRequest.getAllTrainedModelConfigsRequest().setPageParams(new PageParams(0, 10_000)), - RequestOptions.DEFAULT) - .getTrainedModels() - .stream() - .filter(trainedModelConfig -> NOT_DELETED_TRAINED_MODELS.contains(trainedModelConfig.getModelId()) == false) - .forEach(config -> { - try { - mlClient.deleteTrainedModel(new DeleteTrainedModelRequest(config.getModelId()), RequestOptions.DEFAULT); - } catch (IOException ex) { - throw new UncheckedIOException(ex); - } - }); - } - - private void deleteAllDatafeeds() throws IOException { - stopAllDatafeeds(); - - GetDatafeedResponse getDatafeedResponse = mlClient.getDatafeed(GetDatafeedRequest.getAllDatafeedsRequest(), RequestOptions.DEFAULT); - for (DatafeedConfig datafeed : getDatafeedResponse.datafeeds()) { - mlClient.deleteDatafeed(new DeleteDatafeedRequest(datafeed.getId()), RequestOptions.DEFAULT); - } - } - - private void stopAllDatafeeds() { - StopDatafeedRequest stopAllDatafeedsRequest = StopDatafeedRequest.stopAllDatafeedsRequest(); - try { - mlClient.stopDatafeed(stopAllDatafeedsRequest, RequestOptions.DEFAULT); - } catch (Exception e1) { - logger.warn("failed to stop all datafeeds. Forcing stop", e1); - try { - stopAllDatafeedsRequest.setForce(true); - mlClient.stopDatafeed(stopAllDatafeedsRequest, RequestOptions.DEFAULT); - } catch (Exception e2) { - logger.warn("Force-closing all data feeds failed", e2); - } - throw new RuntimeException("Had to resort to force-stopping datafeeds, something went wrong?", e1); - } - } - - private void deleteAllJobs() throws IOException { - closeAllJobs(); - - GetJobResponse getJobResponse = mlClient.getJob(GetJobRequest.getAllJobsRequest(), RequestOptions.DEFAULT); - for (Job job : getJobResponse.jobs()) { - mlClient.deleteJob(new DeleteJobRequest(job.getId()), RequestOptions.DEFAULT); - } - } - - private void closeAllJobs() { - CloseJobRequest closeAllJobsRequest = CloseJobRequest.closeAllJobsRequest(); - try { - mlClient.closeJob(closeAllJobsRequest, RequestOptions.DEFAULT); - } catch (Exception e1) { - logger.warn("failed to close all jobs. Forcing closed", e1); - closeAllJobsRequest.setForce(true); - try { - mlClient.closeJob(closeAllJobsRequest, RequestOptions.DEFAULT); - } catch (Exception e2) { - logger.warn("Force-closing all jobs failed", e2); - } - throw new RuntimeException("Had to resort to force-closing jobs, something went wrong?", e1); - } - } - - private void deleteAllDataFrameAnalytics() throws IOException { - stopAllDataFrameAnalytics(); - - GetDataFrameAnalyticsResponse getDataFrameAnalyticsResponse = - mlClient.getDataFrameAnalytics(GetDataFrameAnalyticsRequest.getAllDataFrameAnalyticsRequest(), RequestOptions.DEFAULT); - for (DataFrameAnalyticsConfig config : getDataFrameAnalyticsResponse.getAnalytics()) { - mlClient.deleteDataFrameAnalytics(new DeleteDataFrameAnalyticsRequest(config.getId()), RequestOptions.DEFAULT); - } - } - - private void stopAllDataFrameAnalytics() { - StopDataFrameAnalyticsRequest stopAllRequest = new StopDataFrameAnalyticsRequest("*"); - try { - mlClient.stopDataFrameAnalytics(stopAllRequest, RequestOptions.DEFAULT); - } catch (Exception e1) { - logger.warn("failed to stop all data frame analytics. Will proceed to force-stopping", e1); - stopAllRequest.setForce(true); - try { - mlClient.stopDataFrameAnalytics(stopAllRequest, RequestOptions.DEFAULT); - } catch (Exception e2) { - logger.warn("force-stopping all data frame analytics failed", e2); - } - throw new RuntimeException("Had to resort to force-stopping data frame analytics, something went wrong?", e1); - } } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/SetResetModeActionRequest.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/SetResetModeActionRequest.java index 2f575a8296e92..4017c70445311 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/SetResetModeActionRequest.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/SetResetModeActionRequest.java @@ -22,36 +22,46 @@ public class SetResetModeActionRequest extends AcknowledgedRequest implements ToXContentObject { public static SetResetModeActionRequest enabled() { - return new SetResetModeActionRequest(true); + return new SetResetModeActionRequest(true, false); } - public static SetResetModeActionRequest disabled() { - return new SetResetModeActionRequest(false); + public static SetResetModeActionRequest disabled(boolean deleteMetadata) { + return new SetResetModeActionRequest(false, deleteMetadata); } private final boolean enabled; + private final boolean deleteMetadata; private static final ParseField ENABLED = new ParseField("enabled"); + private static final ParseField DELETE_METADATA = new ParseField("delete_metadata"); public static final ConstructingObjectParser PARSER = - new ConstructingObjectParser<>("set_reset_mode_action_request", a -> new SetResetModeActionRequest((Boolean)a[0])); + new ConstructingObjectParser<>("set_reset_mode_action_request", + a -> new SetResetModeActionRequest((Boolean)a[0], (Boolean)a[1])); static { PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), ENABLED); + PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), DELETE_METADATA); } - SetResetModeActionRequest(boolean enabled) { + SetResetModeActionRequest(boolean enabled, Boolean deleteMetadata) { this.enabled = enabled; + this.deleteMetadata = deleteMetadata != null && deleteMetadata; } public SetResetModeActionRequest(StreamInput in) throws IOException { super(in); this.enabled = in.readBoolean(); + this.deleteMetadata = in.readBoolean(); } public boolean isEnabled() { return enabled; } + public boolean shouldDeleteMetadata() { + return deleteMetadata; + } + @Override public ActionRequestValidationException validate() { return null; @@ -61,11 +71,12 @@ public ActionRequestValidationException validate() { public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeBoolean(enabled); + out.writeBoolean(deleteMetadata); } @Override public int hashCode() { - return Objects.hash(enabled); + return Objects.hash(enabled, deleteMetadata); } @Override @@ -77,13 +88,17 @@ public boolean equals(Object obj) { return false; } SetResetModeActionRequest other = (SetResetModeActionRequest) obj; - return Objects.equals(enabled, other.enabled); + return Objects.equals(enabled, other.enabled) + && Objects.equals(deleteMetadata, other.deleteMetadata); } @Override public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { builder.startObject(); builder.field(ENABLED.getPreferredName(), enabled); + if (enabled == false) { + builder.field(DELETE_METADATA.getPreferredName(), deleteMetadata); + } builder.endObject(); return builder; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java index 2d009291025bc..25fafcdf81d3a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java @@ -37,6 +37,9 @@ public final class MlTasks { public static final PersistentTasksCustomMetadata.Assignment AWAITING_UPGRADE = new PersistentTasksCustomMetadata.Assignment(null, "persistent task cannot be assigned while upgrade mode is enabled."); + public static final PersistentTasksCustomMetadata.Assignment RESET_IN_PROGRESS = + new PersistentTasksCustomMetadata.Assignment(null, + "persistent task will not be assigned as a feature reset is in progress."); private MlTasks() { } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/action/SetResetModeActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/action/SetResetModeActionRequestTests.java index e3cf1c5114d1b..c56c95dc33fb2 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/action/SetResetModeActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/action/SetResetModeActionRequestTests.java @@ -14,7 +14,8 @@ public class SetResetModeActionRequestTests extends AbstractSerializingTestCase< @Override protected SetResetModeActionRequest createTestInstance() { - return new SetResetModeActionRequest(randomBoolean()); + boolean enabled = randomBoolean(); + return new SetResetModeActionRequest(enabled, enabled == false && randomBoolean()); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/integration/MlRestTestStateCleaner.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/integration/MlRestTestStateCleaner.java index bd69fed2febf5..e28b77c3006b8 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/integration/MlRestTestStateCleaner.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/integration/MlRestTestStateCleaner.java @@ -15,16 +15,13 @@ import org.elasticsearch.test.rest.ESRestTestCase; import java.io.IOException; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; - public class MlRestTestStateCleaner { - private static final Set NOT_DELETED_TRAINED_MODELS = Collections.singleton("lang_ident_model_1"); private final Logger logger; private final RestClient adminClient; @@ -34,15 +31,13 @@ public MlRestTestStateCleaner(Logger logger, RestClient adminClient) { } public void clearMlMetadata() throws IOException { - deleteAllTrainedModels(); - deleteAllDatafeeds(); - deleteAllJobs(); - deleteAllDataFrameAnalytics(); - // indices will be deleted by the ESRestTestCase class + deleteAllTrainedModelIngestPipelines(); + // This resets all features, not just ML, but they should have been getting reset between tests anyway so it shouldn't matter + adminClient.performRequest(new Request("POST", "/_features/_reset")); } @SuppressWarnings("unchecked") - private void deleteAllTrainedModels() throws IOException { + private void deleteAllTrainedModelIngestPipelines() throws IOException { final Request getAllTrainedModelStats = new Request("GET", "/_ml/trained_models/_stats"); getAllTrainedModelStats.addParameter("size", "10000"); final Response trainedModelsStatsResponse = adminClient.performRequest(getAllTrainedModelStats); @@ -59,116 +54,5 @@ private void deleteAllTrainedModels() throws IOException { logger.warn(() -> new ParameterizedMessage("failed to delete pipeline [{}]", pipelineId), ex); } } - - final Request getTrainedModels = new Request("GET", "/_ml/trained_models"); - getTrainedModels.addParameter("size", "10000"); - final Response trainedModelsResponse = adminClient.performRequest(getTrainedModels); - final List> models = (List>) XContentMapValues.extractValue( - "trained_model_configs", - ESRestTestCase.entityAsMap(trainedModelsResponse) - ); - if (models == null || models.isEmpty()) { - return; - } - for (Map model : models) { - String modelId = (String) model.get("model_id"); - if (NOT_DELETED_TRAINED_MODELS.contains(modelId)) { - continue; - } - adminClient.performRequest(new Request("DELETE", "/_ml/trained_models/" + modelId)); - } - } - - @SuppressWarnings("unchecked") - private void deleteAllDatafeeds() throws IOException { - final Request datafeedsRequest = new Request("GET", "/_ml/datafeeds"); - datafeedsRequest.addParameter("filter_path", "datafeeds"); - final Response datafeedsResponse = adminClient.performRequest(datafeedsRequest); - final List> datafeeds = - (List>) XContentMapValues.extractValue("datafeeds", ESRestTestCase.entityAsMap(datafeedsResponse)); - if (datafeeds == null) { - return; - } - - try { - adminClient.performRequest(new Request("POST", "/_ml/datafeeds/_all/_stop")); - } catch (Exception e1) { - logger.warn("failed to stop all datafeeds. Forcing stop", e1); - try { - adminClient.performRequest(new Request("POST", "/_ml/datafeeds/_all/_stop?force=true")); - } catch (Exception e2) { - logger.warn("Force-closing all data feeds failed", e2); - } - throw new RuntimeException( - "Had to resort to force-stopping datafeeds, something went wrong?", e1); - } - - for (Map datafeed : datafeeds) { - String datafeedId = (String) datafeed.get("datafeed_id"); - adminClient.performRequest(new Request("DELETE", "/_ml/datafeeds/" + datafeedId)); - } - } - - private void deleteAllJobs() throws IOException { - final Request jobsRequest = new Request("GET", "/_ml/anomaly_detectors"); - jobsRequest.addParameter("filter_path", "jobs"); - final Response response = adminClient.performRequest(jobsRequest); - @SuppressWarnings("unchecked") - final List> jobConfigs = - (List>) XContentMapValues.extractValue("jobs", ESRestTestCase.entityAsMap(response)); - if (jobConfigs == null) { - return; - } - - try { - adminClient.performRequest(new Request("POST", "/_ml/anomaly_detectors/_all/_close")); - } catch (Exception e1) { - logger.warn("failed to close all jobs. Forcing closed", e1); - try { - adminClient.performRequest(new Request("POST", "/_ml/anomaly_detectors/_all/_close?force=true")); - } catch (Exception e2) { - logger.warn("Force-closing all jobs failed", e2); - } - throw new RuntimeException("Had to resort to force-closing jobs, something went wrong?", - e1); - } - - for (Map jobConfig : jobConfigs) { - String jobId = (String) jobConfig.get("job_id"); - adminClient.performRequest(new Request("DELETE", "/_ml/anomaly_detectors/" + jobId)); - } - } - - @SuppressWarnings({"unchecked"}) - private void deleteAllDataFrameAnalytics() throws IOException { - stopAllDataFrameAnalytics(); - - final Request analyticsRequest = new Request("GET", "/_ml/data_frame/analytics?size=10000"); - analyticsRequest.addParameter("filter_path", "data_frame_analytics"); - final Response analyticsResponse = adminClient.performRequest(analyticsRequest); - List> analytics = (List>) XContentMapValues.extractValue( - "data_frame_analytics", ESRestTestCase.entityAsMap(analyticsResponse)); - if (analytics == null) { - return; - } - - for (Map config : analytics) { - String id = (String) config.get("id"); - adminClient.performRequest(new Request("DELETE", "/_ml/data_frame/analytics/" + id)); - } - } - - private void stopAllDataFrameAnalytics() { - try { - adminClient.performRequest(new Request("POST", "_ml/data_frame/analytics/*/_stop")); - } catch (Exception e1) { - logger.warn("failed to stop all data frame analytics. Will proceed to force-stopping", e1); - try { - adminClient.performRequest(new Request("POST", "_ml/data_frame/analytics/*/_stop?force=true")); - } catch (Exception e2) { - logger.warn("Force-stopping all data frame analytics failed", e2); - } - throw new RuntimeException("Had to resort to force-stopping data frame analytics, something went wrong?", e1); - } } } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle b/x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle index ac6bc195e8ac5..9971fe8a31d6f 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle @@ -6,6 +6,7 @@ dependencies { javaRestTestImplementation(testArtifact(project(xpackModule('ml')))) javaRestTestImplementation project(path: ':modules:ingest-common') javaRestTestImplementation project(path: xpackModule('data-streams')) + javaRestTestImplementation project(path: xpackModule('transform')) } // location for keys and certificates diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java index d6850c691d447..a6e0ea61976fb 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java @@ -13,6 +13,7 @@ import org.elasticsearch.xpack.autoscaling.AutoscalingMetadata; import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderResult; import org.elasticsearch.xpack.core.action.CreateDataStreamAction; +import org.elasticsearch.xpack.transform.Transform; import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; @@ -130,6 +131,8 @@ protected Collection> nodePlugins() { MockPainlessScriptEngine.TestPlugin.class, // ILM is required for .ml-state template index settings IndexLifecycle.class, + // The feature reset API touches transform custom cluster state so we need this plugin to understand it + Transform.class, DataStreamsPlugin.class); } diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AnnotationIndexIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AnnotationIndexIT.java index 220370702772a..dfb7770ff4484 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AnnotationIndexIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AnnotationIndexIT.java @@ -118,7 +118,7 @@ public void testNotCreatedWhenAfterOtherMlIndexAndResetInProgress() throws Excep assertEquals(0, numberOfAnnotationsAliases()); }); } finally { - client().execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled()).actionGet(); + client().execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(true)).actionGet(); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 04f391959e9fd..99d2e0eb0bb83 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -1246,7 +1246,7 @@ public void cleanUpFeature( logger.info("Starting machine learning feature reset"); ActionListener unsetResetModeListener = ActionListener.wrap( - success -> client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(), ActionListener.wrap( + success -> client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(true), ActionListener.wrap( resetSuccess -> finalListener.onResponse(success), resetFailure -> { logger.error("failed to disable reset mode after state otherwise successful machine learning reset", resetFailure); @@ -1258,7 +1258,7 @@ public void cleanUpFeature( ); }) ), - failure -> client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(), ActionListener.wrap( + failure -> client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(false), ActionListener.wrap( resetSuccess -> finalListener.onFailure(failure), resetFailure -> { logger.error("failed to disable reset mode after state clean up failure", resetFailure); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlUpgradeModeActionFilter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlUpgradeModeActionFilter.java index e31afa81a7d8f..dc3294bad4f19 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlUpgradeModeActionFilter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlUpgradeModeActionFilter.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.support.ActionFilter; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.CloseJobAction; @@ -51,22 +52,25 @@ import org.elasticsearch.xpack.core.ml.action.UpdateModelSnapshotAction; import org.elasticsearch.xpack.core.ml.action.UpdateProcessAction; -import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; import java.util.Objects; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; /** * {@link MlUpgradeModeActionFilter} disallows certain actions if the cluster is currently in upgrade mode. * * Disallowed actions are the ones which can access/alter the state of ML internal indices. + * + * There is a complication that the feature reset API knows nothing about ML upgrade mode. If the feature + * reset API is called while ML upgrade mode is enabled then it takes precedence and resets the ML state. + * This means that all ML entities will be deleted and upgrade mode will be disabled if the reset completes + * successfully. */ class MlUpgradeModeActionFilter extends ActionFilter.Simple { private static final Set ACTIONS_DISALLOWED_IN_UPGRADE_MODE = - Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + Collections.unmodifiableSet(Sets.newHashSet( PutJobAction.NAME, UpdateJobAction.NAME, DeleteJobAction.NAME, @@ -113,18 +117,43 @@ class MlUpgradeModeActionFilter extends ActionFilter.Simple { PutTrainedModelAction.NAME, DeleteTrainedModelAction.NAME - ))); + )); + + private static final Set RESET_MODE_EXEMPTIONS = + Collections.unmodifiableSet(Sets.newHashSet( + DeleteJobAction.NAME, + CloseJobAction.NAME, + + DeleteDatafeedAction.NAME, + StopDatafeedAction.NAME, + + KillProcessAction.NAME, + + DeleteDataFrameAnalyticsAction.NAME, + StopDataFrameAnalyticsAction.NAME, + + DeleteTrainedModelAction.NAME + )); - private final AtomicBoolean isUpgradeMode = new AtomicBoolean(); + // At the time the action filter is installed no cluster state is available, so + // initialise to false/false and let the first change event set the real values + private final AtomicReference upgradeResetFlags = new AtomicReference<>(new UpgradeResetFlags(false, false)); MlUpgradeModeActionFilter(ClusterService clusterService) { Objects.requireNonNull(clusterService); - clusterService.addListener(this::setIsUpgradeMode); + clusterService.addListener(this::setUpgradeResetFlags); } @Override protected boolean apply(String action, ActionRequest request, ActionListener listener) { - if (isUpgradeMode.get() && ACTIONS_DISALLOWED_IN_UPGRADE_MODE.contains(action)) { + // Ensure the same object is used for both tests + UpgradeResetFlags localUpgradeResetFlags = upgradeResetFlags.get(); + assert localUpgradeResetFlags != null; + // If we are in upgrade mode but a reset is being done then allow the destructive actions that reset mode uses + if (localUpgradeResetFlags.isResetMode && RESET_MODE_EXEMPTIONS.contains(action)) { + return true; + } + if (localUpgradeResetFlags.isUpgradeMode && ACTIONS_DISALLOWED_IN_UPGRADE_MODE.contains(action)) { throw new ElasticsearchStatusException( "Cannot perform {} action while upgrade mode is enabled", RestStatus.TOO_MANY_REQUESTS, action); } @@ -143,7 +172,23 @@ public int order() { } // Visible for testing - void setIsUpgradeMode(ClusterChangedEvent event) { - isUpgradeMode.set(MlMetadata.getMlMetadata(event.state()).isUpgradeMode()); + void setUpgradeResetFlags(ClusterChangedEvent event) { + MlMetadata mlMetadata = MlMetadata.getMlMetadata(event.state()); + upgradeResetFlags.set(new UpgradeResetFlags(mlMetadata.isUpgradeMode(), mlMetadata.isResetMode())); + } + + /** + * Class to allow both upgrade and reset flags to be recorded atomically so that code that checks both + * one after the other doesn't see inconsistent values. + */ + private static class UpgradeResetFlags { + + final boolean isUpgradeMode; + final boolean isResetMode; + + UpgradeResetFlags(boolean isUpgradeMode, boolean isResetMode) { + this.isUpgradeMode = isUpgradeMode; + this.isResetMode = isResetMode; + } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetResetModeAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetResetModeAction.java index 1d97a9c997481..2f4c9826da444 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetResetModeAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetResetModeAction.java @@ -18,6 +18,7 @@ import org.elasticsearch.xpack.core.action.SetResetModeActionRequest; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.SetResetModeAction; +import org.elasticsearch.xpack.ml.inference.ModelAliasMetadata; public class TransportSetResetModeAction extends AbstractTransportSetResetModeAction { @@ -40,11 +41,21 @@ protected String featureName() { @Override protected ClusterState setState(ClusterState oldState, SetResetModeActionRequest request) { - MlMetadata.Builder builder = MlMetadata.Builder - .from(oldState.metadata().custom(MlMetadata.TYPE)) - .isResetMode(request.isEnabled()); ClusterState.Builder newState = ClusterState.builder(oldState); - newState.metadata(Metadata.builder(oldState.getMetadata()).putCustom(MlMetadata.TYPE, builder.build()).build()); + if (request.shouldDeleteMetadata()) { + assert request.isEnabled() == false; // SetResetModeActionRequest should have enforced this + newState.metadata(Metadata.builder(oldState.getMetadata()) + .removeCustom(MlMetadata.TYPE) + .removeCustom(ModelAliasMetadata.NAME) + .build()); + } else { + MlMetadata.Builder builder = MlMetadata.Builder + .from(oldState.metadata().custom(MlMetadata.TYPE)) + .isResetMode(request.isEnabled()); + newState.metadata(Metadata.builder(oldState.getMetadata()) + .putCustom(MlMetadata.TYPE, builder.build()) + .build()); + } return newState.build(); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java index dbb8267b6e4b4..d88dd3b5438e9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java @@ -29,6 +29,7 @@ import java.util.Objects; import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE; +import static org.elasticsearch.xpack.core.ml.MlTasks.RESET_IN_PROGRESS; public class DatafeedNodeSelector { @@ -78,6 +79,9 @@ public PersistentTasksCustomMetadata.Assignment selectNode() { if (MlMetadata.getMlMetadata(clusterState).isUpgradeMode()) { return AWAITING_UPGRADE; } + if (MlMetadata.getMlMetadata(clusterState).isResetMode()) { + return RESET_IN_PROGRESS; + } AssignmentFailure assignmentFailure = checkAssignment(); if (assignmentFailure == null) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/task/AbstractJobPersistentTasksExecutor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/task/AbstractJobPersistentTasksExecutor.java index ad0f51e654d4d..e657c56877a91 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/task/AbstractJobPersistentTasksExecutor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/task/AbstractJobPersistentTasksExecutor.java @@ -37,6 +37,7 @@ import java.util.Optional; import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE; +import static org.elasticsearch.xpack.core.ml.MlTasks.RESET_IN_PROGRESS; import static org.elasticsearch.xpack.core.ml.job.messages.Messages.JOB_AUDIT_REQUIRES_MORE_MEMORY_TO_RUN; import static org.elasticsearch.xpack.ml.MachineLearning.MAX_ML_NODE_SIZE; import static org.elasticsearch.xpack.ml.MachineLearning.MAX_OPEN_JOBS_PER_NODE; @@ -154,10 +155,13 @@ protected boolean allowsMissingIndices() { public Optional getPotentialAssignment(Params params, ClusterState clusterState, boolean isMemoryTrackerRecentlyRefreshed) { - // If we are waiting for an upgrade to complete, we should not assign to a node + // If we are waiting for an upgrade or reset to complete, we should not assign to a node if (MlMetadata.getMlMetadata(clusterState).isUpgradeMode()) { return Optional.of(AWAITING_UPGRADE); } + if (MlMetadata.getMlMetadata(clusterState).isResetMode()) { + return Optional.of(RESET_IN_PROGRESS); + } String jobId = getJobId(params); Optional missingIndices = checkRequiredIndices(jobId, diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlUpgradeModeActionFilterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlUpgradeModeActionFilterTests.java index 26d3b1d1689b2..3b11c063f7ffb 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlUpgradeModeActionFilterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlUpgradeModeActionFilterTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.action.CloseJobAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction; import org.elasticsearch.xpack.security.action.filter.SecurityActionFilter; @@ -42,6 +43,7 @@ public class MlUpgradeModeActionFilterTests extends ESTestCase { private static final String DISALLOWED_ACTION = PutJobAction.NAME; private static final String ALLOWED_ACTION = SetUpgradeModeAction.NAME; + private static final String DISALLOWED_ACTION_WITH_RESET_MODE_EXEMPTION = CloseJobAction.NAME; private ClusterService clusterService; private Task task; @@ -65,37 +67,51 @@ public void assertNoMoreInteractions() { } public void testApply_ActionDisallowedInUpgradeMode() { + String action = randomFrom(DISALLOWED_ACTION, DISALLOWED_ACTION_WITH_RESET_MODE_EXEMPTION); MlUpgradeModeActionFilter filter = new MlUpgradeModeActionFilter(clusterService); - filter.apply(task, DISALLOWED_ACTION, request, listener, chain); + filter.apply(task, action, request, listener, chain); - filter.setIsUpgradeMode(createClusterChangedEvent(createClusterState(true))); + filter.setUpgradeResetFlags(createClusterChangedEvent(createClusterState(true, false))); ElasticsearchStatusException e = expectThrows( ElasticsearchStatusException.class, - () -> filter.apply(task, DISALLOWED_ACTION, request, listener, chain)); + () -> filter.apply(task, action, request, listener, chain)); - filter.setIsUpgradeMode(createClusterChangedEvent(createClusterState(false))); - filter.apply(task, DISALLOWED_ACTION, request, listener, chain); + filter.setUpgradeResetFlags(createClusterChangedEvent(createClusterState(false, false))); + filter.apply(task, action, request, listener, chain); - assertThat(e.getMessage(), is(equalTo("Cannot perform " + DISALLOWED_ACTION + " action while upgrade mode is enabled"))); + assertThat(e.getMessage(), is(equalTo("Cannot perform " + action + " action while upgrade mode is enabled"))); assertThat(e.status(), is(equalTo(RestStatus.TOO_MANY_REQUESTS))); - verify(chain, times(2)).proceed(task, DISALLOWED_ACTION, request, listener); + verify(chain, times(2)).proceed(task, action, request, listener); } public void testApply_ActionAllowedInUpgradeMode() { MlUpgradeModeActionFilter filter = new MlUpgradeModeActionFilter(clusterService); filter.apply(task, ALLOWED_ACTION, request, listener, chain); - filter.setIsUpgradeMode(createClusterChangedEvent(createClusterState(true))); + filter.setUpgradeResetFlags(createClusterChangedEvent(createClusterState(true, false))); filter.apply(task, ALLOWED_ACTION, request, listener, chain); - filter.setIsUpgradeMode(createClusterChangedEvent(createClusterState(false))); + filter.setUpgradeResetFlags(createClusterChangedEvent(createClusterState(false, false))); filter.apply(task, ALLOWED_ACTION, request, listener, chain); verify(chain, times(3)).proceed(task, ALLOWED_ACTION, request, listener); } + public void testApply_ActionDisallowedInUpgradeModeWithResetModeExemption() { + MlUpgradeModeActionFilter filter = new MlUpgradeModeActionFilter(clusterService); + filter.apply(task, DISALLOWED_ACTION_WITH_RESET_MODE_EXEMPTION, request, listener, chain); + + filter.setUpgradeResetFlags(createClusterChangedEvent(createClusterState(true, true))); + filter.apply(task, DISALLOWED_ACTION_WITH_RESET_MODE_EXEMPTION, request, listener, chain); + + filter.setUpgradeResetFlags(createClusterChangedEvent(createClusterState(false, true))); + filter.apply(task, DISALLOWED_ACTION_WITH_RESET_MODE_EXEMPTION, request, listener, chain); + + verify(chain, times(3)).proceed(task, DISALLOWED_ACTION_WITH_RESET_MODE_EXEMPTION, request, listener); + } + public void testOrder_UpgradeFilterIsExecutedAfterSecurityFilter() { MlUpgradeModeActionFilter upgradeModeFilter = new MlUpgradeModeActionFilter(clusterService); SecurityActionFilter securityFilter = new SecurityActionFilter(null, null, null, null, mock(ThreadPool.class), null, null); @@ -108,9 +124,10 @@ private static ClusterChangedEvent createClusterChangedEvent(ClusterState cluste return new ClusterChangedEvent("created-from-test", clusterState, clusterState); } - private static ClusterState createClusterState(boolean isUpgradeMode) { + private static ClusterState createClusterState(boolean isUpgradeMode, boolean isResetMode) { return ClusterState.builder(new ClusterName("MlUpgradeModeActionFilterTests")) - .metadata(Metadata.builder().putCustom(MlMetadata.TYPE, new MlMetadata.Builder().isUpgradeMode(isUpgradeMode).build())) + .metadata(Metadata.builder().putCustom(MlMetadata.TYPE, + new MlMetadata.Builder().isUpgradeMode(isUpgradeMode).isResetMode(isResetMode).build())) .build(); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java index a8f8880172c06..01d84f8b34db0 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java @@ -466,6 +466,26 @@ public void testSelectNode_GivenMlUpgradeMode() { assertThat(result, equalTo(MlTasks.AWAITING_UPGRADE)); } + public void testSelectNode_GivenResetInProgress() { + Job job = createScheduledJob("job_id").build(new Date()); + DatafeedConfig df = createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")); + + PersistentTasksCustomMetadata.Builder tasksBuilder = PersistentTasksCustomMetadata.builder(); + addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder); + tasks = tasksBuilder.build(); + mlMetadata = new MlMetadata.Builder().isResetMode(true).build(); + + givenClusterState("foo", 1, 0); + + PersistentTasksCustomMetadata.Assignment result = new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode(); + assertThat(result, equalTo(MlTasks.RESET_IN_PROGRESS)); + } + public void testCheckDatafeedTaskCanBeCreated_GivenMlUpgradeMode() { Job job = createScheduledJob("job_id").build(new Date()); DatafeedConfig df = createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java index ab03039dc8712..b042f14dc15b2 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java @@ -38,6 +38,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.MlConfigIndex; import org.elasticsearch.xpack.core.ml.MlMetaIndex; +import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; @@ -181,6 +182,23 @@ public void testGetAssignment_GivenLazyJobAndNoGlobalLazyNodes() { assertEquals(JobNodeSelector.AWAITING_LAZY_ASSIGNMENT.getExplanation(), assignment.getExplanation()); } + public void testGetAssignment_GivenResetInProgress() { + ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); + Metadata.Builder metadata = Metadata.builder(); + MlMetadata mlMetadata = new MlMetadata.Builder().isResetMode(true).build(); + csBuilder.metadata(metadata.putCustom(MlMetadata.TYPE, mlMetadata)); + + OpenJobPersistentTasksExecutor executor = createExecutor(Settings.EMPTY); + + Job job = mock(Job.class); + OpenJobAction.JobParams params = new OpenJobAction.JobParams("job_during_reset"); + params.setJob(job); + PersistentTasksCustomMetadata.Assignment assignment = executor.getAssignment(params, csBuilder.build()); + assertNotNull(assignment); + assertNull(assignment.getExecutorNode()); + assertEquals(MlTasks.RESET_IN_PROGRESS.getExplanation(), assignment.getExplanation()); + } + public static void addJobTask(String jobId, String nodeId, JobState jobState, PersistentTasksCustomMetadata.Builder builder) { addJobTask(jobId, nodeId, jobState, builder, false); } diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TestFeatureResetIT.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TestFeatureResetIT.java index 3614790a2b2c3..209218cf42171 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TestFeatureResetIT.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TestFeatureResetIT.java @@ -77,10 +77,9 @@ public void testTransformFeatureReset() throws Exception { Map metadata = (Map)ESRestTestCase.entityAsMap(response).get("metadata"); assertThat(metadata, is(not(nullValue()))); + // after a successful reset we completely remove the transform metadata Map transformMetadata = (Map)metadata.get("transform"); - assertThat(transformMetadata, is(not(nullValue()))); - assertThat(transformMetadata.get("reset_mode"), is(false)); - + assertThat(transformMetadata, is(nullValue())); // assert transforms are gone assertThat(getTransform("_all").getCount(), equalTo(0L)); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java index 494e5b3ae74c7..5ac73188b97cb 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java @@ -341,7 +341,7 @@ public void cleanUpFeature( ) { ActionListener unsetResetModeListener = ActionListener.wrap( - success -> client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(), ActionListener.wrap( + success -> client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(true), ActionListener.wrap( resetSuccess -> finalListener.onResponse(success), resetFailure -> { logger.error("failed to disable reset mode after otherwise successful transform reset", resetFailure); @@ -354,7 +354,7 @@ public void cleanUpFeature( ); }) ), - failure -> client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(), ActionListener.wrap( + failure -> client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(false), ActionListener.wrap( resetSuccess -> finalListener.onFailure(failure), resetFailure -> { logger.error( diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformNodes.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformNodes.java index bc20742d3f8d8..dd149e8917544 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformNodes.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformNodes.java @@ -17,6 +17,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask; import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.TransformMessages; +import org.elasticsearch.xpack.core.transform.TransformMetadata; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams; import java.util.Collection; @@ -138,13 +139,17 @@ public static long getNumberOfTransformNodes(ClusterState clusterState) { /** * Check if cluster has at least 1 transform nodes and add a header warning if not. * To be used by transport actions only. + * Don't do this if a reset is in progress, because the feature reset API touches + * all features even if they have never been used. * * @param clusterState state */ public static void warnIfNoTransformNodes(ClusterState clusterState) { - long transformNodes = getNumberOfTransformNodes(clusterState); - if (transformNodes == 0) { - HeaderWarning.addWarning(TransformMessages.REST_WARN_NO_TRANSFORM_NODES); + if (TransformMetadata.getTransformMetadata(clusterState).isResetMode() == false) { + long transformNodes = getNumberOfTransformNodes(clusterState); + if (transformNodes == 0) { + HeaderWarning.addWarning(TransformMessages.REST_WARN_NO_TRANSFORM_NODES); + } } } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportSetTransformResetModeAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportSetTransformResetModeAction.java index d48cfa217530f..0316ed9921555 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportSetTransformResetModeAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportSetTransformResetModeAction.java @@ -40,12 +40,20 @@ protected String featureName() { @Override protected ClusterState setState(ClusterState oldState, SetResetModeActionRequest request) { - TransformMetadata.Builder builder = TransformMetadata.Builder - .from(oldState.metadata().custom(TransformMetadata.TYPE)) - .isResetMode(request.isEnabled()); ClusterState.Builder newState = ClusterState.builder(oldState); - newState.metadata(Metadata.builder(oldState.getMetadata()) - .putCustom(TransformMetadata.TYPE, builder.build()).build()); + if (request.shouldDeleteMetadata()) { + assert request.isEnabled() == false; // SetResetModeActionRequest should have enforced this + newState.metadata(Metadata.builder(oldState.getMetadata()) + .removeCustom(TransformMetadata.TYPE) + .build()); + } else { + TransformMetadata.Builder builder = TransformMetadata.Builder + .from(oldState.metadata().custom(TransformMetadata.TYPE)) + .isResetMode(request.isEnabled()); + newState.metadata(Metadata.builder(oldState.getMetadata()) + .putCustom(TransformMetadata.TYPE, builder.build()) + .build()); + } return newState.build(); } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java index beaa45c8f2b05..6076d5c7a520d 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java @@ -34,6 +34,7 @@ import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.TransformMessages; +import org.elasticsearch.xpack.core.transform.TransformMetadata; import org.elasticsearch.xpack.core.transform.action.StartTransformAction; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; @@ -91,6 +92,10 @@ public TransformPersistentTasksExecutor( @Override public PersistentTasksCustomMetadata.Assignment getAssignment(TransformTaskParams params, ClusterState clusterState) { + if (TransformMetadata.getTransformMetadata(clusterState).isResetMode()) { + return new PersistentTasksCustomMetadata.Assignment(null, + "Transform task will not be assigned as a feature reset is in progress."); + } List unavailableIndices = verifyIndicesPrimaryShardsAreActive(clusterState, resolver); if (unavailableIndices.size() != 0) { String reason = "Not starting transform ["