Skip to content

Commit

Permalink
[ML] allow unran/incomplete forecasts to be deleted for stopped/faile…
Browse files Browse the repository at this point in the history
…d jobs (#57152) (#57172)

If a job is NOT opened, forecasts should be able to be deleted, no matter their state.

This also fixes a bug with expanding forecast IDs. We should check for wildcard `*` and `_all` when expanding the ids

closes #56419
  • Loading branch information
benwtrent authored May 26, 2020
1 parent 0fce2b7 commit decc627
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
Expand All @@ -36,13 +37,16 @@
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.ScrollableHitSource;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.DeleteForecastAction;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.results.Forecast;
Expand All @@ -55,6 +59,7 @@
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
Expand All @@ -71,21 +76,28 @@ public class TransportDeleteForecastAction extends HandledTransportAction<Delete
private static final Logger logger = LogManager.getLogger(TransportDeleteForecastAction.class);

private final Client client;
private final ClusterService clusterService;
private static final int MAX_FORECAST_TO_SEARCH = 10_000;

private static final Set<ForecastRequestStatus> DELETABLE_STATUSES =
EnumSet.of(ForecastRequestStatus.FINISHED, ForecastRequestStatus.FAILED);

@Inject
public TransportDeleteForecastAction(TransportService transportService, ActionFilters actionFilters, Client client) {
public TransportDeleteForecastAction(TransportService transportService,
ActionFilters actionFilters,
Client client,
ClusterService clusterService) {
super(DeleteForecastAction.NAME, transportService, actionFilters, DeleteForecastAction.Request::new);
this.client = client;
this.clusterService = clusterService;
}

@Override
protected void doExecute(Task task, DeleteForecastAction.Request request, ActionListener<AcknowledgedResponse> listener) {
final String jobId = request.getJobId();
final String forecastsExpression = request.getForecastId();

String forecastsExpression = request.getForecastId();
final String[] forecastIds = Strings.tokenizeToStringArray(forecastsExpression, ",");
ActionListener<SearchResponse> forecastStatsHandler = ActionListener.wrap(
searchResponse -> deleteForecasts(searchResponse, request, listener),
e -> listener.onFailure(new ElasticsearchException("An error occurred while searching forecasts to delete", e)));
Expand All @@ -95,10 +107,8 @@ protected void doExecute(Task task, DeleteForecastAction.Request request, Action
BoolQueryBuilder builder = QueryBuilders.boolQuery();
BoolQueryBuilder innerBool = QueryBuilders.boolQuery().must(
QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), ForecastRequestStats.RESULT_TYPE_VALUE));

if (Metadata.ALL.equals(request.getForecastId()) == false) {
Set<String> forcastIds = new HashSet<>(Arrays.asList(Strings.tokenizeToStringArray(forecastsExpression, ",")));
innerBool.must(QueryBuilders.termsQuery(Forecast.FORECAST_ID.getPreferredName(), forcastIds));
if (Strings.isAllOrWildcard(forecastIds) == false) {
innerBool.must(QueryBuilders.termsQuery(Forecast.FORECAST_ID.getPreferredName(), new HashSet<>(Arrays.asList(forecastIds))));
}

source.query(builder.filter(innerBool));
Expand All @@ -109,6 +119,17 @@ protected void doExecute(Task task, DeleteForecastAction.Request request, Action
executeAsyncWithOrigin(client, ML_ORIGIN, SearchAction.INSTANCE, searchRequest, forecastStatsHandler);
}

static void validateForecastState(Collection<ForecastRequestStats> forecastsToDelete, JobState jobState, String jobId) {
List<String> badStatusForecasts = forecastsToDelete.stream()
.filter((f) -> DELETABLE_STATUSES.contains(f.getStatus()) == false)
.map(ForecastRequestStats::getForecastId)
.collect(Collectors.toList());
if (badStatusForecasts.size() > 0 && JobState.OPENED.equals(jobState)) {
throw ExceptionsHelper.conflictStatusException(
Messages.getMessage(Messages.REST_CANNOT_DELETE_FORECAST_IN_CURRENT_STATE, badStatusForecasts, jobId));
}
}

private void deleteForecasts(SearchResponse searchResponse,
DeleteForecastAction.Request request,
ActionListener<AcknowledgedResponse> listener) {
Expand All @@ -122,7 +143,7 @@ private void deleteForecasts(SearchResponse searchResponse,
}

if (forecastsToDelete.isEmpty()) {
if (Metadata.ALL.equals(request.getForecastId()) &&
if (Strings.isAllOrWildcard(new String[]{request.getForecastId()}) &&
request.isAllowNoForecasts()) {
listener.onResponse(new AcknowledgedResponse(true));
} else {
Expand All @@ -131,13 +152,13 @@ private void deleteForecasts(SearchResponse searchResponse,
}
return;
}
List<String> badStatusForecasts = forecastsToDelete.stream()
.filter((f) -> !DELETABLE_STATUSES.contains(f.getStatus()))
.map(ForecastRequestStats::getForecastId).collect(Collectors.toList());
if (badStatusForecasts.size() > 0) {
listener.onFailure(
ExceptionsHelper.conflictStatusException(
Messages.getMessage(Messages.REST_CANNOT_DELETE_FORECAST_IN_CURRENT_STATE, badStatusForecasts, jobId)));
final ClusterState state = clusterService.state();
PersistentTasksCustomMetadata persistentTasks = state.metadata().custom(PersistentTasksCustomMetadata.TYPE);
JobState jobState = MlTasks.getJobState(jobId, persistentTasks);
try {
validateForecastState(forecastsToDelete, jobState, jobId);
} catch (ElasticsearchException ex) {
listener.onFailure(ex);
return;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;


public class TransportDeleteForecastActionTests extends ESTestCase {

private static final int TEST_RUNS = 10;

public void testValidateForecastStateWithAllFailedFinished() {
for (int i = 0; i < TEST_RUNS; ++i) {
List<ForecastRequestStats> forecastRequestStats = Stream.generate(
() -> createForecastStats(randomFrom(
ForecastRequestStats.ForecastRequestStatus.FAILED,
ForecastRequestStats.ForecastRequestStatus.FINISHED
)))
.limit(randomInt(10))
.collect(Collectors.toList());

// This should not throw.
TransportDeleteForecastAction.validateForecastState(
forecastRequestStats,
randomFrom(JobState.values()),
randomAlphaOfLength(10));
}
}

public void testValidateForecastStateWithSomeFailedFinished() {
for (int i = 0; i < TEST_RUNS; ++i) {
List<ForecastRequestStats> forecastRequestStats = Stream.generate(
() -> createForecastStats(randomFrom(
ForecastRequestStats.ForecastRequestStatus.values()
)))
.limit(randomInt(10))
.collect(Collectors.toList());

forecastRequestStats.add(createForecastStats(ForecastRequestStats.ForecastRequestStatus.STARTED));

{
JobState jobState = randomFrom(JobState.CLOSED, JobState.CLOSING, JobState.FAILED);
try {
TransportDeleteForecastAction.validateForecastState(forecastRequestStats, jobState, randomAlphaOfLength(10));
} catch (Exception ex) {
fail("Should not have thrown: " + ex.getMessage());
}
}
{
JobState jobState = JobState.OPENED;
expectThrows(
ElasticsearchStatusException.class,
() -> TransportDeleteForecastAction.validateForecastState(forecastRequestStats, jobState, randomAlphaOfLength(10))
);
}
}
}


private static ForecastRequestStats createForecastStats(ForecastRequestStats.ForecastRequestStatus status) {
ForecastRequestStats forecastRequestStats = new ForecastRequestStats(randomAlphaOfLength(10), randomAlphaOfLength(10));
forecastRequestStats.setStatus(status);
return forecastRequestStats;
}

}

0 comments on commit decc627

Please sign in to comment.