Skip to content

Commit

Permalink
ML feature state cleaner
Browse files Browse the repository at this point in the history
  • Loading branch information
gwbrown committed Feb 24, 2021
1 parent 02a5937 commit 83817e4
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@

package org.elasticsearch.rest.action.admin.cluster;

import org.elasticsearch.action.admin.cluster.snapshots.features.ResetFeatureStateAction;
import org.elasticsearch.action.admin.cluster.snapshots.features.ResetFeatureStateRequest;
import org.elasticsearch.action.admin.cluster.snapshots.features.SnapshottableFeaturesAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
Expand All @@ -20,6 +20,10 @@

public class RestResetFeatureStateAction extends BaseRestHandler {

@Override public boolean allowSystemIndexAccessByDefault() {
return true;
}

@Override
public List<Route> routes() {
return List.of(new Route(RestRequest.Method.POST, "/_reset_feature_state"));
Expand All @@ -35,7 +39,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
final ResetFeatureStateRequest req = new ResetFeatureStateRequest();

return restChannel -> {
client.execute(SnapshottableFeaturesAction.INSTANCE, req, new RestToXContentListener<>(restChannel));
client.execute(ResetFeatureStateAction.INSTANCE, req, new RestToXContentListener<>(restChannel));
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.cluster.snapshots.features.ResetFeatureStateResponse;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
Expand Down Expand Up @@ -361,8 +363,10 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
Expand Down Expand Up @@ -1227,6 +1231,62 @@ public String getFeatureDescription() {
return "Provides anomaly detection and forecasting functionality";
}

@Override public void cleanUpFeature(
ClusterService clusterService,
Client client,
ActionListener<ResetFeatureStateResponse.ResetFeatureStateStatus> listener) {

Map<String, Boolean> results = new ConcurrentHashMap<>();

ActionListener<StopDataFrameAnalyticsAction.Response> afterDataframesStopped = ActionListener.wrap(dataFrameStopResponse -> {
// Handle the response
results.put("data_frame/analytics", dataFrameStopResponse.isStopped());

if (results.values().stream().allMatch(b -> b)) {
// Call into the original listener to clean up the indices
SystemIndexPlugin.super.cleanUpFeature(clusterService, client, listener);
} else {
final List<String> failedComponents = results.entrySet().stream()
.filter(result -> result.getValue() == false)
.map(Map.Entry::getKey)
.collect(Collectors.toList());
listener.onFailure(new RuntimeException("Some components failed to reset: " + failedComponents));
}
}, listener::onFailure);


ActionListener<CloseJobAction.Response> afterAnomalyDetectionClosed = ActionListener.wrap(closeJobResponse -> {
// Handle the response
results.put("anomaly_detectors", closeJobResponse.isClosed());

// Stop data frame analytics
StopDataFrameAnalyticsAction.Request stopDataFramesReq = new StopDataFrameAnalyticsAction.Request("_all");
stopDataFramesReq.setForce(true);
stopDataFramesReq.setAllowNoMatch(true);
client.execute(StopDataFrameAnalyticsAction.INSTANCE, stopDataFramesReq, afterDataframesStopped);
}, listener::onFailure);

// Close anomaly detection jobs
ActionListener<StopDatafeedAction.Response> afterDataFeedsStopped = ActionListener.wrap(datafeedResponse -> {
// Handle the response
results.put("datafeeds", datafeedResponse.isStopped());

// Close anomaly detection jobs
CloseJobAction.Request closeJobsRequest = new CloseJobAction.Request();
closeJobsRequest.setForce(true);
closeJobsRequest.setAllowNoMatch(true);
closeJobsRequest.setJobId("_all");
client.execute(CloseJobAction.INSTANCE, closeJobsRequest, afterAnomalyDetectionClosed);
}, listener::onFailure);

// Stop data feeds
StopDatafeedAction.Request stopDatafeedsReq = new StopDatafeedAction.Request("_all");
stopDatafeedsReq.setAllowNoMatch(true);
stopDatafeedsReq.setForce(true);
client.execute(StopDatafeedAction.INSTANCE, stopDatafeedsReq,
afterDataFeedsStopped);
}

@Override
public BreakerSettings getCircuitBreaker(Settings settings) {
return BreakerSettings.updateFromSettings(
Expand Down

0 comments on commit 83817e4

Please sign in to comment.