[ML] Throttle the delete-by-query of expired results #47177

Merged

@@ -65,6 +65,10 @@ public Response(StreamInput in) throws IOException {
deleted = in.readBoolean();
}

public boolean isDeleted() {
return deleted;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(deleted);
@@ -116,7 +116,13 @@ private void triggerTasks() {
LOGGER.info("triggering scheduled [ML] maintenance tasks");
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request(),
ActionListener.wrap(
response -> LOGGER.info("Successfully completed [ML] maintenance tasks"),
response -> {
if (response.isDeleted()) {
LOGGER.info("Successfully completed [ML] maintenance tasks");
} else {
LOGGER.info("Halting [ML] maintenance tasks before completion as elapsed time is too great");
}
},
e -> LOGGER.error("An error occurred during maintenance tasks execution", e)));
scheduleNext();
}
@@ -26,34 +26,54 @@
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

public class TransportDeleteExpiredDataAction extends HandledTransportAction<DeleteExpiredDataAction.Request,
DeleteExpiredDataAction.Response> {

// TODO: make configurable in the request
static final Duration MAX_DURATION = Duration.ofHours(8);

private final ThreadPool threadPool;
private final String executor;
private final Client client;
private final ClusterService clusterService;
private final Clock clock;

@Inject
public TransportDeleteExpiredDataAction(ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, Client client, ClusterService clusterService) {
super(DeleteExpiredDataAction.NAME, transportService, actionFilters, DeleteExpiredDataAction.Request::new);
this(threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME, transportService, actionFilters, client, clusterService,
Clock.systemUTC());
}

TransportDeleteExpiredDataAction(ThreadPool threadPool, String executor, TransportService transportService,
ActionFilters actionFilters, Client client, ClusterService clusterService, Clock clock) {
super(DeleteExpiredDataAction.NAME, transportService, actionFilters, DeleteExpiredDataAction.Request::new, executor);
this.threadPool = threadPool;
this.executor = executor;
this.client = ClientHelper.clientWithOrigin(client, ClientHelper.ML_ORIGIN);
this.clusterService = clusterService;
this.clock = clock;
}

@Override
protected void doExecute(Task task, DeleteExpiredDataAction.Request request,
ActionListener<DeleteExpiredDataAction.Response> listener) {
logger.info("Deleting expired data");
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> deleteExpiredData(listener));
Instant timeoutTime = Instant.now(clock).plus(MAX_DURATION);
Supplier<Boolean> isTimedOutSupplier = () -> Instant.now(clock).isAfter(timeoutTime);
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> deleteExpiredData(listener, isTimedOutSupplier));
}

private void deleteExpiredData(ActionListener<DeleteExpiredDataAction.Response> listener) {
private void deleteExpiredData(ActionListener<DeleteExpiredDataAction.Response> listener,
Supplier<Boolean> isTimedOutSupplier) {
AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName());
List<MlDataRemover> dataRemovers = Arrays.asList(
new ExpiredResultsRemover(client, auditor),
@@ -62,25 +82,32 @@ private void deleteExpiredData(ActionListener<DeleteExpiredDataAction.Response>
new UnusedStateRemover(client, clusterService)
);
Iterator<MlDataRemover> dataRemoversIterator = new VolatileCursorIterator<>(dataRemovers);
deleteExpiredData(dataRemoversIterator, listener);
deleteExpiredData(dataRemoversIterator, listener, isTimedOutSupplier, true);
}

private void deleteExpiredData(Iterator<MlDataRemover> mlDataRemoversIterator,
ActionListener<DeleteExpiredDataAction.Response> listener) {
if (mlDataRemoversIterator.hasNext()) {
void deleteExpiredData(Iterator<MlDataRemover> mlDataRemoversIterator,
ActionListener<DeleteExpiredDataAction.Response> listener,
Supplier<Boolean> isTimedOutSupplier,
boolean haveAllPreviousDeletionsCompleted) {
if (haveAllPreviousDeletionsCompleted && mlDataRemoversIterator.hasNext()) {
MlDataRemover remover = mlDataRemoversIterator.next();
ActionListener<Boolean> nextListener = ActionListener.wrap(
booleanResponse -> deleteExpiredData(mlDataRemoversIterator, listener), listener::onFailure);
booleanResponse -> deleteExpiredData(mlDataRemoversIterator, listener, isTimedOutSupplier, booleanResponse),
listener::onFailure);
// Removing expired ML data and artifacts requires multiple operations.
// These are queued up and executed sequentially in the action listener,
// the chained calls must all run on the ML utility thread pool, NOT the thread
// the previous action returned on, which in the case of a transport_client_boss
// thread would be a disaster.
remover.remove(new ThreadedActionListener<>(logger, threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME, nextListener,
false));
remover.remove(new ThreadedActionListener<>(logger, threadPool, executor, nextListener, false),
isTimedOutSupplier);
} else {
logger.info("Completed deletion of expired data");
listener.onResponse(new DeleteExpiredDataAction.Response(true));
if (haveAllPreviousDeletionsCompleted) {
logger.info("Completed deletion of expired ML data");
} else {
logger.info("Halted deletion of expired ML data until next invocation");
}
listener.onResponse(new DeleteExpiredDataAction.Response(haveAllPreviousDeletionsCompleted));
}
}
}
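Stripped of the Elasticsearch plumbing, the change above boils down to two pieces: a deadline supplier built from an injectable Clock, and a recursive chain that threads each remover's "completed" flag into the next step. Below is a minimal, self-contained sketch of that pattern; the names and types are illustrative stand-ins, not the production classes.

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

class RemovalChainSketch {

    // Illustrative stand-in for MlDataRemover: report true if the step finished, false if it stopped early.
    interface Remover {
        void remove(Consumer<Boolean> onDone, Supplier<Boolean> isTimedOut);
    }

    // Same idea as doExecute above: a fixed deadline on an injectable clock, so tests can control time.
    static Supplier<Boolean> deadline(Clock clock, Duration maxDuration) {
        Instant timeoutTime = Instant.now(clock).plus(maxDuration);
        return () -> Instant.now(clock).isAfter(timeoutTime);
    }

    // Runs removers one at a time; halts as soon as a step reports it could not finish in time.
    static void runChain(Iterator<Remover> removers, Supplier<Boolean> isTimedOut,
                         Consumer<Boolean> onAllDone, boolean allPreviousCompleted) {
        if (allPreviousCompleted && removers.hasNext()) {
            removers.next().remove(completed -> runChain(removers, isTimedOut, onAllDone, completed), isTimedOut);
        } else {
            onAllDone.accept(allPreviousCompleted);
        }
    }

    public static void main(String[] args) {
        Supplier<Boolean> isTimedOut = deadline(Clock.systemUTC(), Duration.ofHours(8));
        List<Remover> removers = Arrays.asList(
            (onDone, timedOut) -> onDone.accept(timedOut.get() == false),  // pretend removal step
            (onDone, timedOut) -> onDone.accept(timedOut.get() == false)); // pretend removal step
        runChain(removers.iterator(), isTimedOut,
            allDone -> System.out.println(allDone ? "completed" : "halted before completion"), true);
    }
}

The production code reports the same flag through DeleteExpiredDataAction.Response, which is how the daily maintenance trigger earlier in this diff decides whether to log a completed or a halted run.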
@@ -23,6 +23,7 @@
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.results.Result;

@@ -79,6 +80,9 @@ public void deleteModelSnapshots(List<ModelSnapshot> modelSnapshots, ActionListe
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(new IdsQueryBuilder().addIds(idsToDelete.toArray(new String[0])));

// _doc is the most efficient sort order and will also disable scoring
deleteByQueryRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);

try {
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryRequest, listener);
} catch (Exception e) {
@@ -101,6 +105,10 @@ public void deleteResultsFromTime(long cutoffEpochMs, ActionListener<Boolean> li
.filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).gte(cutoffEpochMs));
deleteByQueryHolder.dbqRequest.setIndicesOptions(IndicesOptions.lenientExpandOpen());
deleteByQueryHolder.dbqRequest.setQuery(query);

// _doc is the most efficient sort order and will also disable scoring
deleteByQueryHolder.dbqRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);

executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryHolder.dbqRequest,
ActionListener.wrap(r -> listener.onResponse(true), listener::onFailure));
}
@@ -116,6 +124,9 @@ public void deleteInterimResults() {
QueryBuilder qb = QueryBuilders.termQuery(Result.IS_INTERIM.getPreferredName(), true);
deleteByQueryHolder.dbqRequest.setQuery(new ConstantScoreQueryBuilder(qb));

// _doc is the most efficient sort order and will also disable scoring
deleteByQueryHolder.dbqRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);

try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) {
client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryHolder.dbqRequest).get();
} catch (Exception e) {
@@ -134,6 +145,9 @@ public void deleteDatafeedTimingStats(ActionListener<BulkByScrollResponse> liste
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(new IdsQueryBuilder().addIds(DatafeedTimingStats.documentId(jobId)));

// _doc is the most efficient sort order and will also disable scoring
deleteByQueryRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);

executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryRequest, listener);
}
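The `_doc` sort added to each of these delete-by-query requests is the other half of the change: sorting by `_doc` returns hits in index order and skips relevance scoring entirely, which is the cheapest way to enumerate documents that are only going to be deleted. A hedged sketch of the same trick in isolation, using only request methods that appear in this diff (the index name, field name, and class name below are made up):

import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;

class DocSortSketch {
    static DeleteByQueryRequest docSortedDeleteRequest() {
        DeleteByQueryRequest request = new DeleteByQueryRequest();
        request.indices("my-results-index");                                // hypothetical index
        request.setIndicesOptions(IndicesOptions.lenientExpandOpen());
        request.setQuery(QueryBuilders.termQuery("result_type", "bucket")); // hypothetical term query
        // _doc order: no scoring work, hits stream in index order straight into the bulk deletes.
        request.getSearchRequest().source().sort("_doc");
        return request;
    }
}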

@@ -22,6 +22,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
@@ -40,11 +41,12 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover {
}

@Override
public void remove(ActionListener<Boolean> listener) {
removeData(newJobIterator(), listener);
public void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) {
removeData(newJobIterator(), listener, isTimedOutSupplier);
}

private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener<Boolean> listener) {
private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener<Boolean> listener,
Supplier<Boolean> isTimedOutSupplier) {
if (jobIterator.hasNext() == false) {
listener.onResponse(true);
return;
@@ -56,13 +58,19 @@ private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener<B
return;
}

if (isTimedOutSupplier.get()) {
listener.onResponse(false);
return;
}

Long retentionDays = getRetentionDays(job);
if (retentionDays == null) {
removeData(jobIterator, listener);
removeData(jobIterator, listener, isTimedOutSupplier);
return;
}
long cutoffEpochMs = calcCutoffEpochMs(retentionDays);
removeDataBefore(job, cutoffEpochMs, ActionListener.wrap(response -> removeData(jobIterator, listener), listener::onFailure));
removeDataBefore(job, cutoffEpochMs,
ActionListener.wrap(response -> removeData(jobIterator, listener, isTimedOutSupplier), listener::onFailure));
}

private WrappedBatchedJobsIterator newJobIterator() {
@@ -32,6 +32,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.job.results.Forecast;
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
import org.elasticsearch.xpack.core.ml.job.results.Result;
@@ -44,6 +45,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;

/**
* Removes up to {@link #MAX_FORECASTS} forecasts (stats + forecasts docs) that have expired.
@@ -71,10 +73,10 @@ public ExpiredForecastsRemover(Client client, ThreadPool threadPool) {
}

@Override
public void remove(ActionListener<Boolean> listener) {
public void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) {
LOGGER.debug("Removing forecasts that expire before [{}]", cutoffEpochMs);
ActionListener<SearchResponse> forecastStatsHandler = ActionListener.wrap(
searchResponse -> deleteForecasts(searchResponse, listener),
searchResponse -> deleteForecasts(searchResponse, listener, isTimedOutSupplier),
e -> listener.onFailure(new ElasticsearchException("An error occurred while searching forecasts to delete", e)));

SearchSourceBuilder source = new SearchSourceBuilder();
@@ -84,13 +86,16 @@ public void remove(ActionListener<Boolean> listener) {
source.size(MAX_FORECASTS);
source.trackTotalHits(true);

// _doc is the most efficient sort order and will also disable scoring
source.sort(ElasticsearchMappings.ES_DOC);

SearchRequest searchRequest = new SearchRequest(RESULTS_INDEX_PATTERN);
searchRequest.source(source);
client.execute(SearchAction.INSTANCE, searchRequest, new ThreadedActionListener<>(LOGGER, threadPool,
MachineLearning.UTILITY_THREAD_POOL_NAME, forecastStatsHandler, false));
}

private void deleteForecasts(SearchResponse searchResponse, ActionListener<Boolean> listener) {
private void deleteForecasts(SearchResponse searchResponse, ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) {
List<ForecastRequestStats> forecastsToDelete;
try {
forecastsToDelete = findForecastsToDelete(searchResponse);
@@ -99,6 +104,11 @@ private void deleteForecasts(SearchResponse searchResponse, ActionListener<Boole
return;
}

if (isTimedOutSupplier.get()) {
listener.onResponse(false);
return;
}

DeleteByQueryRequest request = buildDeleteByQuery(forecastsToDelete);
client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<BulkByScrollResponse>() {
@Override
@@ -157,6 +167,10 @@ private DeleteByQueryRequest buildDeleteByQuery(List<ForecastRequestStats> forec
}
QueryBuilder query = QueryBuilders.boolQuery().filter(boolQuery);
request.setQuery(query);

// _doc is the most efficient sort order and will also disable scoring
request.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);

return request;
}
}
@@ -23,6 +23,7 @@
import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField;
import org.elasticsearch.xpack.ml.MachineLearning;
@@ -88,7 +89,7 @@ protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Bool
.mustNot(activeSnapshotFilter)
.mustNot(retainFilter);

searchRequest.source(new SearchSourceBuilder().query(query).size(MODEL_SNAPSHOT_SEARCH_SIZE));
searchRequest.source(new SearchSourceBuilder().query(query).size(MODEL_SNAPSHOT_SEARCH_SIZE).sort(ElasticsearchMappings.ES_DOC));

client.execute(SearchAction.INSTANCE, searchRequest, new ThreadedActionListener<>(LOGGER, threadPool,
MachineLearning.UTILITY_THREAD_POOL_NAME, expiredSnapshotsListener(job.getId(), listener), false));
@@ -19,6 +19,7 @@
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.results.Forecast;
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
@@ -88,13 +89,21 @@ private DeleteByQueryRequest createDBQRequest(Job job, long cutoffEpochMs) {
DeleteByQueryRequest request = new DeleteByQueryRequest();
request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);

// Delete the documents gradually.
// With DEFAULT_SCROLL_SIZE = 1000 this implies we spread deletion of 1 million documents over 5000 seconds ~= 83 minutes.
request.setBatchSize(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE);
request.setRequestsPerSecond(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE / 5);

request.indices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId()));
QueryBuilder excludeFilter = QueryBuilders.termsQuery(Result.RESULT_TYPE.getPreferredName(),
ModelSizeStats.RESULT_TYPE_VALUE, ForecastRequestStats.RESULT_TYPE_VALUE, Forecast.RESULT_TYPE_VALUE);
QueryBuilder query = createQuery(job.getId(), cutoffEpochMs)
.filter(QueryBuilders.existsQuery(Result.RESULT_TYPE.getPreferredName()))
.mustNot(excludeFilter);
request.setQuery(query);

// _doc is the most efficient sort order and will also disable scoring
request.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);
return request;
}
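The throttling arithmetic in the comment above is the core of this PR: with a batch (scroll) size of 1000 and requests-per-second set to 1000 / 5 = 200, delete-by-query spreads the work out by waiting between batches, so a million expired result documents drain in roughly 1,000,000 / 200 = 5,000 seconds, about 83 minutes, instead of hammering the cluster all at once. A hedged sketch of just the throttle knobs, using the same request methods as the diff (the constants are spelled out rather than taken from AbstractBulkByScrollRequest, and the class name is illustrative):

import org.elasticsearch.index.reindex.DeleteByQueryRequest;

class ThrottleSketch {
    static DeleteByQueryRequest throttled(DeleteByQueryRequest request) {
        int batchSize = 1000;                        // DEFAULT_SCROLL_SIZE in the diff above
        request.setBatchSize(batchSize);             // pull 1000 docs per scroll batch
        request.setRequestsPerSecond(batchSize / 5); // 200 docs/sec, roughly a 5s pause between batches
        // 1,000,000 docs / 200 docs per second = 5,000s ~= 83 minutes for a large backlog
        return request;
    }
}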

@@ -7,6 +7,8 @@

import org.elasticsearch.action.ActionListener;

import java.util.function.Supplier;

public interface MlDataRemover {
void remove(ActionListener<Boolean> listener);
void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier);
}
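The interface change means every remover now receives the timeout check and must report, through the boolean it passes to the listener, whether it finished or gave up early. A minimal sketch of an implementation honoring that contract (this remover is hypothetical, not one of the removers touched by this PR):

import org.elasticsearch.action.ActionListener;

import java.util.function.Supplier;

public class NoOpDataRemover implements MlDataRemover {
    @Override
    public void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) {
        if (isTimedOutSupplier.get()) {
            listener.onResponse(false); // out of time: report partial completion so the chain halts
            return;
        }
        // a real remover would issue its throttled delete-by-query here
        listener.onResponse(true);      // finished: the caller may move on to the next remover
    }
}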