From 8e5b79e48672ec6cbfeec77dddd2165aa69d9d87 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Thu, 15 Apr 2021 16:29:34 +0100 Subject: [PATCH] [ML] Use feature reset API in ML REST test cleanup (#71746) Now that we have a feature reset API, we should use this for cleaning up in between tests instead of running lots of bespoke cleanup code. During testing of this change we found we need to delete custom cluster state as part of the reset process, so this PR also implements that. Additionally we no longer assign persistent tasks during feature reset. Backport of #71552 --- .../client/MlTestStateCleaner.java | 126 +----------------- .../action/SetResetModeActionRequest.java | 29 +++- .../elasticsearch/xpack/core/ml/MlTasks.java | 3 + .../SetResetModeActionRequestTests.java | 3 +- .../integration/MlRestTestStateCleaner.java | 124 +---------------- .../qa/native-multi-node-tests/build.gradle | 1 + .../ml/integration/MlNativeIntegTestCase.java | 3 + .../ml/integration/AnnotationIndexIT.java | 2 +- .../xpack/ml/MachineLearning.java | 4 +- .../xpack/ml/MlUpgradeModeActionFilter.java | 65 +++++++-- .../action/TransportSetResetModeAction.java | 19 ++- .../ml/datafeed/DatafeedNodeSelector.java | 4 + .../AbstractJobPersistentTasksExecutor.java | 6 +- .../ml/MlUpgradeModeActionFilterTests.java | 39 ++++-- .../datafeed/DatafeedNodeSelectorTests.java | 20 +++ .../OpenJobPersistentTasksExecutorTests.java | 18 +++ .../integration/TestFeatureResetIT.java | 5 +- .../xpack/transform/Transform.java | 4 +- .../transform/action/TransformNodes.java | 11 +- .../TransportSetTransformResetModeAction.java | 18 ++- .../TransformPersistentTasksExecutor.java | 5 + 21 files changed, 220 insertions(+), 289 deletions(-) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MlTestStateCleaner.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MlTestStateCleaner.java index e6ddcaef374d0..f7e42db2ee810 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/MlTestStateCleaner.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MlTestStateCleaner.java @@ -11,59 +11,37 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ingest.DeletePipelineRequest; import org.elasticsearch.client.core.PageParams; -import org.elasticsearch.client.ml.CloseJobRequest; -import org.elasticsearch.client.ml.DeleteDataFrameAnalyticsRequest; -import org.elasticsearch.client.ml.DeleteDatafeedRequest; -import org.elasticsearch.client.ml.DeleteJobRequest; -import org.elasticsearch.client.ml.DeleteTrainedModelRequest; -import org.elasticsearch.client.ml.GetDataFrameAnalyticsRequest; -import org.elasticsearch.client.ml.GetDataFrameAnalyticsResponse; -import org.elasticsearch.client.ml.GetDatafeedRequest; -import org.elasticsearch.client.ml.GetDatafeedResponse; -import org.elasticsearch.client.ml.GetJobRequest; -import org.elasticsearch.client.ml.GetJobResponse; -import org.elasticsearch.client.ml.GetTrainedModelsRequest; +import org.elasticsearch.client.feature.ResetFeaturesRequest; import org.elasticsearch.client.ml.GetTrainedModelsStatsRequest; -import org.elasticsearch.client.ml.StopDataFrameAnalyticsRequest; -import org.elasticsearch.client.ml.StopDatafeedRequest; -import org.elasticsearch.client.ml.datafeed.DatafeedConfig; -import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsConfig; -import org.elasticsearch.client.ml.job.config.Job; import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.Collections; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; /** - * Cleans up and ML resources created during tests + * Cleans up ML resources created during tests */ public class MlTestStateCleaner { - private static final Set NOT_DELETED_TRAINED_MODELS = Collections.singleton("lang_ident_model_1"); private final Logger logger; - private final MachineLearningClient mlClient; private final RestHighLevelClient client; public MlTestStateCleaner(Logger logger, RestHighLevelClient client) { this.logger = logger; - this.mlClient = client.machineLearning(); this.client = client; } public void clearMlMetadata() throws IOException { - deleteAllTrainedModels(); - deleteAllDatafeeds(); - deleteAllJobs(); - deleteAllDataFrameAnalytics(); + deleteAllTrainedModelIngestPipelines(); + // This resets all features, not just ML, but they should have been getting reset between tests anyway so it shouldn't matter + client.features().resetFeatures(new ResetFeaturesRequest(), RequestOptions.DEFAULT); } @SuppressWarnings("unchecked") - private void deleteAllTrainedModels() throws IOException { - Set pipelinesWithModels = mlClient.getTrainedModelsStats( + private void deleteAllTrainedModelIngestPipelines() throws IOException { + Set pipelinesWithModels = client.machineLearning().getTrainedModelsStats( new GetTrainedModelsStatsRequest("_all").setPageParams(new PageParams(0, 10_000)), RequestOptions.DEFAULT ).getTrainedModelStats() .stream() @@ -86,95 +64,5 @@ private void deleteAllTrainedModels() throws IOException { logger.warn(() -> new ParameterizedMessage("failed to delete pipeline [{}]", pipelineId), ex); } } - - mlClient.getTrainedModels( - GetTrainedModelsRequest.getAllTrainedModelConfigsRequest().setPageParams(new PageParams(0, 10_000)), - RequestOptions.DEFAULT) - .getTrainedModels() - .stream() - .filter(trainedModelConfig -> NOT_DELETED_TRAINED_MODELS.contains(trainedModelConfig.getModelId()) == false) - .forEach(config -> { - try { - mlClient.deleteTrainedModel(new DeleteTrainedModelRequest(config.getModelId()), RequestOptions.DEFAULT); - } catch (IOException ex) { - throw new UncheckedIOException(ex); - } - }); - } - - private void deleteAllDatafeeds() throws IOException { - stopAllDatafeeds(); - - GetDatafeedResponse getDatafeedResponse = mlClient.getDatafeed(GetDatafeedRequest.getAllDatafeedsRequest(), RequestOptions.DEFAULT); - for (DatafeedConfig datafeed : getDatafeedResponse.datafeeds()) { - mlClient.deleteDatafeed(new DeleteDatafeedRequest(datafeed.getId()), RequestOptions.DEFAULT); - } - } - - private void stopAllDatafeeds() { - StopDatafeedRequest stopAllDatafeedsRequest = StopDatafeedRequest.stopAllDatafeedsRequest(); - try { - mlClient.stopDatafeed(stopAllDatafeedsRequest, RequestOptions.DEFAULT); - } catch (Exception e1) { - logger.warn("failed to stop all datafeeds. Forcing stop", e1); - try { - stopAllDatafeedsRequest.setForce(true); - mlClient.stopDatafeed(stopAllDatafeedsRequest, RequestOptions.DEFAULT); - } catch (Exception e2) { - logger.warn("Force-closing all data feeds failed", e2); - } - throw new RuntimeException("Had to resort to force-stopping datafeeds, something went wrong?", e1); - } - } - - private void deleteAllJobs() throws IOException { - closeAllJobs(); - - GetJobResponse getJobResponse = mlClient.getJob(GetJobRequest.getAllJobsRequest(), RequestOptions.DEFAULT); - for (Job job : getJobResponse.jobs()) { - mlClient.deleteJob(new DeleteJobRequest(job.getId()), RequestOptions.DEFAULT); - } - } - - private void closeAllJobs() { - CloseJobRequest closeAllJobsRequest = CloseJobRequest.closeAllJobsRequest(); - try { - mlClient.closeJob(closeAllJobsRequest, RequestOptions.DEFAULT); - } catch (Exception e1) { - logger.warn("failed to close all jobs. Forcing closed", e1); - closeAllJobsRequest.setForce(true); - try { - mlClient.closeJob(closeAllJobsRequest, RequestOptions.DEFAULT); - } catch (Exception e2) { - logger.warn("Force-closing all jobs failed", e2); - } - throw new RuntimeException("Had to resort to force-closing jobs, something went wrong?", e1); - } - } - - private void deleteAllDataFrameAnalytics() throws IOException { - stopAllDataFrameAnalytics(); - - GetDataFrameAnalyticsResponse getDataFrameAnalyticsResponse = - mlClient.getDataFrameAnalytics(GetDataFrameAnalyticsRequest.getAllDataFrameAnalyticsRequest(), RequestOptions.DEFAULT); - for (DataFrameAnalyticsConfig config : getDataFrameAnalyticsResponse.getAnalytics()) { - mlClient.deleteDataFrameAnalytics(new DeleteDataFrameAnalyticsRequest(config.getId()), RequestOptions.DEFAULT); - } - } - - private void stopAllDataFrameAnalytics() { - StopDataFrameAnalyticsRequest stopAllRequest = new StopDataFrameAnalyticsRequest("*"); - try { - mlClient.stopDataFrameAnalytics(stopAllRequest, RequestOptions.DEFAULT); - } catch (Exception e1) { - logger.warn("failed to stop all data frame analytics. Will proceed to force-stopping", e1); - stopAllRequest.setForce(true); - try { - mlClient.stopDataFrameAnalytics(stopAllRequest, RequestOptions.DEFAULT); - } catch (Exception e2) { - logger.warn("force-stopping all data frame analytics failed", e2); - } - throw new RuntimeException("Had to resort to force-stopping data frame analytics, something went wrong?", e1); - } } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/SetResetModeActionRequest.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/SetResetModeActionRequest.java index 2f575a8296e92..4017c70445311 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/SetResetModeActionRequest.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/SetResetModeActionRequest.java @@ -22,36 +22,46 @@ public class SetResetModeActionRequest extends AcknowledgedRequest implements ToXContentObject { public static SetResetModeActionRequest enabled() { - return new SetResetModeActionRequest(true); + return new SetResetModeActionRequest(true, false); } - public static SetResetModeActionRequest disabled() { - return new SetResetModeActionRequest(false); + public static SetResetModeActionRequest disabled(boolean deleteMetadata) { + return new SetResetModeActionRequest(false, deleteMetadata); } private final boolean enabled; + private final boolean deleteMetadata; private static final ParseField ENABLED = new ParseField("enabled"); + private static final ParseField DELETE_METADATA = new ParseField("delete_metadata"); public static final ConstructingObjectParser PARSER = - new ConstructingObjectParser<>("set_reset_mode_action_request", a -> new SetResetModeActionRequest((Boolean)a[0])); + new ConstructingObjectParser<>("set_reset_mode_action_request", + a -> new SetResetModeActionRequest((Boolean)a[0], (Boolean)a[1])); static { PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), ENABLED); + PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), DELETE_METADATA); } - SetResetModeActionRequest(boolean enabled) { + SetResetModeActionRequest(boolean enabled, Boolean deleteMetadata) { this.enabled = enabled; + this.deleteMetadata = deleteMetadata != null && deleteMetadata; } public SetResetModeActionRequest(StreamInput in) throws IOException { super(in); this.enabled = in.readBoolean(); + this.deleteMetadata = in.readBoolean(); } public boolean isEnabled() { return enabled; } + public boolean shouldDeleteMetadata() { + return deleteMetadata; + } + @Override public ActionRequestValidationException validate() { return null; @@ -61,11 +71,12 @@ public ActionRequestValidationException validate() { public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeBoolean(enabled); + out.writeBoolean(deleteMetadata); } @Override public int hashCode() { - return Objects.hash(enabled); + return Objects.hash(enabled, deleteMetadata); } @Override @@ -77,13 +88,17 @@ public boolean equals(Object obj) { return false; } SetResetModeActionRequest other = (SetResetModeActionRequest) obj; - return Objects.equals(enabled, other.enabled); + return Objects.equals(enabled, other.enabled) + && Objects.equals(deleteMetadata, other.deleteMetadata); } @Override public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { builder.startObject(); builder.field(ENABLED.getPreferredName(), enabled); + if (enabled == false) { + builder.field(DELETE_METADATA.getPreferredName(), deleteMetadata); + } builder.endObject(); return builder; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java index 2d009291025bc..25fafcdf81d3a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java @@ -37,6 +37,9 @@ public final class MlTasks { public static final PersistentTasksCustomMetadata.Assignment AWAITING_UPGRADE = new PersistentTasksCustomMetadata.Assignment(null, "persistent task cannot be assigned while upgrade mode is enabled."); + public static final PersistentTasksCustomMetadata.Assignment RESET_IN_PROGRESS = + new PersistentTasksCustomMetadata.Assignment(null, + "persistent task will not be assigned as a feature reset is in progress."); private MlTasks() { } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/action/SetResetModeActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/action/SetResetModeActionRequestTests.java index e3cf1c5114d1b..c56c95dc33fb2 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/action/SetResetModeActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/action/SetResetModeActionRequestTests.java @@ -14,7 +14,8 @@ public class SetResetModeActionRequestTests extends AbstractSerializingTestCase< @Override protected SetResetModeActionRequest createTestInstance() { - return new SetResetModeActionRequest(randomBoolean()); + boolean enabled = randomBoolean(); + return new SetResetModeActionRequest(enabled, enabled == false && randomBoolean()); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/integration/MlRestTestStateCleaner.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/integration/MlRestTestStateCleaner.java index bd69fed2febf5..e28b77c3006b8 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/integration/MlRestTestStateCleaner.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/integration/MlRestTestStateCleaner.java @@ -15,16 +15,13 @@ import org.elasticsearch.test.rest.ESRestTestCase; import java.io.IOException; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; - public class MlRestTestStateCleaner { - private static final Set NOT_DELETED_TRAINED_MODELS = Collections.singleton("lang_ident_model_1"); private final Logger logger; private final RestClient adminClient; @@ -34,15 +31,13 @@ public MlRestTestStateCleaner(Logger logger, RestClient adminClient) { } public void clearMlMetadata() throws IOException { - deleteAllTrainedModels(); - deleteAllDatafeeds(); - deleteAllJobs(); - deleteAllDataFrameAnalytics(); - // indices will be deleted by the ESRestTestCase class + deleteAllTrainedModelIngestPipelines(); + // This resets all features, not just ML, but they should have been getting reset between tests anyway so it shouldn't matter + adminClient.performRequest(new Request("POST", "/_features/_reset")); } @SuppressWarnings("unchecked") - private void deleteAllTrainedModels() throws IOException { + private void deleteAllTrainedModelIngestPipelines() throws IOException { final Request getAllTrainedModelStats = new Request("GET", "/_ml/trained_models/_stats"); getAllTrainedModelStats.addParameter("size", "10000"); final Response trainedModelsStatsResponse = adminClient.performRequest(getAllTrainedModelStats); @@ -59,116 +54,5 @@ private void deleteAllTrainedModels() throws IOException { logger.warn(() -> new ParameterizedMessage("failed to delete pipeline [{}]", pipelineId), ex); } } - - final Request getTrainedModels = new Request("GET", "/_ml/trained_models"); - getTrainedModels.addParameter("size", "10000"); - final Response trainedModelsResponse = adminClient.performRequest(getTrainedModels); - final List> models = (List>) XContentMapValues.extractValue( - "trained_model_configs", - ESRestTestCase.entityAsMap(trainedModelsResponse) - ); - if (models == null || models.isEmpty()) { - return; - } - for (Map model : models) { - String modelId = (String) model.get("model_id"); - if (NOT_DELETED_TRAINED_MODELS.contains(modelId)) { - continue; - } - adminClient.performRequest(new Request("DELETE", "/_ml/trained_models/" + modelId)); - } - } - - @SuppressWarnings("unchecked") - private void deleteAllDatafeeds() throws IOException { - final Request datafeedsRequest = new Request("GET", "/_ml/datafeeds"); - datafeedsRequest.addParameter("filter_path", "datafeeds"); - final Response datafeedsResponse = adminClient.performRequest(datafeedsRequest); - final List> datafeeds = - (List>) XContentMapValues.extractValue("datafeeds", ESRestTestCase.entityAsMap(datafeedsResponse)); - if (datafeeds == null) { - return; - } - - try { - adminClient.performRequest(new Request("POST", "/_ml/datafeeds/_all/_stop")); - } catch (Exception e1) { - logger.warn("failed to stop all datafeeds. Forcing stop", e1); - try { - adminClient.performRequest(new Request("POST", "/_ml/datafeeds/_all/_stop?force=true")); - } catch (Exception e2) { - logger.warn("Force-closing all data feeds failed", e2); - } - throw new RuntimeException( - "Had to resort to force-stopping datafeeds, something went wrong?", e1); - } - - for (Map datafeed : datafeeds) { - String datafeedId = (String) datafeed.get("datafeed_id"); - adminClient.performRequest(new Request("DELETE", "/_ml/datafeeds/" + datafeedId)); - } - } - - private void deleteAllJobs() throws IOException { - final Request jobsRequest = new Request("GET", "/_ml/anomaly_detectors"); - jobsRequest.addParameter("filter_path", "jobs"); - final Response response = adminClient.performRequest(jobsRequest); - @SuppressWarnings("unchecked") - final List> jobConfigs = - (List>) XContentMapValues.extractValue("jobs", ESRestTestCase.entityAsMap(response)); - if (jobConfigs == null) { - return; - } - - try { - adminClient.performRequest(new Request("POST", "/_ml/anomaly_detectors/_all/_close")); - } catch (Exception e1) { - logger.warn("failed to close all jobs. Forcing closed", e1); - try { - adminClient.performRequest(new Request("POST", "/_ml/anomaly_detectors/_all/_close?force=true")); - } catch (Exception e2) { - logger.warn("Force-closing all jobs failed", e2); - } - throw new RuntimeException("Had to resort to force-closing jobs, something went wrong?", - e1); - } - - for (Map jobConfig : jobConfigs) { - String jobId = (String) jobConfig.get("job_id"); - adminClient.performRequest(new Request("DELETE", "/_ml/anomaly_detectors/" + jobId)); - } - } - - @SuppressWarnings({"unchecked"}) - private void deleteAllDataFrameAnalytics() throws IOException { - stopAllDataFrameAnalytics(); - - final Request analyticsRequest = new Request("GET", "/_ml/data_frame/analytics?size=10000"); - analyticsRequest.addParameter("filter_path", "data_frame_analytics"); - final Response analyticsResponse = adminClient.performRequest(analyticsRequest); - List> analytics = (List>) XContentMapValues.extractValue( - "data_frame_analytics", ESRestTestCase.entityAsMap(analyticsResponse)); - if (analytics == null) { - return; - } - - for (Map config : analytics) { - String id = (String) config.get("id"); - adminClient.performRequest(new Request("DELETE", "/_ml/data_frame/analytics/" + id)); - } - } - - private void stopAllDataFrameAnalytics() { - try { - adminClient.performRequest(new Request("POST", "_ml/data_frame/analytics/*/_stop")); - } catch (Exception e1) { - logger.warn("failed to stop all data frame analytics. Will proceed to force-stopping", e1); - try { - adminClient.performRequest(new Request("POST", "_ml/data_frame/analytics/*/_stop?force=true")); - } catch (Exception e2) { - logger.warn("Force-stopping all data frame analytics failed", e2); - } - throw new RuntimeException("Had to resort to force-stopping data frame analytics, something went wrong?", e1); - } } } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle b/x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle index c6356945b6f21..a5d490919fa74 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle @@ -6,6 +6,7 @@ dependencies { javaRestTestImplementation(testArtifact(project(xpackModule('ml')))) javaRestTestImplementation project(path: ':modules:ingest-common') javaRestTestImplementation project(path: xpackModule('data-streams')) + javaRestTestImplementation project(path: xpackModule('transform')) } // location for keys and certificates diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java index 13badf56f07c6..e280aebe42fb7 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java @@ -12,6 +12,7 @@ import org.elasticsearch.xpack.autoscaling.AutoscalingMetadata; import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderResult; import org.elasticsearch.xpack.core.action.CreateDataStreamAction; +import org.elasticsearch.xpack.transform.Transform; import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; @@ -125,6 +126,8 @@ protected Collection> nodePlugins() { MockPainlessScriptEngine.TestPlugin.class, // ILM is required for .ml-state template index settings IndexLifecycle.class, + // The feature reset API touches transform custom cluster state so we need this plugin to understand it + Transform.class, DataStreamsPlugin.class); } diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AnnotationIndexIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AnnotationIndexIT.java index fc7da291f45b3..2e100e0c29d93 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AnnotationIndexIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AnnotationIndexIT.java @@ -117,7 +117,7 @@ public void testNotCreatedWhenAfterOtherMlIndexAndResetInProgress() throws Excep assertEquals(0, numberOfAnnotationsAliases()); }); } finally { - client().execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled()).actionGet(); + client().execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(true)).actionGet(); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index f894f5285f6a2..45e6175c8b3ea 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -1214,7 +1214,7 @@ public void cleanUpFeature( logger.info("Starting machine learning feature reset"); ActionListener unsetResetModeListener = ActionListener.wrap( - success -> client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(), ActionListener.wrap( + success -> client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(true), ActionListener.wrap( resetSuccess -> finalListener.onResponse(success), resetFailure -> { logger.error("failed to disable reset mode after state otherwise successful machine learning reset", resetFailure); @@ -1226,7 +1226,7 @@ public void cleanUpFeature( ); }) ), - failure -> client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(), ActionListener.wrap( + failure -> client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(false), ActionListener.wrap( resetSuccess -> finalListener.onFailure(failure), resetFailure -> { logger.error("failed to disable reset mode after state clean up failure", resetFailure); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlUpgradeModeActionFilter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlUpgradeModeActionFilter.java index a98d3867220f4..94106a800e2dd 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlUpgradeModeActionFilter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlUpgradeModeActionFilter.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.support.ActionFilter; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.CloseJobAction; @@ -55,22 +56,25 @@ import org.elasticsearch.xpack.core.ml.action.UpdateProcessAction; import org.elasticsearch.xpack.core.ml.action.UpgradeJobModelSnapshotAction; -import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; import java.util.Objects; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; /** * {@link MlUpgradeModeActionFilter} disallows certain actions if the cluster is currently in upgrade mode. * * Disallowed actions are the ones which can access/alter the state of ML internal indices. + * + * There is a complication that the feature reset API knows nothing about ML upgrade mode. If the feature + * reset API is called while ML upgrade mode is enabled then it takes precedence and resets the ML state. + * This means that all ML entities will be deleted and upgrade mode will be disabled if the reset completes + * successfully. */ class MlUpgradeModeActionFilter extends ActionFilter.Simple { private static final Set ACTIONS_DISALLOWED_IN_UPGRADE_MODE = - Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + Collections.unmodifiableSet(Sets.newHashSet( PutJobAction.NAME, UpdateJobAction.NAME, DeleteJobAction.NAME, @@ -121,18 +125,43 @@ class MlUpgradeModeActionFilter extends ActionFilter.Simple { PutTrainedModelAction.NAME, DeleteTrainedModelAction.NAME, DeleteTrainedModelAliasAction.NAME - ))); + )); + + private static final Set RESET_MODE_EXEMPTIONS = + Collections.unmodifiableSet(Sets.newHashSet( + DeleteJobAction.NAME, + CloseJobAction.NAME, - private final AtomicBoolean isUpgradeMode = new AtomicBoolean(); + DeleteDatafeedAction.NAME, + StopDatafeedAction.NAME, + + KillProcessAction.NAME, + + DeleteDataFrameAnalyticsAction.NAME, + StopDataFrameAnalyticsAction.NAME, + + DeleteTrainedModelAction.NAME + )); + + // At the time the action filter is installed no cluster state is available, so + // initialise to false/false and let the first change event set the real values + private final AtomicReference upgradeResetFlags = new AtomicReference<>(new UpgradeResetFlags(false, false)); MlUpgradeModeActionFilter(ClusterService clusterService) { Objects.requireNonNull(clusterService); - clusterService.addListener(this::setIsUpgradeMode); + clusterService.addListener(this::setUpgradeResetFlags); } @Override protected boolean apply(String action, ActionRequest request, ActionListener listener) { - if (isUpgradeMode.get() && ACTIONS_DISALLOWED_IN_UPGRADE_MODE.contains(action)) { + // Ensure the same object is used for both tests + UpgradeResetFlags localUpgradeResetFlags = upgradeResetFlags.get(); + assert localUpgradeResetFlags != null; + // If we are in upgrade mode but a reset is being done then allow the destructive actions that reset mode uses + if (localUpgradeResetFlags.isResetMode && RESET_MODE_EXEMPTIONS.contains(action)) { + return true; + } + if (localUpgradeResetFlags.isUpgradeMode && ACTIONS_DISALLOWED_IN_UPGRADE_MODE.contains(action)) { throw new ElasticsearchStatusException( "Cannot perform {} action while upgrade mode is enabled", RestStatus.TOO_MANY_REQUESTS, action); } @@ -151,7 +180,23 @@ public int order() { } // Visible for testing - void setIsUpgradeMode(ClusterChangedEvent event) { - isUpgradeMode.set(MlMetadata.getMlMetadata(event.state()).isUpgradeMode()); + void setUpgradeResetFlags(ClusterChangedEvent event) { + MlMetadata mlMetadata = MlMetadata.getMlMetadata(event.state()); + upgradeResetFlags.set(new UpgradeResetFlags(mlMetadata.isUpgradeMode(), mlMetadata.isResetMode())); + } + + /** + * Class to allow both upgrade and reset flags to be recorded atomically so that code that checks both + * one after the other doesn't see inconsistent values. + */ + private static class UpgradeResetFlags { + + final boolean isUpgradeMode; + final boolean isResetMode; + + UpgradeResetFlags(boolean isUpgradeMode, boolean isResetMode) { + this.isUpgradeMode = isUpgradeMode; + this.isResetMode = isResetMode; + } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetResetModeAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetResetModeAction.java index 1d97a9c997481..86b1af395db51 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetResetModeAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetResetModeAction.java @@ -18,6 +18,7 @@ import org.elasticsearch.xpack.core.action.SetResetModeActionRequest; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.SetResetModeAction; +import org.elasticsearch.xpack.core.ml.inference.ModelAliasMetadata; public class TransportSetResetModeAction extends AbstractTransportSetResetModeAction { @@ -40,11 +41,21 @@ protected String featureName() { @Override protected ClusterState setState(ClusterState oldState, SetResetModeActionRequest request) { - MlMetadata.Builder builder = MlMetadata.Builder - .from(oldState.metadata().custom(MlMetadata.TYPE)) - .isResetMode(request.isEnabled()); ClusterState.Builder newState = ClusterState.builder(oldState); - newState.metadata(Metadata.builder(oldState.getMetadata()).putCustom(MlMetadata.TYPE, builder.build()).build()); + if (request.shouldDeleteMetadata()) { + assert request.isEnabled() == false; // SetResetModeActionRequest should have enforced this + newState.metadata(Metadata.builder(oldState.getMetadata()) + .removeCustom(MlMetadata.TYPE) + .removeCustom(ModelAliasMetadata.NAME) + .build()); + } else { + MlMetadata.Builder builder = MlMetadata.Builder + .from(oldState.metadata().custom(MlMetadata.TYPE)) + .isResetMode(request.isEnabled()); + newState.metadata(Metadata.builder(oldState.getMetadata()) + .putCustom(MlMetadata.TYPE, builder.build()) + .build()); + } return newState.build(); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java index dbb8267b6e4b4..d88dd3b5438e9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java @@ -29,6 +29,7 @@ import java.util.Objects; import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE; +import static org.elasticsearch.xpack.core.ml.MlTasks.RESET_IN_PROGRESS; public class DatafeedNodeSelector { @@ -78,6 +79,9 @@ public PersistentTasksCustomMetadata.Assignment selectNode() { if (MlMetadata.getMlMetadata(clusterState).isUpgradeMode()) { return AWAITING_UPGRADE; } + if (MlMetadata.getMlMetadata(clusterState).isResetMode()) { + return RESET_IN_PROGRESS; + } AssignmentFailure assignmentFailure = checkAssignment(); if (assignmentFailure == null) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/task/AbstractJobPersistentTasksExecutor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/task/AbstractJobPersistentTasksExecutor.java index ad0f51e654d4d..e657c56877a91 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/task/AbstractJobPersistentTasksExecutor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/task/AbstractJobPersistentTasksExecutor.java @@ -37,6 +37,7 @@ import java.util.Optional; import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE; +import static org.elasticsearch.xpack.core.ml.MlTasks.RESET_IN_PROGRESS; import static org.elasticsearch.xpack.core.ml.job.messages.Messages.JOB_AUDIT_REQUIRES_MORE_MEMORY_TO_RUN; import static org.elasticsearch.xpack.ml.MachineLearning.MAX_ML_NODE_SIZE; import static org.elasticsearch.xpack.ml.MachineLearning.MAX_OPEN_JOBS_PER_NODE; @@ -154,10 +155,13 @@ protected boolean allowsMissingIndices() { public Optional getPotentialAssignment(Params params, ClusterState clusterState, boolean isMemoryTrackerRecentlyRefreshed) { - // If we are waiting for an upgrade to complete, we should not assign to a node + // If we are waiting for an upgrade or reset to complete, we should not assign to a node if (MlMetadata.getMlMetadata(clusterState).isUpgradeMode()) { return Optional.of(AWAITING_UPGRADE); } + if (MlMetadata.getMlMetadata(clusterState).isResetMode()) { + return Optional.of(RESET_IN_PROGRESS); + } String jobId = getJobId(params); Optional missingIndices = checkRequiredIndices(jobId, diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlUpgradeModeActionFilterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlUpgradeModeActionFilterTests.java index 26d3b1d1689b2..3b11c063f7ffb 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlUpgradeModeActionFilterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlUpgradeModeActionFilterTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.action.CloseJobAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction; import org.elasticsearch.xpack.security.action.filter.SecurityActionFilter; @@ -42,6 +43,7 @@ public class MlUpgradeModeActionFilterTests extends ESTestCase { private static final String DISALLOWED_ACTION = PutJobAction.NAME; private static final String ALLOWED_ACTION = SetUpgradeModeAction.NAME; + private static final String DISALLOWED_ACTION_WITH_RESET_MODE_EXEMPTION = CloseJobAction.NAME; private ClusterService clusterService; private Task task; @@ -65,37 +67,51 @@ public void assertNoMoreInteractions() { } public void testApply_ActionDisallowedInUpgradeMode() { + String action = randomFrom(DISALLOWED_ACTION, DISALLOWED_ACTION_WITH_RESET_MODE_EXEMPTION); MlUpgradeModeActionFilter filter = new MlUpgradeModeActionFilter(clusterService); - filter.apply(task, DISALLOWED_ACTION, request, listener, chain); + filter.apply(task, action, request, listener, chain); - filter.setIsUpgradeMode(createClusterChangedEvent(createClusterState(true))); + filter.setUpgradeResetFlags(createClusterChangedEvent(createClusterState(true, false))); ElasticsearchStatusException e = expectThrows( ElasticsearchStatusException.class, - () -> filter.apply(task, DISALLOWED_ACTION, request, listener, chain)); + () -> filter.apply(task, action, request, listener, chain)); - filter.setIsUpgradeMode(createClusterChangedEvent(createClusterState(false))); - filter.apply(task, DISALLOWED_ACTION, request, listener, chain); + filter.setUpgradeResetFlags(createClusterChangedEvent(createClusterState(false, false))); + filter.apply(task, action, request, listener, chain); - assertThat(e.getMessage(), is(equalTo("Cannot perform " + DISALLOWED_ACTION + " action while upgrade mode is enabled"))); + assertThat(e.getMessage(), is(equalTo("Cannot perform " + action + " action while upgrade mode is enabled"))); assertThat(e.status(), is(equalTo(RestStatus.TOO_MANY_REQUESTS))); - verify(chain, times(2)).proceed(task, DISALLOWED_ACTION, request, listener); + verify(chain, times(2)).proceed(task, action, request, listener); } public void testApply_ActionAllowedInUpgradeMode() { MlUpgradeModeActionFilter filter = new MlUpgradeModeActionFilter(clusterService); filter.apply(task, ALLOWED_ACTION, request, listener, chain); - filter.setIsUpgradeMode(createClusterChangedEvent(createClusterState(true))); + filter.setUpgradeResetFlags(createClusterChangedEvent(createClusterState(true, false))); filter.apply(task, ALLOWED_ACTION, request, listener, chain); - filter.setIsUpgradeMode(createClusterChangedEvent(createClusterState(false))); + filter.setUpgradeResetFlags(createClusterChangedEvent(createClusterState(false, false))); filter.apply(task, ALLOWED_ACTION, request, listener, chain); verify(chain, times(3)).proceed(task, ALLOWED_ACTION, request, listener); } + public void testApply_ActionDisallowedInUpgradeModeWithResetModeExemption() { + MlUpgradeModeActionFilter filter = new MlUpgradeModeActionFilter(clusterService); + filter.apply(task, DISALLOWED_ACTION_WITH_RESET_MODE_EXEMPTION, request, listener, chain); + + filter.setUpgradeResetFlags(createClusterChangedEvent(createClusterState(true, true))); + filter.apply(task, DISALLOWED_ACTION_WITH_RESET_MODE_EXEMPTION, request, listener, chain); + + filter.setUpgradeResetFlags(createClusterChangedEvent(createClusterState(false, true))); + filter.apply(task, DISALLOWED_ACTION_WITH_RESET_MODE_EXEMPTION, request, listener, chain); + + verify(chain, times(3)).proceed(task, DISALLOWED_ACTION_WITH_RESET_MODE_EXEMPTION, request, listener); + } + public void testOrder_UpgradeFilterIsExecutedAfterSecurityFilter() { MlUpgradeModeActionFilter upgradeModeFilter = new MlUpgradeModeActionFilter(clusterService); SecurityActionFilter securityFilter = new SecurityActionFilter(null, null, null, null, mock(ThreadPool.class), null, null); @@ -108,9 +124,10 @@ private static ClusterChangedEvent createClusterChangedEvent(ClusterState cluste return new ClusterChangedEvent("created-from-test", clusterState, clusterState); } - private static ClusterState createClusterState(boolean isUpgradeMode) { + private static ClusterState createClusterState(boolean isUpgradeMode, boolean isResetMode) { return ClusterState.builder(new ClusterName("MlUpgradeModeActionFilterTests")) - .metadata(Metadata.builder().putCustom(MlMetadata.TYPE, new MlMetadata.Builder().isUpgradeMode(isUpgradeMode).build())) + .metadata(Metadata.builder().putCustom(MlMetadata.TYPE, + new MlMetadata.Builder().isUpgradeMode(isUpgradeMode).isResetMode(isResetMode).build())) .build(); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java index a8f8880172c06..01d84f8b34db0 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java @@ -466,6 +466,26 @@ public void testSelectNode_GivenMlUpgradeMode() { assertThat(result, equalTo(MlTasks.AWAITING_UPGRADE)); } + public void testSelectNode_GivenResetInProgress() { + Job job = createScheduledJob("job_id").build(new Date()); + DatafeedConfig df = createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")); + + PersistentTasksCustomMetadata.Builder tasksBuilder = PersistentTasksCustomMetadata.builder(); + addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder); + tasks = tasksBuilder.build(); + mlMetadata = new MlMetadata.Builder().isResetMode(true).build(); + + givenClusterState("foo", 1, 0); + + PersistentTasksCustomMetadata.Assignment result = new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode(); + assertThat(result, equalTo(MlTasks.RESET_IN_PROGRESS)); + } + public void testCheckDatafeedTaskCanBeCreated_GivenMlUpgradeMode() { Job job = createScheduledJob("job_id").build(new Date()); DatafeedConfig df = createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java index 54f9b6cfde13d..9dafba8d98713 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java @@ -39,6 +39,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.MlConfigIndex; import org.elasticsearch.xpack.core.ml.MlMetaIndex; +import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; @@ -187,6 +188,23 @@ public void testGetAssignment_GivenLazyJobAndNoGlobalLazyNodes() { assertEquals(JobNodeSelector.AWAITING_LAZY_ASSIGNMENT.getExplanation(), assignment.getExplanation()); } + public void testGetAssignment_GivenResetInProgress() { + ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); + Metadata.Builder metadata = Metadata.builder(); + MlMetadata mlMetadata = new MlMetadata.Builder().isResetMode(true).build(); + csBuilder.metadata(metadata.putCustom(MlMetadata.TYPE, mlMetadata)); + + OpenJobPersistentTasksExecutor executor = createExecutor(Settings.EMPTY); + + Job job = mock(Job.class); + OpenJobAction.JobParams params = new OpenJobAction.JobParams("job_during_reset"); + params.setJob(job); + PersistentTasksCustomMetadata.Assignment assignment = executor.getAssignment(params, csBuilder.build()); + assertNotNull(assignment); + assertNull(assignment.getExecutorNode()); + assertEquals(MlTasks.RESET_IN_PROGRESS.getExplanation(), assignment.getExplanation()); + } + public static void addJobTask(String jobId, String nodeId, JobState jobState, PersistentTasksCustomMetadata.Builder builder) { addJobTask(jobId, nodeId, jobState, builder, false); } diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TestFeatureResetIT.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TestFeatureResetIT.java index 3614790a2b2c3..209218cf42171 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TestFeatureResetIT.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TestFeatureResetIT.java @@ -77,10 +77,9 @@ public void testTransformFeatureReset() throws Exception { Map metadata = (Map)ESRestTestCase.entityAsMap(response).get("metadata"); assertThat(metadata, is(not(nullValue()))); + // after a successful reset we completely remove the transform metadata Map transformMetadata = (Map)metadata.get("transform"); - assertThat(transformMetadata, is(not(nullValue()))); - assertThat(transformMetadata.get("reset_mode"), is(false)); - + assertThat(transformMetadata, is(nullValue())); // assert transforms are gone assertThat(getTransform("_all").getCount(), equalTo(0L)); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java index 694bd6f87d3a1..985bfa14712d7 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java @@ -433,7 +433,7 @@ public void cleanUpFeature( ) { ActionListener unsetResetModeListener = ActionListener.wrap( - success -> client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(), ActionListener.wrap( + success -> client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(true), ActionListener.wrap( resetSuccess -> finalListener.onResponse(success), resetFailure -> { logger.error("failed to disable reset mode after otherwise successful transform reset", resetFailure); @@ -446,7 +446,7 @@ public void cleanUpFeature( ); }) ), - failure -> client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(), ActionListener.wrap( + failure -> client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(false), ActionListener.wrap( resetSuccess -> finalListener.onFailure(failure), resetFailure -> { logger.error( diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformNodes.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformNodes.java index 853b7aaf71161..6fb079b8479ac 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformNodes.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformNodes.java @@ -16,6 +16,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask; import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.TransformMessages; +import org.elasticsearch.xpack.core.transform.TransformMetadata; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams; import org.elasticsearch.xpack.transform.Transform; @@ -138,13 +139,17 @@ public static long getNumberOfTransformNodes(ClusterState clusterState) { /** * Check if cluster has at least 1 transform nodes and add a header warning if not. * To be used by transport actions only. + * Don't do this if a reset is in progress, because the feature reset API touches + * all features even if they have never been used. * * @param clusterState state */ public static void warnIfNoTransformNodes(ClusterState clusterState) { - long transformNodes = getNumberOfTransformNodes(clusterState); - if (transformNodes == 0) { - HeaderWarning.addWarning(TransformMessages.REST_WARN_NO_TRANSFORM_NODES); + if (TransformMetadata.getTransformMetadata(clusterState).isResetMode() == false) { + long transformNodes = getNumberOfTransformNodes(clusterState); + if (transformNodes == 0) { + HeaderWarning.addWarning(TransformMessages.REST_WARN_NO_TRANSFORM_NODES); + } } } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportSetTransformResetModeAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportSetTransformResetModeAction.java index d48cfa217530f..0316ed9921555 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportSetTransformResetModeAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportSetTransformResetModeAction.java @@ -40,12 +40,20 @@ protected String featureName() { @Override protected ClusterState setState(ClusterState oldState, SetResetModeActionRequest request) { - TransformMetadata.Builder builder = TransformMetadata.Builder - .from(oldState.metadata().custom(TransformMetadata.TYPE)) - .isResetMode(request.isEnabled()); ClusterState.Builder newState = ClusterState.builder(oldState); - newState.metadata(Metadata.builder(oldState.getMetadata()) - .putCustom(TransformMetadata.TYPE, builder.build()).build()); + if (request.shouldDeleteMetadata()) { + assert request.isEnabled() == false; // SetResetModeActionRequest should have enforced this + newState.metadata(Metadata.builder(oldState.getMetadata()) + .removeCustom(TransformMetadata.TYPE) + .build()); + } else { + TransformMetadata.Builder builder = TransformMetadata.Builder + .from(oldState.metadata().custom(TransformMetadata.TYPE)) + .isResetMode(request.isEnabled()); + newState.metadata(Metadata.builder(oldState.getMetadata()) + .putCustom(TransformMetadata.TYPE, builder.build()) + .build()); + } return newState.build(); } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java index 5840078f6e107..ab6151f04a2de 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java @@ -34,6 +34,7 @@ import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.TransformMessages; +import org.elasticsearch.xpack.core.transform.TransformMetadata; import org.elasticsearch.xpack.core.transform.action.StartTransformAction; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; @@ -91,6 +92,10 @@ public TransformPersistentTasksExecutor( @Override public PersistentTasksCustomMetadata.Assignment getAssignment(TransformTaskParams params, ClusterState clusterState) { + if (TransformMetadata.getTransformMetadata(clusterState).isResetMode()) { + return new PersistentTasksCustomMetadata.Assignment(null, + "Transform task will not be assigned as a feature reset is in progress."); + } List unavailableIndices = verifyIndicesPrimaryShardsAreActive(clusterState, resolver); if (unavailableIndices.size() != 0) { String reason = "Not starting transform ["