Skip to content

Commit

Permalink
[ML] Set parent task Id on ml expired data removers (elastic#62854)
Browse files Browse the repository at this point in the history
Setting the parent task Id (of the delete expired data action) on the ML
expired data removers makes it easier to track and cancel long running
tasks
  • Loading branch information
davidkyle authored Sep 28, 2020
1 parent 2c43def commit b9cd9ec
Show file tree
Hide file tree
Showing 13 changed files with 80 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.MlStatsIndex;
import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction;
Expand Down Expand Up @@ -114,7 +115,7 @@ public void testRemoveUnusedStats() throws Exception {
client().admin().indices().prepareRefresh(MlStatsIndex.indexPattern()).get();

PlainActionFuture<Boolean> deletionListener = new PlainActionFuture<>();
UnusedStatsRemover statsRemover = new UnusedStatsRemover(client);
UnusedStatsRemover statsRemover = new UnusedStatsRemover(client, new TaskId("test", 0L));
statsRemover.remove(10000.0f, deletionListener, () -> false);
deletionListener.actionGet();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper;
Expand Down Expand Up @@ -88,11 +89,13 @@ protected void doExecute(Task task, DeleteExpiredDataAction.Request request,
request.getTimeout() == null ? DEFAULT_MAX_DURATION : Duration.ofMillis(request.getTimeout().millis())
);

TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());

Supplier<Boolean> isTimedOutSupplier = () -> Instant.now(clock).isAfter(timeoutTime);
AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName());

if (Strings.isNullOrEmpty(request.getJobId()) || Strings.isAllOrWildcard(new String[]{request.getJobId()})) {
List<MlDataRemover> dataRemovers = createDataRemovers(client, auditor);
List<MlDataRemover> dataRemovers = createDataRemovers(client, taskId, auditor);
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(
() -> deleteExpiredData(request, dataRemovers, listener, isTimedOutSupplier)
);
Expand All @@ -101,7 +104,7 @@ protected void doExecute(Task task, DeleteExpiredDataAction.Request request,
jobBuilders -> {
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> {
List<Job> jobs = jobBuilders.stream().map(Job.Builder::build).collect(Collectors.toList());
List<MlDataRemover> dataRemovers = createDataRemovers(jobs, auditor);
List<MlDataRemover> dataRemovers = createDataRemovers(jobs, taskId, auditor);
deleteExpiredData(request, dataRemovers, listener, isTimedOutSupplier);
}
);
Expand Down Expand Up @@ -164,24 +167,28 @@ void deleteExpiredData(Iterator<MlDataRemover> mlDataRemoversIterator,
}
}

private List<MlDataRemover> createDataRemovers(OriginSettingClient client, AnomalyDetectionAuditor auditor) {
private List<MlDataRemover> createDataRemovers(OriginSettingClient client,
TaskId parentTaskId,
AnomalyDetectionAuditor auditor) {
return Arrays.asList(
new ExpiredResultsRemover(client, new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(client)), auditor, threadPool),
new ExpiredForecastsRemover(client, threadPool),
new ExpiredModelSnapshotsRemover(client, new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(client)), threadPool),
new UnusedStateRemover(client, clusterService),
new EmptyStateIndexRemover(client),
new UnusedStatsRemover(client));
new ExpiredResultsRemover(client,
new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(client)), parentTaskId, auditor, threadPool),
new ExpiredForecastsRemover(client, threadPool, parentTaskId),
new ExpiredModelSnapshotsRemover(client,
new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(client)), threadPool, parentTaskId),
new UnusedStateRemover(client, clusterService, parentTaskId),
new EmptyStateIndexRemover(client, parentTaskId),
new UnusedStatsRemover(client, parentTaskId));
}

