Skip to content

Commit

Permalink
[ML] Refresh results indices before running delete by query (#74292)
Browse files Browse the repository at this point in the history
Test failures in #74101 revealed that the last documents persisted by
a job running on a node before that node goes down may not be deleted
when the reset action is executed. The reason is that the results
index has not been refreshed, so those docs are not yet visible to
the search that the delete by query action performs.

This commit adds a call to the refresh API on the results indices
before running delete by query.

Closes #74101
  • Loading branch information
dimitris-athanasiou authored Jun 18, 2021
1 parent b6168e2 commit c9ad768
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,6 @@ public void testJobRelocationIsMemoryAware() throws Exception {
});
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/74101")
public void testClusterWithTwoMlNodes_RunsDatafeed_GivenOriginalNodeGoesDown() throws Exception {
internalCluster().ensureAtMostNumDataNodes(0);
logger.info("Starting dedicated master node...");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@
package org.elasticsearch.xpack.ml.job.persistence;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.search.MultiSearchAction;
import org.elasticsearch.action.search.MultiSearchRequest;
Expand All @@ -26,10 +30,10 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
Expand Down Expand Up @@ -280,17 +284,7 @@ public void deleteJobDocuments(JobConfigProvider jobConfigProvider, IndexNameExp
ActionListener<Boolean> deleteByQueryExecutor = ActionListener.wrap(
response -> {
if (response && indexNames.get().length > 0) {
logger.info("[{}] running delete by query on [{}]", jobId, String.join(", ", indexNames.get()));
ConstantScoreQueryBuilder query =
new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId));
DeleteByQueryRequest request = new DeleteByQueryRequest(indexNames.get())
.setQuery(query)
.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpenHidden()))
.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES)
.setAbortOnVersionConflict(false)
.setRefresh(true);

executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, dbqHandler);
deleteResultsByQuery(jobId, indexNames.get(), dbqHandler);
} else { // We did not execute DBQ, no need to delete aliases or check the response
dbqHandler.onResponse(null);
}
Expand Down Expand Up @@ -414,6 +408,32 @@ public void deleteJobDocuments(JobConfigProvider jobConfigProvider, IndexNameExp
deleteModelState(jobId, deleteStateHandler);
}

/**
 * Removes the documents belonging to a job from the given indices using delete by query.
 * <p>
 * The indices are refreshed first; the delete by query request is only issued once the
 * refresh has completed successfully. A refresh failure is passed straight to
 * {@code listener} and no delete by query is attempted.
 *
 * @param jobId    the job whose documents are matched (term query on the job id field)
 * @param indices  the indices to refresh and delete from; must not be empty
 * @param listener receives the delete by query response, or any failure
 */
private void deleteResultsByQuery(String jobId, String[] indices, ActionListener<BulkByScrollResponse> listener) {
    assert indices.length > 0;

    // Step 1 request: refresh so that recently indexed docs become visible to the delete by query search
    RefreshRequest refreshIndices = new RefreshRequest(indices);
    refreshIndices.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpenHidden()));

    // Step 2, executed once the refresh has returned: the actual delete by query
    ActionListener<RefreshResponse> onRefreshed = ActionListener.wrap(
        ignored -> {
            logger.info("[{}] running delete by query on [{}]", jobId, String.join(", ", indices));

            TermQueryBuilder matchJobId = new TermQueryBuilder(Job.ID.getPreferredName(), jobId);
            DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(indices);
            deleteRequest.setQuery(new ConstantScoreQueryBuilder(matchJobId));
            deleteRequest.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpenHidden()));
            deleteRequest.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
            deleteRequest.setAbortOnVersionConflict(false);
            deleteRequest.setRefresh(true);

            executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteRequest, listener);
        },
        listener::onFailure
    );

    executeAsyncWithOrigin(client, ML_ORIGIN, RefreshAction.INSTANCE, refreshIndices, onRefreshed);
}

private void deleteAliases(String jobId, ActionListener<AcknowledgedResponse> finishedHandler) {
final String readAliasName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
final String writeAliasName = AnomalyDetectorsIndex.resultsWriteAlias(jobId);
Expand Down

0 comments on commit c9ad768

Please sign in to comment.