diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestResetFeatureStateAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestResetFeatureStateAction.java index edeb6c0d96b77..a09db0686c515 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestResetFeatureStateAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestResetFeatureStateAction.java @@ -8,8 +8,8 @@ package org.elasticsearch.rest.action.admin.cluster; +import org.elasticsearch.action.admin.cluster.snapshots.features.ResetFeatureStateAction; import org.elasticsearch.action.admin.cluster.snapshots.features.ResetFeatureStateRequest; -import org.elasticsearch.action.admin.cluster.snapshots.features.SnapshottableFeaturesAction; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; @@ -20,6 +20,10 @@ public class RestResetFeatureStateAction extends BaseRestHandler { + @Override public boolean allowSystemIndexAccessByDefault() { + return true; + } + @Override public List routes() { return List.of(new Route(RestRequest.Method.POST, "/_reset_feature_state")); @@ -35,7 +39,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli final ResetFeatureStateRequest req = new ResetFeatureStateRequest(); return restChannel -> { - client.execute(SnapshottableFeaturesAction.INSTANCE, req, new RestToXContentListener<>(restChannel)); + client.execute(ResetFeatureStateAction.INSTANCE, req, new RestToXContentListener<>(restChannel)); }; } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 095af50fc9106..5f042551f61ba 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -10,8 +10,10 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.admin.cluster.snapshots.features.ResetFeatureStateResponse; import org.elasticsearch.action.support.ActionFilter; import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; @@ -361,8 +363,10 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Supplier; import java.util.function.UnaryOperator; +import java.util.stream.Collectors; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; @@ -1227,6 +1231,62 @@ public String getFeatureDescription() { return "Provides anomaly detection and forecasting functionality"; } + @Override public void cleanUpFeature( + ClusterService clusterService, + Client client, + ActionListener listener) { + + Map results = new ConcurrentHashMap<>(); + + ActionListener afterDataframesStopped = ActionListener.wrap(dataFrameStopResponse -> { + // Handle the response + results.put("data_frame/analytics", dataFrameStopResponse.isStopped()); + + if (results.values().stream().allMatch(b -> b)) { + // Call into the original listener to clean up the indices + SystemIndexPlugin.super.cleanUpFeature(clusterService, client, listener); + } else { + final List failedComponents = results.entrySet().stream() + .filter(result -> result.getValue() == false) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + listener.onFailure(new RuntimeException("Some components failed to reset: " + failedComponents)); + } + }, listener::onFailure); + + + ActionListener afterAnomalyDetectionClosed = ActionListener.wrap(closeJobResponse -> { + // Handle the response + results.put("anomaly_detectors", closeJobResponse.isClosed()); + + // Stop data frame analytics + StopDataFrameAnalyticsAction.Request stopDataFramesReq = new StopDataFrameAnalyticsAction.Request("_all"); + stopDataFramesReq.setForce(true); + stopDataFramesReq.setAllowNoMatch(true); + client.execute(StopDataFrameAnalyticsAction.INSTANCE, stopDataFramesReq, afterDataframesStopped); + }, listener::onFailure); + + // Close anomaly detection jobs + ActionListener afterDataFeedsStopped = ActionListener.wrap(datafeedResponse -> { + // Handle the response + results.put("datafeeds", datafeedResponse.isStopped()); + + // Close anomaly detection jobs + CloseJobAction.Request closeJobsRequest = new CloseJobAction.Request(); + closeJobsRequest.setForce(true); + closeJobsRequest.setAllowNoMatch(true); + closeJobsRequest.setJobId("_all"); + client.execute(CloseJobAction.INSTANCE, closeJobsRequest, afterAnomalyDetectionClosed); + }, listener::onFailure); + + // Stop data feeds + StopDatafeedAction.Request stopDatafeedsReq = new StopDatafeedAction.Request("_all"); + stopDatafeedsReq.setAllowNoMatch(true); + stopDatafeedsReq.setForce(true); + client.execute(StopDatafeedAction.INSTANCE, stopDatafeedsReq, + afterDataFeedsStopped); + } + @Override public BreakerSettings getCircuitBreaker(Settings settings) { return BreakerSettings.updateFromSettings(