private List<MlDataRemover> createDataRemovers(List<Job> jobs, AnomalyDetectionAuditor auditor) {
private List<MlDataRemover> createDataRemovers(List<Job> jobs, TaskId parentTaskId, AnomalyDetectionAuditor auditor) {
return Arrays.asList(
new ExpiredResultsRemover(client, new VolatileCursorIterator<>(jobs), auditor, threadPool),
new ExpiredForecastsRemover(client, threadPool),
new ExpiredModelSnapshotsRemover(client, new VolatileCursorIterator<>(jobs), threadPool),
new UnusedStateRemover(client, clusterService),
new EmptyStateIndexRemover(client),
new UnusedStatsRemover(client));
new ExpiredResultsRemover(client, new VolatileCursorIterator<>(jobs), parentTaskId, auditor, threadPool),
new ExpiredForecastsRemover(client, threadPool, parentTaskId),
new ExpiredModelSnapshotsRemover(client, new VolatileCursorIterator<>(jobs), threadPool, parentTaskId),
new UnusedStateRemover(client, clusterService, parentTaskId),
new EmptyStateIndexRemover(client, parentTaskId),
new UnusedStatsRemover(client, parentTaskId));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.results.Result;

Expand All @@ -27,10 +28,16 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover {

protected final OriginSettingClient client;
private final Iterator<Job> jobIterator;
private final TaskId parentTaskId;

AbstractExpiredJobDataRemover(OriginSettingClient client, Iterator<Job> jobIterator) {
AbstractExpiredJobDataRemover(OriginSettingClient client, Iterator<Job> jobIterator, TaskId parentTaskId) {
this.client = client;
this.jobIterator = jobIterator;
this.parentTaskId = parentTaskId;
}

protected TaskId getParentTaskId() {
return parentTaskId;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;

import java.util.Objects;
Expand All @@ -24,11 +25,13 @@
* This class deletes empty indices matching .ml-state* pattern that are not pointed at by the .ml-state-write alias.
*/
public class EmptyStateIndexRemover implements MlDataRemover {

private final OriginSettingClient client;
private final TaskId parentTaskId;

public EmptyStateIndexRemover(OriginSettingClient client) {
public EmptyStateIndexRemover(OriginSettingClient client, TaskId parentTaskId) {
this.client = Objects.requireNonNull(client);
this.parentTaskId = parentTaskId;
}

@Override
Expand Down Expand Up @@ -69,6 +72,7 @@ public void remove(float requestsPerSec, ActionListener<Boolean> listener, Suppl

private void getEmptyStateIndices(ActionListener<Set<String>> listener) {
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest().indices(AnomalyDetectorsIndex.jobStateIndexPattern());
indicesStatsRequest.setParentTask(parentTaskId);
client.admin().indices().stats(
indicesStatsRequest,
ActionListener.wrap(
Expand All @@ -87,6 +91,7 @@ private void getEmptyStateIndices(ActionListener<Set<String>> listener) {

private void getCurrentStateIndices(ActionListener<Set<String>> listener) {
GetIndexRequest getIndexRequest = new GetIndexRequest().indices(AnomalyDetectorsIndex.jobStateIndexWriteAlias());
getIndexRequest.setParentTask(parentTaskId);
client.admin().indices().getIndex(
getIndexRequest,
ActionListener.wrap(
Expand All @@ -98,6 +103,7 @@ private void getCurrentStateIndices(ActionListener<Set<String>> listener) {

private void executeDeleteEmptyStateIndices(Set<String> emptyStateIndices, ActionListener<Boolean> listener) {
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(emptyStateIndices.toArray(new String[0]));
deleteIndexRequest.setParentTask(parentTaskId);
client.admin().indices().delete(
deleteIndexRequest,
ActionListener.wrap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.common.time.TimeUtils;
import org.elasticsearch.xpack.core.ml.job.config.Job;
Expand Down Expand Up @@ -59,11 +60,13 @@ public class ExpiredForecastsRemover implements MlDataRemover {
private final OriginSettingClient client;
private final ThreadPool threadPool;
private final long cutoffEpochMs;
private final TaskId parentTaskId;

public ExpiredForecastsRemover(OriginSettingClient client, ThreadPool threadPool) {
public ExpiredForecastsRemover(OriginSettingClient client, ThreadPool threadPool, TaskId parentTaskId) {
this.client = Objects.requireNonNull(client);
this.threadPool = Objects.requireNonNull(threadPool);
this.cutoffEpochMs = Instant.now(Clock.systemDefaultZone()).toEpochMilli();
this.parentTaskId = parentTaskId;
}

@Override
Expand All @@ -90,6 +93,7 @@ public void remove(float requestsPerSec, ActionListener<Boolean> listener, Suppl

SearchRequest searchRequest = new SearchRequest(RESULTS_INDEX_PATTERN);
searchRequest.source(source);
searchRequest.setParentTask(parentTaskId);
client.execute(SearchAction.INSTANCE, searchRequest, new ThreadedActionListener<>(LOGGER, threadPool,
MachineLearning.UTILITY_THREAD_POOL_NAME, forecastStatsHandler, false));
}
Expand All @@ -114,6 +118,7 @@ private void deleteForecasts(
DeleteByQueryRequest request = buildDeleteByQuery(forecastsToDelete)
.setRequestsPerSecond(requestsPerSec)
.setAbortOnVersionConflict(false);
request.setParentTask(parentTaskId);
client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<>() {
@Override
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.common.time.TimeUtils;
import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
Expand Down Expand Up @@ -65,8 +66,9 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover

private final ThreadPool threadPool;

public ExpiredModelSnapshotsRemover(OriginSettingClient client, Iterator<Job> jobIterator, ThreadPool threadPool) {
super(client, jobIterator);
public ExpiredModelSnapshotsRemover(OriginSettingClient client, Iterator<Job> jobIterator,
ThreadPool threadPool, TaskId parentTaskId) {
super(client, jobIterator, parentTaskId);
this.threadPool = Objects.requireNonNull(threadPool);
}

Expand Down Expand Up @@ -118,6 +120,7 @@ private void latestSnapshotTimeStamp(String jobId, ActionListener<Long> listener
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.source(searchSourceBuilder);
searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS));
searchRequest.setParentTask(getParentTaskId());

client.search(searchRequest, ActionListener.wrap(
response -> {
Expand Down Expand Up @@ -176,6 +179,7 @@ protected void removeDataBefore(
source.docValueField(ModelSnapshotField.SNAPSHOT_ID.getPreferredName(), null);
source.docValueField(ModelSnapshot.TIMESTAMP.getPreferredName(), "epoch_millis");
searchRequest.source(source);
searchRequest.setParentTask(getParentTaskId());

long deleteAllBeforeMs = (job.getModelSnapshotRetentionDays() == null)
? 0 : latestTimeMs - TimeValue.timeValueDays(job.getModelSnapshotRetentionDays()).getMillis();
Expand Down Expand Up @@ -233,6 +237,7 @@ private void deleteModelSnapshots(Iterator<JobSnapshotId> modelSnapshotIterator,
JobSnapshotId idPair = modelSnapshotIterator.next();
DeleteModelSnapshotAction.Request deleteSnapshotRequest =
new DeleteModelSnapshotAction.Request(idPair.jobId, idPair.snapshotId);
deleteSnapshotRequest.setParentTask(getParentTaskId());
client.execute(DeleteModelSnapshotAction.INSTANCE, deleteSnapshotRequest, new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse response) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
Expand Down Expand Up @@ -71,9 +72,9 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
private final AnomalyDetectionAuditor auditor;
private final ThreadPool threadPool;

public ExpiredResultsRemover(OriginSettingClient client, Iterator<Job> jobIterator,
public ExpiredResultsRemover(OriginSettingClient client, Iterator<Job> jobIterator, TaskId parentTaskId,
AnomalyDetectionAuditor auditor, ThreadPool threadPool) {
super(client, jobIterator);
super(client, jobIterator, parentTaskId);
this.auditor = Objects.requireNonNull(auditor);
this.threadPool = Objects.requireNonNull(threadPool);
}
Expand All @@ -93,6 +94,7 @@ protected void removeDataBefore(
) {
LOGGER.debug("Removing results of job [{}] that have a timestamp before [{}]", job.getId(), cutoffEpochMs);
DeleteByQueryRequest request = createDBQRequest(job, requestsPerSecond, cutoffEpochMs);
request.setParentTask(getParentTaskId());

client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<>() {
@Override
Expand Down Expand Up @@ -167,6 +169,7 @@ private void latestBucketTime(String jobId, ActionListener<Long> listener) {
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.source(searchSourceBuilder);
searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS));
searchRequest.setParentTask(getParentTaskId());

client.search(searchRequest, ActionListener.wrap(
response -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.core.ml.MlConfigIndex;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
Expand Down Expand Up @@ -51,10 +52,13 @@ public class UnusedStateRemover implements MlDataRemover {

private final OriginSettingClient client;
private final ClusterService clusterService;
private final TaskId parentTaskId;

public UnusedStateRemover(OriginSettingClient client, ClusterService clusterService) {
public UnusedStateRemover(OriginSettingClient client, ClusterService clusterService,
TaskId parentTaskId) {
this.client = Objects.requireNonNull(client);
this.clusterService = Objects.requireNonNull(clusterService);
this.parentTaskId = Objects.requireNonNull(parentTaskId);
}

@Override
Expand Down Expand Up @@ -142,6 +146,7 @@ private void executeDeleteUnusedStateDocs(List<String> unusedDocIds, float reque

// _doc is the most efficient sort order and will also disable scoring
deleteByQueryRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);
deleteByQueryRequest.setParentTask(parentTaskId);

client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryRequest, ActionListener.wrap(
response -> {
Expand All @@ -163,7 +168,7 @@ private void executeDeleteUnusedStateDocs(List<String> unusedDocIds, float reque

private static class JobIdExtractor {

private static List<Function<String, String>> extractors = Arrays.asList(
private static final List<Function<String, String>> extractors = Arrays.asList(
ModelState::extractJobId,
Quantiles::extractJobId,
CategorizerState::extractJobId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.core.ml.MlConfigIndex;
import org.elasticsearch.xpack.core.ml.MlStatsIndex;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
Expand All @@ -41,9 +42,11 @@ public class UnusedStatsRemover implements MlDataRemover {
private static final Logger LOGGER = LogManager.getLogger(UnusedStatsRemover.class);

private final OriginSettingClient client;
private final TaskId parentTaskId;

public UnusedStatsRemover(OriginSettingClient client) {
public UnusedStatsRemover(OriginSettingClient client, TaskId parentTaskId) {
this.client = Objects.requireNonNull(client);
this.parentTaskId = Objects.requireNonNull(parentTaskId);
}

@Override
Expand Down Expand Up @@ -97,6 +100,7 @@ private void executeDeleteUnusedStatsDocs(QueryBuilder dbq, float requestsPerSec
.setAbortOnVersionConflict(false)
.setRequestsPerSecond(requestsPerSec)
.setQuery(dbq);
deleteByQueryRequest.setParentTask(parentTaskId);

client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryRequest, ActionListener.wrap(
response -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.job.config.Job;
Expand Down Expand Up @@ -48,7 +49,7 @@ private static class ConcreteExpiredJobDataRemover extends AbstractExpiredJobDat
private int getRetentionDaysCallCount = 0;

ConcreteExpiredJobDataRemover(OriginSettingClient client, Iterator<Job> jobIterator) {
super(client, jobIterator);
super(client, jobIterator, new TaskId("test", 0L));
}

@Override
Expand Down
Loading

0 comments on commit b9cd9ec

Please sign in to comment.