diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MlTestStateCleaner.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MlTestStateCleaner.java index e6ddcaef374d0..f7e42db2ee810 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/MlTestStateCleaner.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MlTestStateCleaner.java @@ -11,59 +11,37 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ingest.DeletePipelineRequest; import org.elasticsearch.client.core.PageParams; -import org.elasticsearch.client.ml.CloseJobRequest; -import org.elasticsearch.client.ml.DeleteDataFrameAnalyticsRequest; -import org.elasticsearch.client.ml.DeleteDatafeedRequest; -import org.elasticsearch.client.ml.DeleteJobRequest; -import org.elasticsearch.client.ml.DeleteTrainedModelRequest; -import org.elasticsearch.client.ml.GetDataFrameAnalyticsRequest; -import org.elasticsearch.client.ml.GetDataFrameAnalyticsResponse; -import org.elasticsearch.client.ml.GetDatafeedRequest; -import org.elasticsearch.client.ml.GetDatafeedResponse; -import org.elasticsearch.client.ml.GetJobRequest; -import org.elasticsearch.client.ml.GetJobResponse; -import org.elasticsearch.client.ml.GetTrainedModelsRequest; +import org.elasticsearch.client.feature.ResetFeaturesRequest; import org.elasticsearch.client.ml.GetTrainedModelsStatsRequest; -import org.elasticsearch.client.ml.StopDataFrameAnalyticsRequest; -import org.elasticsearch.client.ml.StopDatafeedRequest; -import org.elasticsearch.client.ml.datafeed.DatafeedConfig; -import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsConfig; -import org.elasticsearch.client.ml.job.config.Job; import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.Collections; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; /** - * Cleans up and ML resources created during tests + * Cleans up ML resources created during tests */ public class MlTestStateCleaner { - private static final Set NOT_DELETED_TRAINED_MODELS = Collections.singleton("lang_ident_model_1"); private final Logger logger; - private final MachineLearningClient mlClient; private final RestHighLevelClient client; public MlTestStateCleaner(Logger logger, RestHighLevelClient client) { this.logger = logger; - this.mlClient = client.machineLearning(); this.client = client; } public void clearMlMetadata() throws IOException { - deleteAllTrainedModels(); - deleteAllDatafeeds(); - deleteAllJobs(); - deleteAllDataFrameAnalytics(); + deleteAllTrainedModelIngestPipelines(); + // This resets all features, not just ML, but they should have been getting reset between tests anyway so it shouldn't matter + client.features().resetFeatures(new ResetFeaturesRequest(), RequestOptions.DEFAULT); } @SuppressWarnings("unchecked") - private void deleteAllTrainedModels() throws IOException { - Set pipelinesWithModels = mlClient.getTrainedModelsStats( + private void deleteAllTrainedModelIngestPipelines() throws IOException { + Set pipelinesWithModels = client.machineLearning().getTrainedModelsStats( new GetTrainedModelsStatsRequest("_all").setPageParams(new PageParams(0, 10_000)), RequestOptions.DEFAULT ).getTrainedModelStats() .stream() @@ -86,95 +64,5 @@ private void deleteAllTrainedModels() throws IOException { logger.warn(() -> new ParameterizedMessage("failed to delete pipeline [{}]", pipelineId), ex); } } - - mlClient.getTrainedModels( - GetTrainedModelsRequest.getAllTrainedModelConfigsRequest().setPageParams(new PageParams(0, 10_000)), - RequestOptions.DEFAULT) - .getTrainedModels() - .stream() - .filter(trainedModelConfig -> NOT_DELETED_TRAINED_MODELS.contains(trainedModelConfig.getModelId()) == false) - .forEach(config -> { - try { - mlClient.deleteTrainedModel(new DeleteTrainedModelRequest(config.getModelId()), RequestOptions.DEFAULT); - } catch (IOException ex) { - throw new UncheckedIOException(ex); - } - }); - } - - private void deleteAllDatafeeds() throws IOException { - stopAllDatafeeds(); - - GetDatafeedResponse getDatafeedResponse = mlClient.getDatafeed(GetDatafeedRequest.getAllDatafeedsRequest(), RequestOptions.DEFAULT); - for (DatafeedConfig datafeed : getDatafeedResponse.datafeeds()) { - mlClient.deleteDatafeed(new DeleteDatafeedRequest(datafeed.getId()), RequestOptions.DEFAULT); - } - } - - private void stopAllDatafeeds() { - StopDatafeedRequest stopAllDatafeedsRequest = StopDatafeedRequest.stopAllDatafeedsRequest(); - try { - mlClient.stopDatafeed(stopAllDatafeedsRequest, RequestOptions.DEFAULT); - } catch (Exception e1) { - logger.warn("failed to stop all datafeeds. Forcing stop", e1); - try { - stopAllDatafeedsRequest.setForce(true); - mlClient.stopDatafeed(stopAllDatafeedsRequest, RequestOptions.DEFAULT); - } catch (Exception e2) { - logger.warn("Force-closing all data feeds failed", e2); - } - throw new RuntimeException("Had to resort to force-stopping datafeeds, something went wrong?", e1); - } - } - - private void deleteAllJobs() throws IOException { - closeAllJobs(); - - GetJobResponse getJobResponse = mlClient.getJob(GetJobRequest.getAllJobsRequest(), RequestOptions.DEFAULT); - for (Job job : getJobResponse.jobs()) { - mlClient.deleteJob(new DeleteJobRequest(job.getId()), RequestOptions.DEFAULT); - } - } - - private void closeAllJobs() { - CloseJobRequest closeAllJobsRequest = CloseJobRequest.closeAllJobsRequest(); - try { - mlClient.closeJob(closeAllJobsRequest, RequestOptions.DEFAULT); - } catch (Exception e1) { - logger.warn("failed to close all jobs. Forcing closed", e1); - closeAllJobsRequest.setForce(true); - try { - mlClient.closeJob(closeAllJobsRequest, RequestOptions.DEFAULT); - } catch (Exception e2) { - logger.warn("Force-closing all jobs failed", e2); - } - throw new RuntimeException("Had to resort to force-closing jobs, something went wrong?", e1); - } - } - - private void deleteAllDataFrameAnalytics() throws IOException { - stopAllDataFrameAnalytics(); - - GetDataFrameAnalyticsResponse getDataFrameAnalyticsResponse = - mlClient.getDataFrameAnalytics(GetDataFrameAnalyticsRequest.getAllDataFrameAnalyticsRequest(), RequestOptions.DEFAULT); - for (DataFrameAnalyticsConfig config : getDataFrameAnalyticsResponse.getAnalytics()) { - mlClient.deleteDataFrameAnalytics(new DeleteDataFrameAnalyticsRequest(config.getId()), RequestOptions.DEFAULT); - } - } - - private void stopAllDataFrameAnalytics() { - StopDataFrameAnalyticsRequest stopAllRequest = new StopDataFrameAnalyticsRequest("*"); - try { - mlClient.stopDataFrameAnalytics(stopAllRequest, RequestOptions.DEFAULT); - } catch (Exception e1) { - logger.warn("failed to stop all data frame analytics. Will proceed to force-stopping", e1); - stopAllRequest.setForce(true); - try { - mlClient.stopDataFrameAnalytics(stopAllRequest, RequestOptions.DEFAULT); - } catch (Exception e2) { - logger.warn("force-stopping all data frame analytics failed", e2); - } - throw new RuntimeException("Had to resort to force-stopping data frame analytics, something went wrong?", e1); - } } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/SetResetModeActionRequest.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/SetResetModeActionRequest.java index 2f575a8296e92..4017c70445311 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/SetResetModeActionRequest.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/SetResetModeActionRequest.java @@ -22,36 +22,46 @@ public class SetResetModeActionRequest extends AcknowledgedRequest implements ToXContentObject { public static SetResetModeActionRequest enabled() { - return new SetResetModeActionRequest(true); + return new SetResetModeActionRequest(true, false); } - public static SetResetModeActionRequest disabled() { - return new SetResetModeActionRequest(false); + public static SetResetModeActionRequest disabled(boolean deleteMetadata) { + return new SetResetModeActionRequest(false, deleteMetadata); } private final boolean enabled; + private final boolean deleteMetadata; private static final ParseField ENABLED = new ParseField("enabled"); + private static final ParseField DELETE_METADATA = new ParseField("delete_metadata"); public static final ConstructingObjectParser PARSER = - new ConstructingObjectParser<>("set_reset_mode_action_request", a -> new SetResetModeActionRequest((Boolean)a[0])); + new ConstructingObjectParser<>("set_reset_mode_action_request", + a -> new SetResetModeActionRequest((Boolean)a[0], (Boolean)a[1])); static { PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), ENABLED); + PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), DELETE_METADATA); } - SetResetModeActionRequest(boolean enabled) { + SetResetModeActionRequest(boolean enabled, Boolean deleteMetadata) { this.enabled = enabled; + this.deleteMetadata = deleteMetadata != null && deleteMetadata; } public SetResetModeActionRequest(StreamInput in) throws IOException { super(in); this.enabled = in.readBoolean(); + this.deleteMetadata = in.readBoolean(); } public boolean isEnabled() { return enabled; } + public boolean shouldDeleteMetadata() { + return deleteMetadata; + } + @Override public ActionRequestValidationException validate() { return null; @@ -61,11 +71,12 @@ public ActionRequestValidationException validate() { public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeBoolean(enabled); + out.writeBoolean(deleteMetadata); } @Override public int hashCode() { - return Objects.hash(enabled); + return Objects.hash(enabled, deleteMetadata); } @Override @@ -77,13 +88,17 @@ public boolean equals(Object obj) { return false; } SetResetModeActionRequest other = (SetResetModeActionRequest) obj; - return Objects.equals(enabled, other.enabled); + return Objects.equals(enabled, other.enabled) + && Objects.equals(deleteMetadata, other.deleteMetadata); } @Override public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { builder.startObject(); builder.field(ENABLED.getPreferredName(), enabled); + if (enabled == false) { + builder.field(DELETE_METADATA.getPreferredName(), deleteMetadata); + } builder.endObject(); return builder; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java index 2d009291025bc..25fafcdf81d3a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java @@ -37,6 +37,9 @@ public final class MlTasks { public static final PersistentTasksCustomMetadata.Assignment AWAITING_UPGRADE = new PersistentTasksCustomMetadata.Assignment(null, "persistent task cannot be assigned while upgrade mode is enabled."); + public static final PersistentTasksCustomMetadata.Assignment RESET_IN_PROGRESS = + new PersistentTasksCustomMetadata.Assignment(null, + "persistent task will not be assigned as a feature reset is in progress."); private MlTasks() { } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/action/SetResetModeActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/action/SetResetModeActionRequestTests.java index e3cf1c5114d1b..c56c95dc33fb2 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/action/SetResetModeActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/action/SetResetModeActionRequestTests.java @@ -14,7 +14,8 @@ public class SetResetModeActionRequestTests extends AbstractSerializingTestCase< @Override protected SetResetModeActionRequest createTestInstance() { - return new SetResetModeActionRequest(randomBoolean()); + boolean enabled = randomBoolean(); + return new SetResetModeActionRequest(enabled, enabled == false && randomBoolean()); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/integration/MlRestTestStateCleaner.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/integration/MlRestTestStateCleaner.java index bd69fed2febf5..e28b77c3006b8 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/integration/MlRestTestStateCleaner.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/integration/MlRestTestStateCleaner.java @@ -15,16 +15,13 @@ import org.elasticsearch.test.rest.ESRestTestCase; import java.io.IOException; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; - public class MlRestTestStateCleaner { - private static final Set NOT_DELETED_TRAINED_MODELS = Collections.singleton("lang_ident_model_1"); private final Logger logger; private final RestClient adminClient; @@ -34,15 +31,13 @@ public MlRestTestStateCleaner(Logger logger, RestClient adminClient) { } public void clearMlMetadata() throws IOException { - deleteAllTrainedModels(); - deleteAllDatafeeds(); - deleteAllJobs(); - deleteAllDataFrameAnalytics(); - // indices will be deleted by the ESRestTestCase class + deleteAllTrainedModelIngestPipelines(); + // This resets all features, not just ML, but they should have been getting reset between tests anyway so it shouldn't matter + adminClient.performRequest(new Request("POST", "/_features/_reset")); } @SuppressWarnings("unchecked") - private void deleteAllTrainedModels() throws IOException { + private void deleteAllTrainedModelIngestPipelines() throws IOException { final Request getAllTrainedModelStats = new Request("GET", "/_ml/trained_models/_stats"); getAllTrainedModelStats.addParameter("size", "10000"); final Response trainedModelsStatsResponse = adminClient.performRequest(getAllTrainedModelStats); @@ -59,116 +54,5 @@ private void deleteAllTrainedModels() throws IOException { logger.warn(() -> new ParameterizedMessage("failed to delete pipeline [{}]", pipelineId), ex); } } - - final Request getTrainedModels = new Request("GET", "/_ml/trained_models"); - getTrainedModels.addParameter("size", "10000"); - final Response trainedModelsResponse = adminClient.performRequest(getTrainedModels); - final List> models = (List>) XContentMapValues.extractValue( - "trained_model_configs", - ESRestTestCase.entityAsMap(trainedModelsResponse) - ); - if (models == null || models.isEmpty()) { - return; - } - for (Map model : models) { - String modelId = (String) model.get("model_id"); - if (NOT_DELETED_TRAINED_MODELS.contains(modelId)) { - continue; - } - adminClient.performRequest(new Request("DELETE", "/_ml/trained_models/" + modelId)); - } - } - - @SuppressWarnings("unchecked") - private void deleteAllDatafeeds() throws IOException { - final Request datafeedsRequest = new Request("GET", "/_ml/datafeeds"); - datafeedsRequest.addParameter("filter_path", "datafeeds"); - final Response datafeedsResponse = adminClient.performRequest(datafeedsRequest); - final List> datafeeds = - (List>) XContentMapValues.extractValue("datafeeds", ESRestTestCase.entityAsMap(datafeedsResponse)); - if (datafeeds == null) { - return; - } - - try { - adminClient.performRequest(new Request("POST", "/_ml/datafeeds/_all/_stop")); - } catch (Exception e1) { - logger.warn("failed to stop all datafeeds. Forcing stop", e1); - try { - adminClient.performRequest(new Request("POST", "/_ml/datafeeds/_all/_stop?force=true")); - } catch (Exception e2) { - logger.warn("Force-closing all data feeds failed", e2); - } - throw new RuntimeException( - "Had to resort to force-stopping datafeeds, something went wrong?", e1); - } - - for (Map datafeed : datafeeds) { - String datafeedId = (String) datafeed.get("datafeed_id"); - adminClient.performRequest(new Request("DELETE", "/_ml/datafeeds/" + datafeedId)); - } - } - - private void deleteAllJobs() throws IOException { - final Request jobsRequest = new Request("GET", "/_ml/anomaly_detectors"); - jobsRequest.addParameter("filter_path", "jobs"); - final Response response = adminClient.performRequest(jobsRequest); - @SuppressWarnings("unchecked") - final List> jobConfigs = - (List>) XContentMapValues.extractValue("jobs", ESRestTestCase.entityAsMap(response)); - if (jobConfigs == null) { - return; - } - - try { - adminClient.performRequest(new Request("POST", "/_ml/anomaly_detectors/_all/_close")); - } catch (Exception e1) { - logger.warn("failed to close all jobs. Forcing closed", e1); - try { - adminClient.performRequest(new Request("POST", "/_ml/anomaly_detectors/_all/_close?force=true")); - } catch (Exception e2) { - logger.warn("Force-closing all jobs failed", e2); - } - throw new RuntimeException("Had to resort to force-closing jobs, something went wrong?", - e1); - } - - for (Map jobConfig : jobConfigs) { - String jobId = (String) jobConfig.get("job_id"); - adminClient.performRequest(new Request("DELETE", "/_ml/anomaly_detectors/" + jobId)); - } - } - - @SuppressWarnings({"unchecked"}) - private void deleteAllDataFrameAnalytics() throws IOException { - stopAllDataFrameAnalytics(); - - final Request analyticsRequest = new Request("GET", "/_ml/data_frame/analytics?size=10000"); - analyticsRequest.addParameter("filter_path", "data_frame_analytics"); - final Response analyticsResponse = adminClient.performRequest(analyticsRequest); - List> analytics = (List>) XContentMapValues.extractValue( - "data_frame_analytics", ESRestTestCase.entityAsMap(analyticsResponse)); - if (analytics == null) { - return; - } - - for (Map config : analytics) { - String id = (String) config.get("id"); - adminClient.performRequest(new Request("DELETE", "/_ml/data_frame/analytics/" + id)); - } - } - - private void stopAllDataFrameAnalytics() { - try { - adminClient.performRequest(new Request("POST", "_ml/data_frame/analytics/*/_stop")); - } catch (Exception e1) { - logger.warn("failed to stop all data frame analytics. Will proceed to force-stopping", e1); - try { - adminClient.performRequest(new Request("POST", "_ml/data_frame/analytics/*/_stop?force=true")); - } catch (Exception e2) { - logger.warn("Force-stopping all data frame analytics failed", e2); - } - throw new RuntimeException("Had to resort to force-stopping data frame analytics, something went wrong?", e1); - } } } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle b/x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle index ac6bc195e8ac5..9971fe8a31d6f 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle @@ -6,6 +6,7 @@ dependencies { javaRestTestImplementation(testArtifact(project(xpackModule('ml')))) javaRestTestImplementation project(path: ':modules:ingest-common') javaRestTestImplementation project(path: xpackModule('data-streams')) + javaRestTestImplementation project(path: xpackModule('transform')) } // location for keys and certificates diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java index d6850c691d447..a6e0ea61976fb 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java @@ -13,6 +13,7 @@ import org.elasticsearch.xpack.autoscaling.AutoscalingMetadata; import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderResult; import org.elasticsearch.xpack.core.action.CreateDataStreamAction; +import org.elasticsearch.xpack.transform.Transform; import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; @@ -130,6 +131,8 @@ protected Collection> nodePlugins() { MockPainlessScriptEngine.TestPlugin.class, // ILM is required for .ml-state template index settings IndexLifecycle.class, + // The feature reset API touches transform custom cluster state so we need this plugin to understand it + Transform.class, DataStreamsPlugin.class); } diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AnnotationIndexIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AnnotationIndexIT.java index 220370702772a..dfb7770ff4484 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AnnotationIndexIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AnnotationIndexIT.java @@ -118,7 +118,7 @@ public void testNotCreatedWhenAfterOtherMlIndexAndResetInProgress() throws Excep assertEquals(0, numberOfAnnotationsAliases()); }); } finally { - client().execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled()).actionGet(); + client().execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(true)).actionGet(); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 04f391959e9fd..99d2e0eb0bb83 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -1246,7 +1246,7 @@ public void cleanUpFeature( logger.info("Starting machine learning feature reset"); ActionListener unsetResetModeListener = ActionListener.wrap( - success -> client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(), ActionListener.wrap( + success -> client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(true), ActionListener.wrap( resetSuccess -> finalListener.onResponse(success), resetFailure -> { logger.error("failed to disable reset mode after state otherwise successful machine learning reset", resetFailure); @@ -1258,7 +1258,7 @@ public void cleanUpFeature( ); }) ), - failure -> client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(), ActionListener.wrap( + failure -> client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(false), ActionListener.wrap( resetSuccess -> finalListener.onFailure(failure), resetFailure -> { logger.error("failed to disable reset mode after state clean up failure", resetFailure); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlUpgradeModeActionFilter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlUpgradeModeActionFilter.java index e31afa81a7d8f..dc3294bad4f19 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlUpgradeModeActionFilter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlUpgradeModeActionFilter.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.support.ActionFilter; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.CloseJobAction; @@ -51,22 +52,25 @@ import org.elasticsearch.xpack.core.ml.action.UpdateModelSnapshotAction; import org.elasticsearch.xpack.core.ml.action.UpdateProcessAction; -import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; import java.util.Objects; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; /** * {@link MlUpgradeModeActionFilter} disallows certain actions if the cluster is currently in upgrade mode. * * Disallowed actions are the ones which can access/alter the state of ML internal indices. + * + * There is a complication that the feature reset API knows nothing about ML upgrade mode. If the feature + * reset API is called while ML upgrade mode is enabled then it takes precedence and resets the ML state. + * This means that all ML entities will be deleted and upgrade mode will be disabled if the reset completes + * successfully. */ class MlUpgradeModeActionFilter extends ActionFilter.Simple { private static final Set ACTIONS_DISALLOWED_IN_UPGRADE_MODE = - Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + Collections.unmodifiableSet(Sets.newHashSet( PutJobAction.NAME, UpdateJobAction.NAME, DeleteJobAction.NAME, @@ -113,18 +117,43 @@ class MlUpgradeModeActionFilter extends ActionFilter.Simple { PutTrainedModelAction.NAME, DeleteTrainedModelAction.NAME - ))); + )); + + private static final Set RESET_MODE_EXEMPTIONS = + Collections.unmodifiableSet(Sets.newHashSet( + DeleteJobAction.NAME, + CloseJobAction.NAME, + + DeleteDatafeedAction.NAME, + StopDatafeedAction.NAME, + + KillProcessAction.NAME, + + DeleteDataFrameAnalyticsAction.NAME, + StopDataFrameAnalyticsAction.NAME, + + DeleteTrainedModelAction.NAME + )); - private final AtomicBoolean isUpgradeMode = new AtomicBoolean(); + // At the time the action filter is installed no cluster state is available, so + // initialise to false/false and let the first change event set the real values + private final AtomicReference upgradeResetFlags = new AtomicReference<>(new UpgradeResetFlags(false, false)); MlUpgradeModeActionFilter(ClusterService clusterService) { Objects.requireNonNull(clusterService); - clusterService.addListener(this::setIsUpgradeMode); + clusterService.addListener(this::setUpgradeResetFlags); } @Override protected boolean apply(String action, ActionRequest request, ActionListener listener) { - if (isUpgradeMode.get() && ACTIONS_DISALLOWED_IN_UPGRADE_MODE.contains(action)) { + // Ensure the same object is used for both tests + UpgradeResetFlags localUpgradeResetFlags = upgradeResetFlags.get(); + assert localUpgradeResetFlags != null; + // If we are in upgrade mode but a reset is being done then allow the destructive actions that reset mode uses + if (localUpgradeResetFlags.isResetMode && RESET_MODE_EXEMPTIONS.contains(action)) { + return true; + } + if (localUpgradeResetFlags.isUpgradeMode && ACTIONS_DISALLOWED_IN_UPGRADE_MODE.contains(action)) { throw new ElasticsearchStatusException( "Cannot perform {} action while upgrade mode is enabled", RestStatus.TOO_MANY_REQUESTS, action); } @@ -143,7 +172,23 @@ public int order() { } // Visible for testing - void setIsUpgradeMode(ClusterChangedEvent event) { - isUpgradeMode.set(MlMetadata.getMlMetadata(event.state()).isUpgradeMode()); + void setUpgradeResetFlags(ClusterChangedEvent event) { + MlMetadata mlMetadata = MlMetadata.getMlMetadata(event.state()); + upgradeResetFlags.set(new UpgradeResetFlags(mlMetadata.isUpgradeMode(), mlMetadata.isResetMode())); + } + + /** + * Class to allow both upgrade and reset flags to be recorded atomically so that code that checks both + * one after the other doesn't see inconsistent values. + */ + private static class UpgradeResetFlags { + + final boolean isUpgradeMode; + final boolean isResetMode; + + UpgradeResetFlags(boolean isUpgradeMode, boolean isResetMode) { + this.isUpgradeMode = isUpgradeMode; + this.isResetMode = isResetMode; + } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetResetModeAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetResetModeAction.java index 1d97a9c997481..2f4c9826da444 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetResetModeAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetResetModeAction.java @@ -18,6 +18,7 @@ import org.elasticsearch.xpack.core.action.SetResetModeActionRequest; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.SetResetModeAction; +import org.elasticsearch.xpack.ml.inference.ModelAliasMetadata; public class TransportSetResetModeAction extends AbstractTransportSetResetModeAction { @@ -40,11 +41,21 @@ protected String featureName() { @Override protected ClusterState setState(ClusterState oldState, SetResetModeActionRequest request) { - MlMetadata.Builder builder = MlMetadata.Builder - .from(oldState.metadata().custom(MlMetadata.TYPE)) - .isResetMode(request.isEnabled()); ClusterState.Builder newState = ClusterState.builder(oldState); - newState.metadata(Metadata.builder(oldState.getMetadata()).putCustom(MlMetadata.TYPE, builder.build()).build()); + if (request.shouldDeleteMetadata()) { + assert request.isEnabled() == false; // SetResetModeActionRequest should have enforced this + newState.metadata(Metadata.builder(oldState.getMetadata()) + .removeCustom(MlMetadata.TYPE) + .removeCustom(ModelAliasMetadata.NAME) + .build()); + } else { + MlMetadata.Builder builder = MlMetadata.Builder + .from(oldState.metadata().custom(MlMetadata.TYPE)) + .isResetMode(request.isEnabled()); + newState.metadata(Metadata.builder(oldState.getMetadata()) + .putCustom(MlMetadata.TYPE, builder.build()) + .build()); + } return newState.build(); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java index dbb8267b6e4b4..d88dd3b5438e9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java @@ -29,6 +29,7 @@ import java.util.Objects; import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE; +import static org.elasticsearch.xpack.core.ml.MlTasks.RESET_IN_PROGRESS; public class DatafeedNodeSelector { @@ -78,6 +79,9 @@ public PersistentTasksCustomMetadata.Assignment selectNode() { if (MlMetadata.getMlMetadata(clusterState).isUpgradeMode()) { return AWAITING_UPGRADE; } + if (MlMetadata.getMlMetadata(clusterState).isResetMode()) { + return RESET_IN_PROGRESS; + } AssignmentFailure assignmentFailure = checkAssignment(); if (assignmentFailure == null) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/task/AbstractJobPersistentTasksExecutor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/task/AbstractJobPersistentTasksExecutor.java index ad0f51e654d4d..e657c56877a91 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/task/AbstractJobPersistentTasksExecutor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/task/AbstractJobPersistentTasksExecutor.java @@ -37,6 +37,7 @@ import java.util.Optional; import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE; +import static org.elasticsearch.xpack.core.ml.MlTasks.RESET_IN_PROGRESS; import static org.elasticsearch.xpack.core.ml.job.messages.Messages.JOB_AUDIT_REQUIRES_MORE_MEMORY_TO_RUN; import static org.elasticsearch.xpack.ml.MachineLearning.MAX_ML_NODE_SIZE; import static org.elasticsearch.xpack.ml.MachineLearning.MAX_OPEN_JOBS_PER_NODE; @@ -154,10 +155,13 @@ protected boolean allowsMissingIndices() { public Optional getPotentialAssignment(Params params, ClusterState clusterState, boolean isMemoryTrackerRecentlyRefreshed) { - // If we are waiting for an upgrade to complete, we should not assign to a node + // If we are waiting for an upgrade or reset to complete, we should not assign to a node if (MlMetadata.getMlMetadata(clusterState).isUpgradeMode()) { return Optional.of(AWAITING_UPGRADE); } + if (MlMetadata.getMlMetadata(clusterState).isResetMode()) { + return Optional.of(RESET_IN_PROGRESS); + } String jobId = getJobId(params); Optional missingIndices = checkRequiredIndices(jobId, diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlUpgradeModeActionFilterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlUpgradeModeActionFilterTests.java index 26d3b1d1689b2..3b11c063f7ffb 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlUpgradeModeActionFilterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlUpgradeModeActionFilterTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.action.CloseJobAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction; import org.elasticsearch.xpack.security.action.filter.SecurityActionFilter; @@ -42,6 +43,7 @@ public class MlUpgradeModeActionFilterTests extends ESTestCase { private static final String DISALLOWED_ACTION = PutJobAction.NAME; private static final String ALLOWED_ACTION = SetUpgradeModeAction.NAME; + private static final String DISALLOWED_ACTION_WITH_RESET_MODE_EXEMPTION = CloseJobAction.NAME; private ClusterService clusterService; private Task task; @@ -65,37 +67,51 @@ public void assertNoMoreInteractions() { } public void testApply_ActionDisallowedInUpgradeMode() { + String action = randomFrom(DISALLOWED_ACTION, DISALLOWED_ACTION_WITH_RESET_MODE_EXEMPTION); MlUpgradeModeActionFilter filter = new MlUpgradeModeActionFilter(clusterService); - filter.apply(task, DISALLOWED_ACTION, request, listener, chain); + filter.apply(task, action, request, listener, chain); - filter.setIsUpgradeMode(createClusterChangedEvent(createClusterState(true))); + filter.setUpgradeResetFlags(createClusterChangedEvent(createClusterState(true, false))); ElasticsearchStatusException e = expectThrows( ElasticsearchStatusException.class, - () -> filter.apply(task, DISALLOWED_ACTION, request, listener, chain)); + () -> filter.apply(task, action, request, listener, chain)); - filter.setIsUpgradeMode(createClusterChangedEvent(createClusterState(false))); - filter.apply(task, DISALLOWED_ACTION, request, listener, chain); + filter.setUpgradeResetFlags(createClusterChangedEvent(createClusterState(false, false))); + filter.apply(task, action, request, listener, chain); - assertThat(e.getMessage(), is(equalTo("Cannot perform " + DISALLOWED_ACTION + " action while upgrade mode is enabled"))); + assertThat(e.getMessage(), is(equalTo("Cannot perform " + action + " action while upgrade mode is enabled"))); assertThat(e.status(), is(equalTo(RestStatus.TOO_MANY_REQUESTS))); - verify(chain, times(2)).proceed(task, DISALLOWED_ACTION, request, listener); + verify(chain, times(2)).proceed(task, action, request, listener); } public void testApply_ActionAllowedInUpgradeMode() { MlUpgradeModeActionFilter filter = new MlUpgradeModeActionFilter(clusterService); filter.apply(task, ALLOWED_ACTION, request, listener, chain); - filter.setIsUpgradeMode(createClusterChangedEvent(createClusterState(true))); + filter.setUpgradeResetFlags(createClusterChangedEvent(createClusterState(true, false))); filter.apply(task, ALLOWED_ACTION, request, listener, chain); - filter.setIsUpgradeMode(createClusterChangedEvent(createClusterState(false))); + filter.setUpgradeResetFlags(createClusterChangedEvent(createClusterState(false, false))); filter.apply(task, ALLOWED_ACTION, request, listener, chain); verify(chain, times(3)).proceed(task, ALLOWED_ACTION, request, listener); } + public void testApply_ActionDisallowedInUpgradeModeWithResetModeExemption() { + MlUpgradeModeActionFilter filter = new MlUpgradeModeActionFilter(clusterService); + filter.apply(task, DISALLOWED_ACTION_WITH_RESET_MODE_EXEMPTION, request, listener, chain); + + filter.setUpgradeResetFlags(createClusterChangedEvent(createClusterState(true, true))); + filter.apply(task, DISALLOWED_ACTION_WITH_RESET_MODE_EXEMPTION, request, listener, chain); + + filter.setUpgradeResetFlags(createClusterChangedEvent(createClusterState(false, true))); + filter.apply(task, DISALLOWED_ACTION_WITH_RESET_MODE_EXEMPTION, request, listener, chain); + + verify(chain, times(3)).proceed(task, DISALLOWED_ACTION_WITH_RESET_MODE_EXEMPTION, request, listener); + } + public void testOrder_UpgradeFilterIsExecutedAfterSecurityFilter() { MlUpgradeModeActionFilter upgradeModeFilter = new MlUpgradeModeActionFilter(clusterService); SecurityActionFilter securityFilter = new SecurityActionFilter(null, null, null, null, mock(ThreadPool.class), null, null); @@ -108,9 +124,10 @@ private static ClusterChangedEvent createClusterChangedEvent(ClusterState cluste return new ClusterChangedEvent("created-from-test", clusterState, clusterState); } - private static ClusterState createClusterState(boolean isUpgradeMode) { + private static ClusterState createClusterState(boolean isUpgradeMode, boolean isResetMode) { return ClusterState.builder(new ClusterName("MlUpgradeModeActionFilterTests")) - .metadata(Metadata.builder().putCustom(MlMetadata.TYPE, new MlMetadata.Builder().isUpgradeMode(isUpgradeMode).build())) + .metadata(Metadata.builder().putCustom(MlMetadata.TYPE, + new MlMetadata.Builder().isUpgradeMode(isUpgradeMode).isResetMode(isResetMode).build())) .build(); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java index a8f8880172c06..01d84f8b34db0 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java @@ -466,6 +466,26 @@ public void testSelectNode_GivenMlUpgradeMode() { assertThat(result, equalTo(MlTasks.AWAITING_UPGRADE)); } + public void testSelectNode_GivenResetInProgress() { + Job job = createScheduledJob("job_id").build(new Date()); + DatafeedConfig df = createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")); + + PersistentTasksCustomMetadata.Builder tasksBuilder = PersistentTasksCustomMetadata.builder(); + addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder); + tasks = tasksBuilder.build(); + mlMetadata = new MlMetadata.Builder().isResetMode(true).build(); + + givenClusterState("foo", 1, 0); + + PersistentTasksCustomMetadata.Assignment result = new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode(); + assertThat(result, equalTo(MlTasks.RESET_IN_PROGRESS)); + } + public void testCheckDatafeedTaskCanBeCreated_GivenMlUpgradeMode() { Job job = createScheduledJob("job_id").build(new Date()); DatafeedConfig df = createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java index ab03039dc8712..b042f14dc15b2 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java @@ -38,6 +38,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.MlConfigIndex; import org.elasticsearch.xpack.core.ml.MlMetaIndex; +import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; @@ -181,6 +182,23 @@ public void testGetAssignment_GivenLazyJobAndNoGlobalLazyNodes() { assertEquals(JobNodeSelector.AWAITING_LAZY_ASSIGNMENT.getExplanation(), assignment.getExplanation()); } + public void testGetAssignment_GivenResetInProgress() { + ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); + Metadata.Builder metadata = Metadata.builder(); + MlMetadata mlMetadata = new MlMetadata.Builder().isResetMode(true).build(); + csBuilder.metadata(metadata.putCustom(MlMetadata.TYPE, mlMetadata)); + + OpenJobPersistentTasksExecutor executor = createExecutor(Settings.EMPTY); + + Job job = mock(Job.class); + OpenJobAction.JobParams params = new OpenJobAction.JobParams("job_during_reset"); + params.setJob(job); + PersistentTasksCustomMetadata.Assignment assignment = executor.getAssignment(params, csBuilder.build()); + assertNotNull(assignment); + assertNull(assignment.getExecutorNode()); + assertEquals(MlTasks.RESET_IN_PROGRESS.getExplanation(), assignment.getExplanation()); + } + public static void addJobTask(String jobId, String nodeId, JobState jobState, PersistentTasksCustomMetadata.Builder builder) { addJobTask(jobId, nodeId, jobState, builder, false); } diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TestFeatureResetIT.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TestFeatureResetIT.java index 3614790a2b2c3..209218cf42171 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TestFeatureResetIT.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TestFeatureResetIT.java @@ -77,10 +77,9 @@ public void testTransformFeatureReset() throws Exception { Map metadata = (Map)ESRestTestCase.entityAsMap(response).get("metadata"); assertThat(metadata, is(not(nullValue()))); + // after a successful reset we completely remove the transform metadata Map transformMetadata = (Map)metadata.get("transform"); - assertThat(transformMetadata, is(not(nullValue()))); - assertThat(transformMetadata.get("reset_mode"), is(false)); - + assertThat(transformMetadata, is(nullValue())); // assert transforms are gone assertThat(getTransform("_all").getCount(), equalTo(0L)); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java index 494e5b3ae74c7..5ac73188b97cb 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java @@ -341,7 +341,7 @@ public void cleanUpFeature( ) { ActionListener unsetResetModeListener = ActionListener.wrap( - success -> client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(), ActionListener.wrap( + success -> client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(true), ActionListener.wrap( resetSuccess -> finalListener.onResponse(success), resetFailure -> { logger.error("failed to disable reset mode after otherwise successful transform reset", resetFailure); @@ -354,7 +354,7 @@ public void cleanUpFeature( ); }) ), - failure -> client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(), ActionListener.wrap( + failure -> client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(false), ActionListener.wrap( resetSuccess -> finalListener.onFailure(failure), resetFailure -> { logger.error( diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformNodes.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformNodes.java index bc20742d3f8d8..dd149e8917544 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformNodes.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformNodes.java @@ -17,6 +17,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask; import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.TransformMessages; +import org.elasticsearch.xpack.core.transform.TransformMetadata; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams; import java.util.Collection; @@ -138,13 +139,17 @@ public static long getNumberOfTransformNodes(ClusterState clusterState) { /** * Check if cluster has at least 1 transform nodes and add a header warning if not. * To be used by transport actions only. + * Don't do this if a reset is in progress, because the feature reset API touches + * all features even if they have never been used. * * @param clusterState state */ public static void warnIfNoTransformNodes(ClusterState clusterState) { - long transformNodes = getNumberOfTransformNodes(clusterState); - if (transformNodes == 0) { - HeaderWarning.addWarning(TransformMessages.REST_WARN_NO_TRANSFORM_NODES); + if (TransformMetadata.getTransformMetadata(clusterState).isResetMode() == false) { + long transformNodes = getNumberOfTransformNodes(clusterState); + if (transformNodes == 0) { + HeaderWarning.addWarning(TransformMessages.REST_WARN_NO_TRANSFORM_NODES); + } } } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportSetTransformResetModeAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportSetTransformResetModeAction.java index d48cfa217530f..0316ed9921555 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportSetTransformResetModeAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportSetTransformResetModeAction.java @@ -40,12 +40,20 @@ protected String featureName() { @Override protected ClusterState setState(ClusterState oldState, SetResetModeActionRequest request) { - TransformMetadata.Builder builder = TransformMetadata.Builder - .from(oldState.metadata().custom(TransformMetadata.TYPE)) - .isResetMode(request.isEnabled()); ClusterState.Builder newState = ClusterState.builder(oldState); - newState.metadata(Metadata.builder(oldState.getMetadata()) - .putCustom(TransformMetadata.TYPE, builder.build()).build()); + if (request.shouldDeleteMetadata()) { + assert request.isEnabled() == false; // SetResetModeActionRequest should have enforced this + newState.metadata(Metadata.builder(oldState.getMetadata()) + .removeCustom(TransformMetadata.TYPE) + .build()); + } else { + TransformMetadata.Builder builder = TransformMetadata.Builder + .from(oldState.metadata().custom(TransformMetadata.TYPE)) + .isResetMode(request.isEnabled()); + newState.metadata(Metadata.builder(oldState.getMetadata()) + .putCustom(TransformMetadata.TYPE, builder.build()) + .build()); + } return newState.build(); } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java index beaa45c8f2b05..6076d5c7a520d 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java @@ -34,6 +34,7 @@ import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.TransformMessages; +import org.elasticsearch.xpack.core.transform.TransformMetadata; import org.elasticsearch.xpack.core.transform.action.StartTransformAction; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; @@ -91,6 +92,10 @@ public TransformPersistentTasksExecutor( @Override public PersistentTasksCustomMetadata.Assignment getAssignment(TransformTaskParams params, ClusterState clusterState) { + if (TransformMetadata.getTransformMetadata(clusterState).isResetMode()) { + return new PersistentTasksCustomMetadata.Assignment(null, + "Transform task will not be assigned as a feature reset is in progress."); + } List unavailableIndices = verifyIndicesPrimaryShardsAreActive(clusterState, resolver); if (unavailableIndices.size() != 0) { String reason = "Not starting transform ["