From b9cd9ec58d8c61457269032cecd3018811e9e360 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Mon, 28 Sep 2020 17:21:12 +0100 Subject: [PATCH] [ML] Set parent task Id on ml expired data removers (#62854) Setting the parent task Id (of the delete expired data action) on the ML expired data removers makes it easier to track and cancel long running tasks --- .../ml/integration/UnusedStatsRemoverIT.java | 3 +- .../TransportDeleteExpiredDataAction.java | 39 +++++++++++-------- .../AbstractExpiredJobDataRemover.java | 9 ++++- .../job/retention/EmptyStateIndexRemover.java | 10 ++++- .../retention/ExpiredForecastsRemover.java | 7 +++- .../ExpiredModelSnapshotsRemover.java | 9 ++++- .../job/retention/ExpiredResultsRemover.java | 7 +++- .../ml/job/retention/UnusedStateRemover.java | 9 ++++- .../ml/job/retention/UnusedStatsRemover.java | 6 ++- .../AbstractExpiredJobDataRemoverTests.java | 3 +- .../EmptyStateIndexRemoverTests.java | 3 +- .../ExpiredModelSnapshotsRemoverTests.java | 3 +- .../retention/ExpiredResultsRemoverTests.java | 4 +- 13 files changed, 80 insertions(+), 32 deletions(-) diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/UnusedStatsRemoverIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/UnusedStatsRemoverIT.java index 3ef0722d3f94c..cf376e2467f6e 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/UnusedStatsRemoverIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/UnusedStatsRemoverIT.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.MlStatsIndex; import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction; @@ -114,7 +115,7 @@ public void testRemoveUnusedStats() throws Exception { client().admin().indices().prepareRefresh(MlStatsIndex.indexPattern()).get(); PlainActionFuture deletionListener = new PlainActionFuture<>(); - UnusedStatsRemover statsRemover = new UnusedStatsRemover(client); + UnusedStatsRemover statsRemover = new UnusedStatsRemover(client, new TaskId("test", 0L)); statsRemover.remove(10000.0f, deletionListener, () -> false); deletionListener.actionGet(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java index 2d9f00d1bdfbc..d8c97d165da0d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java @@ -18,6 +18,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ClientHelper; @@ -88,11 +89,13 @@ protected void doExecute(Task task, DeleteExpiredDataAction.Request request, request.getTimeout() == null ? DEFAULT_MAX_DURATION : Duration.ofMillis(request.getTimeout().millis()) ); + TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId()); + Supplier isTimedOutSupplier = () -> Instant.now(clock).isAfter(timeoutTime); AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName()); if (Strings.isNullOrEmpty(request.getJobId()) || Strings.isAllOrWildcard(new String[]{request.getJobId()})) { - List dataRemovers = createDataRemovers(client, auditor); + List dataRemovers = createDataRemovers(client, taskId, auditor); threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute( () -> deleteExpiredData(request, dataRemovers, listener, isTimedOutSupplier) ); @@ -101,7 +104,7 @@ protected void doExecute(Task task, DeleteExpiredDataAction.Request request, jobBuilders -> { threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> { List jobs = jobBuilders.stream().map(Job.Builder::build).collect(Collectors.toList()); - List dataRemovers = createDataRemovers(jobs, auditor); + List dataRemovers = createDataRemovers(jobs, taskId, auditor); deleteExpiredData(request, dataRemovers, listener, isTimedOutSupplier); } ); @@ -164,24 +167,28 @@ void deleteExpiredData(Iterator mlDataRemoversIterator, } } - private List createDataRemovers(OriginSettingClient client, AnomalyDetectionAuditor auditor) { + private List createDataRemovers(OriginSettingClient client, + TaskId parentTaskId, + AnomalyDetectionAuditor auditor) { return Arrays.asList( - new ExpiredResultsRemover(client, new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(client)), auditor, threadPool), - new ExpiredForecastsRemover(client, threadPool), - new ExpiredModelSnapshotsRemover(client, new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(client)), threadPool), - new UnusedStateRemover(client, clusterService), - new EmptyStateIndexRemover(client), - new UnusedStatsRemover(client)); + new ExpiredResultsRemover(client, + new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(client)), parentTaskId, auditor, threadPool), + new ExpiredForecastsRemover(client, threadPool, parentTaskId), + new ExpiredModelSnapshotsRemover(client, + new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(client)), threadPool, parentTaskId), + new UnusedStateRemover(client, clusterService, parentTaskId), + new EmptyStateIndexRemover(client, parentTaskId), + new UnusedStatsRemover(client, parentTaskId)); } - private List createDataRemovers(List jobs, AnomalyDetectionAuditor auditor) { + private List createDataRemovers(List jobs, TaskId parentTaskId, AnomalyDetectionAuditor auditor) { return Arrays.asList( - new ExpiredResultsRemover(client, new VolatileCursorIterator<>(jobs), auditor, threadPool), - new ExpiredForecastsRemover(client, threadPool), - new ExpiredModelSnapshotsRemover(client, new VolatileCursorIterator<>(jobs), threadPool), - new UnusedStateRemover(client, clusterService), - new EmptyStateIndexRemover(client), - new UnusedStatsRemover(client)); + new ExpiredResultsRemover(client, new VolatileCursorIterator<>(jobs), parentTaskId, auditor, threadPool), + new ExpiredForecastsRemover(client, threadPool, parentTaskId), + new ExpiredModelSnapshotsRemover(client, new VolatileCursorIterator<>(jobs), threadPool, parentTaskId), + new UnusedStateRemover(client, clusterService, parentTaskId), + new EmptyStateIndexRemover(client, parentTaskId), + new UnusedStatsRemover(client, parentTaskId)); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java index b001e8b537d07..9487e14345f79 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java @@ -9,6 +9,7 @@ import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.results.Result; @@ -27,10 +28,16 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover { protected final OriginSettingClient client; private final Iterator jobIterator; + private final TaskId parentTaskId; - AbstractExpiredJobDataRemover(OriginSettingClient client, Iterator jobIterator) { + AbstractExpiredJobDataRemover(OriginSettingClient client, Iterator jobIterator, TaskId parentTaskId) { this.client = client; this.jobIterator = jobIterator; + this.parentTaskId = parentTaskId; + } + + protected TaskId getParentTaskId() { + return parentTaskId; } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/EmptyStateIndexRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/EmptyStateIndexRemover.java index 586b259e5ad99..0b7de4aa79c5b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/EmptyStateIndexRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/EmptyStateIndexRemover.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import java.util.Objects; @@ -24,11 +25,13 @@ * This class deletes empty indices matching .ml-state* pattern that are not pointed at by the .ml-state-write alias. */ public class EmptyStateIndexRemover implements MlDataRemover { - + private final OriginSettingClient client; + private final TaskId parentTaskId; - public EmptyStateIndexRemover(OriginSettingClient client) { + public EmptyStateIndexRemover(OriginSettingClient client, TaskId parentTaskId) { this.client = Objects.requireNonNull(client); + this.parentTaskId = parentTaskId; } @Override @@ -69,6 +72,7 @@ public void remove(float requestsPerSec, ActionListener listener, Suppl private void getEmptyStateIndices(ActionListener> listener) { IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest().indices(AnomalyDetectorsIndex.jobStateIndexPattern()); + indicesStatsRequest.setParentTask(parentTaskId); client.admin().indices().stats( indicesStatsRequest, ActionListener.wrap( @@ -87,6 +91,7 @@ private void getEmptyStateIndices(ActionListener> listener) { private void getCurrentStateIndices(ActionListener> listener) { GetIndexRequest getIndexRequest = new GetIndexRequest().indices(AnomalyDetectorsIndex.jobStateIndexWriteAlias()); + getIndexRequest.setParentTask(parentTaskId); client.admin().indices().getIndex( getIndexRequest, ActionListener.wrap( @@ -98,6 +103,7 @@ private void getCurrentStateIndices(ActionListener> listener) { private void executeDeleteEmptyStateIndices(Set emptyStateIndices, ActionListener listener) { DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(emptyStateIndices.toArray(new String[0])); + deleteIndexRequest.setParentTask(parentTaskId); client.admin().indices().delete( deleteIndexRequest, ActionListener.wrap( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java index 3b47bd0cc3fd8..36124971bd1e3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java @@ -24,6 +24,7 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.common.time.TimeUtils; import org.elasticsearch.xpack.core.ml.job.config.Job; @@ -59,11 +60,13 @@ public class ExpiredForecastsRemover implements MlDataRemover { private final OriginSettingClient client; private final ThreadPool threadPool; private final long cutoffEpochMs; + private final TaskId parentTaskId; - public ExpiredForecastsRemover(OriginSettingClient client, ThreadPool threadPool) { + public ExpiredForecastsRemover(OriginSettingClient client, ThreadPool threadPool, TaskId parentTaskId) { this.client = Objects.requireNonNull(client); this.threadPool = Objects.requireNonNull(threadPool); this.cutoffEpochMs = Instant.now(Clock.systemDefaultZone()).toEpochMilli(); + this.parentTaskId = parentTaskId; } @Override @@ -90,6 +93,7 @@ public void remove(float requestsPerSec, ActionListener listener, Suppl SearchRequest searchRequest = new SearchRequest(RESULTS_INDEX_PATTERN); searchRequest.source(source); + searchRequest.setParentTask(parentTaskId); client.execute(SearchAction.INSTANCE, searchRequest, new ThreadedActionListener<>(LOGGER, threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME, forecastStatsHandler, false)); } @@ -114,6 +118,7 @@ private void deleteForecasts( DeleteByQueryRequest request = buildDeleteByQuery(forecastsToDelete) .setRequestsPerSecond(requestsPerSec) .setAbortOnVersionConflict(false); + request.setParentTask(parentTaskId); client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<>() { @Override public void onResponse(BulkByScrollResponse bulkByScrollResponse) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java index 6bcd0c420dd93..ac9566bb2f731 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java @@ -23,6 +23,7 @@ import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.SortBuilder; import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.common.time.TimeUtils; import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; @@ -65,8 +66,9 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover private final ThreadPool threadPool; - public ExpiredModelSnapshotsRemover(OriginSettingClient client, Iterator jobIterator, ThreadPool threadPool) { - super(client, jobIterator); + public ExpiredModelSnapshotsRemover(OriginSettingClient client, Iterator jobIterator, + ThreadPool threadPool, TaskId parentTaskId) { + super(client, jobIterator, parentTaskId); this.threadPool = Objects.requireNonNull(threadPool); } @@ -118,6 +120,7 @@ private void latestSnapshotTimeStamp(String jobId, ActionListener listener SearchRequest searchRequest = new SearchRequest(indexName); searchRequest.source(searchSourceBuilder); searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS)); + searchRequest.setParentTask(getParentTaskId()); client.search(searchRequest, ActionListener.wrap( response -> { @@ -176,6 +179,7 @@ protected void removeDataBefore( source.docValueField(ModelSnapshotField.SNAPSHOT_ID.getPreferredName(), null); source.docValueField(ModelSnapshot.TIMESTAMP.getPreferredName(), "epoch_millis"); searchRequest.source(source); + searchRequest.setParentTask(getParentTaskId()); long deleteAllBeforeMs = (job.getModelSnapshotRetentionDays() == null) ? 0 : latestTimeMs - TimeValue.timeValueDays(job.getModelSnapshotRetentionDays()).getMillis(); @@ -233,6 +237,7 @@ private void deleteModelSnapshots(Iterator modelSnapshotIterator, JobSnapshotId idPair = modelSnapshotIterator.next(); DeleteModelSnapshotAction.Request deleteSnapshotRequest = new DeleteModelSnapshotAction.Request(idPair.jobId, idPair.snapshotId); + deleteSnapshotRequest.setParentTask(getParentTaskId()); client.execute(DeleteModelSnapshotAction.INSTANCE, deleteSnapshotRequest, new ActionListener<>() { @Override public void onResponse(AcknowledgedResponse response) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java index 8a1ac3dab6023..7e8a6e46633df 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java @@ -30,6 +30,7 @@ import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.SortBuilder; import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.messages.Messages; @@ -71,9 +72,9 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover { private final AnomalyDetectionAuditor auditor; private final ThreadPool threadPool; - public ExpiredResultsRemover(OriginSettingClient client, Iterator jobIterator, + public ExpiredResultsRemover(OriginSettingClient client, Iterator jobIterator, TaskId parentTaskId, AnomalyDetectionAuditor auditor, ThreadPool threadPool) { - super(client, jobIterator); + super(client, jobIterator, parentTaskId); this.auditor = Objects.requireNonNull(auditor); this.threadPool = Objects.requireNonNull(threadPool); } @@ -93,6 +94,7 @@ protected void removeDataBefore( ) { LOGGER.debug("Removing results of job [{}] that have a timestamp before [{}]", job.getId(), cutoffEpochMs); DeleteByQueryRequest request = createDBQRequest(job, requestsPerSecond, cutoffEpochMs); + request.setParentTask(getParentTaskId()); client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<>() { @Override @@ -167,6 +169,7 @@ private void latestBucketTime(String jobId, ActionListener listener) { SearchRequest searchRequest = new SearchRequest(indexName); searchRequest.source(searchSourceBuilder); searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS)); + searchRequest.setParentTask(getParentTaskId()); client.search(searchRequest, ActionListener.wrap( response -> { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java index c27f6da09ecc7..4f69b7c021b6b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java @@ -15,6 +15,7 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xpack.core.ml.MlConfigIndex; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; @@ -51,10 +52,13 @@ public class UnusedStateRemover implements MlDataRemover { private final OriginSettingClient client; private final ClusterService clusterService; + private final TaskId parentTaskId; - public UnusedStateRemover(OriginSettingClient client, ClusterService clusterService) { + public UnusedStateRemover(OriginSettingClient client, ClusterService clusterService, + TaskId parentTaskId) { this.client = Objects.requireNonNull(client); this.clusterService = Objects.requireNonNull(clusterService); + this.parentTaskId = Objects.requireNonNull(parentTaskId); } @Override @@ -142,6 +146,7 @@ private void executeDeleteUnusedStateDocs(List unusedDocIds, float reque // _doc is the most efficient sort order and will also disable scoring deleteByQueryRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC); + deleteByQueryRequest.setParentTask(parentTaskId); client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryRequest, ActionListener.wrap( response -> { @@ -163,7 +168,7 @@ private void executeDeleteUnusedStateDocs(List unusedDocIds, float reque private static class JobIdExtractor { - private static List> extractors = Arrays.asList( + private static final List> extractors = Arrays.asList( ModelState::extractJobId, Quantiles::extractJobId, CategorizerState::extractJobId, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStatsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStatsRemover.java index fa7cb6ae6b274..8487be88af153 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStatsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStatsRemover.java @@ -16,6 +16,7 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xpack.core.ml.MlConfigIndex; import org.elasticsearch.xpack.core.ml.MlStatsIndex; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; @@ -41,9 +42,11 @@ public class UnusedStatsRemover implements MlDataRemover { private static final Logger LOGGER = LogManager.getLogger(UnusedStatsRemover.class); private final OriginSettingClient client; + private final TaskId parentTaskId; - public UnusedStatsRemover(OriginSettingClient client) { + public UnusedStatsRemover(OriginSettingClient client, TaskId parentTaskId) { this.client = Objects.requireNonNull(client); + this.parentTaskId = Objects.requireNonNull(parentTaskId); } @Override @@ -97,6 +100,7 @@ private void executeDeleteUnusedStatsDocs(QueryBuilder dbq, float requestsPerSec .setAbortOnVersionConflict(false) .setRequestsPerSecond(requestsPerSec) .setQuery(dbq); + deleteByQueryRequest.setParentTask(parentTaskId); client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryRequest, ActionListener.wrap( response -> { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java index 08ae54711392f..56be5ba9036ad 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.job.config.Job; @@ -48,7 +49,7 @@ private static class ConcreteExpiredJobDataRemover extends AbstractExpiredJobDat private int getRetentionDaysCallCount = 0; ConcreteExpiredJobDataRemover(OriginSettingClient client, Iterator jobIterator) { - super(client, jobIterator); + super(client, jobIterator, new TaskId("test", 0L)); } @Override diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/EmptyStateIndexRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/EmptyStateIndexRemoverTests.java index 8cf4ac4cc1998..627c01b7fc8f7 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/EmptyStateIndexRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/EmptyStateIndexRemoverTests.java @@ -18,6 +18,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.index.shard.DocsStats; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.ml.test.MockOriginSettingClient; @@ -57,7 +58,7 @@ public void setUpTests() { listener = mock(ActionListener.class); deleteIndexRequestCaptor = ArgumentCaptor.forClass(DeleteIndexRequest.class); - remover = new EmptyStateIndexRemover(originSettingClient); + remover = new EmptyStateIndexRemover(originSettingClient, new TaskId("test", 0L)); } @After diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java index 97c23286ea81a..263e56ce7c615 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; @@ -252,7 +253,7 @@ private ExpiredModelSnapshotsRemover createExpiredModelSnapshotsRemover(Iterator return null; } ).when(executor).execute(any()); - return new ExpiredModelSnapshotsRemover(originSettingClient, jobIterator, threadPool); + return new ExpiredModelSnapshotsRemover(originSettingClient, jobIterator, threadPool, new TaskId("test", 0L)); } private static ModelSnapshot createModelSnapshot(String jobId, String snapshotId, Date date) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java index a8ee0e5848d28..8aafbaaafccd8 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; @@ -197,6 +198,7 @@ private ExpiredResultsRemover createExpiredResultsRemover(Iterator jobItera } ).when(executor).execute(any()); - return new ExpiredResultsRemover(originSettingClient, jobIterator, mock(AnomalyDetectionAuditor.class), threadPool); + return new ExpiredResultsRemover(originSettingClient, jobIterator, new TaskId("test", 0L), + mock(AnomalyDetectionAuditor.class), threadPool); } }