[7.x] [ML] only retry persistence failures when the failure is intermittent and stop retrying when analytics job is stopping (#53725) (#53808)

* [ML] only retry persistence failures when the failure is intermittent and stop retrying when analytics job is stopping (#53725)

This fixes two issues:


- The results persister would retry actions even when the failure is not intermittent. An example of a persistent failure is a document mapping problem.
- Data frame analytics would keep retrying to persist results even after the job was stopped (both fixes are sketched below, after the changed-file summary).

closes #53687
benwtrent authored Mar 19, 2020
1 parent cce6021 commit 433952b
Showing 4 changed files with 118 additions and 9 deletions.
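The two fixes described in the commit message can be illustrated with a minimal, self-contained sketch in plain Java. It deliberately avoids the Elasticsearch classes changed below; the class name RetryPolicySketch, the hard-coded status codes, and the "-1 means success" convention are illustrative assumptions, not part of this commit.

    import java.util.Set;
    import java.util.function.Supplier;

    // Sketch of the retry policy: keep retrying intermittent failures, stop as soon
    // as the caller cancels (e.g. the analytics job is stopping), and give up
    // immediately on failures that can never succeed (e.g. a mapping problem, HTTP 400).
    final class RetryPolicySketch {

        // Statuses assumed irrecoverable for this sketch; the real service defines its own set.
        private static final Set<Integer> IRRECOVERABLE_HTTP_STATUSES =
                Set.of(400, 401, 403, 404, 405, 406, 410, 501);

        interface FailingAction {
            /** @return -1 on success, otherwise an HTTP-like failure status */
            int attempt();
        }

        static boolean runWithRetry(FailingAction action, Supplier<Boolean> shouldRetry, int maxRetries) {
            for (int attempt = 0; attempt <= maxRetries; attempt++) {
                int status = action.attempt();
                if (status == -1) {
                    return true;                                   // success
                }
                if (IRRECOVERABLE_HTTP_STATUSES.contains(status)) {
                    // Fix 1: a persistent failure is never retried.
                    throw new IllegalStateException("failure cannot be automatically retried: HTTP " + status);
                }
                if (shouldRetry.get() == false) {
                    // Fix 2: the caller (a stopping analytics job) can halt retries mid-flight.
                    return false;
                }
            }
            return false;                                          // retries exhausted
        }

        public static void main(String[] args) {
            boolean[] cancelled = { false };
            // An intermittent 503 is retried only while the cancellation flag stays false.
            runWithRetry(() -> { cancelled[0] = true; return 503; }, () -> cancelled[0] == false, 10);
            // A mapping problem (400) is surfaced immediately instead of being retried.
            try {
                runWithRetry(() -> 400, () -> true, 10);
            } catch (IllegalStateException expected) {
                System.out.println(expected.getMessage());
            }
        }
    }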
@@ -108,6 +108,7 @@ public void awaitForCompletion() {
}

public void cancel() {
dataFrameRowsJoiner.cancel();
isCancelled = true;
}

@@ -264,12 +265,12 @@ private void indexStatsResult(ToXContentObject result, Function<String, String>
new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")),
WriteRequest.RefreshPolicy.IMMEDIATE,
docIdSupplier.apply(analytics.getId()),
() -> true,
() -> isCancelled == false,
errorMsg -> auditor.error(analytics.getId(),
"failed to persist result with id [" + docIdSupplier.apply(analytics.getId()) + "]; " + errorMsg)
);
} catch (IOException ioe) {
LOGGER.error(() -> new ParameterizedMessage("[{}] Failed indexing stats result", analytics.getId()), ioe);
LOGGER.error(() -> new ParameterizedMessage("[{}] Failed serializing stats result", analytics.getId()), ioe);
} catch (Exception e) {
LOGGER.error(() -> new ParameterizedMessage("[{}] Failed indexing stats result", analytics.getId()), e);
}
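The `() -> isCancelled == false` argument above is the retry predicate (a `Supplier<Boolean>`) handed to the results persister; once `cancel()` flips the volatile flag, any retry loop still in flight stops at its next check. A tiny sketch of that pattern (CancellableTask is an illustrative name, not a class from this commit):

    import java.util.function.Supplier;

    // A volatile flag exposed through a Supplier lets another thread stop retries that are already running.
    class CancellableTask {
        private volatile boolean isCancelled;

        void cancel() {
            isCancelled = true;
        }

        Supplier<Boolean> shouldRetry() {
            return () -> isCancelled == false;
        }
    }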
@@ -40,9 +40,9 @@ class DataFrameRowsJoiner implements AutoCloseable {
private final Iterator<DataFrameDataExtractor.Row> dataFrameRowsIterator;
private LinkedList<RowResults> currentResults;
private volatile String failure;
private volatile boolean isCancelled;

DataFrameRowsJoiner(String analyticsId, DataFrameDataExtractor dataExtractor,
ResultsPersisterService resultsPersisterService) {
DataFrameRowsJoiner(String analyticsId, DataFrameDataExtractor dataExtractor, ResultsPersisterService resultsPersisterService) {
this.analyticsId = Objects.requireNonNull(analyticsId);
this.dataExtractor = Objects.requireNonNull(dataExtractor);
this.resultsPersisterService = Objects.requireNonNull(resultsPersisterService);
@@ -70,6 +70,10 @@ void processRowResults(RowResults rowResults) {
}
}

void cancel() {
isCancelled = true;
}

private void addResultAndJoinIfEndOfBatch(RowResults rowResults) {
currentResults.add(rowResults);
if (currentResults.size() == RESULTS_BATCH_SIZE) {
@@ -87,7 +91,11 @@ private void joinCurrentResults() {
}
if (bulkRequest.numberOfActions() > 0) {
resultsPersisterService.bulkIndexWithHeadersWithRetry(
dataExtractor.getHeaders(), bulkRequest, analyticsId, () -> true, errorMsg -> {});
dataExtractor.getHeaders(),
bulkRequest,
analyticsId,
() -> isCancelled == false,
errorMsg -> {});
}
currentResults = new LinkedList<>();
}
@@ -9,6 +9,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
@@ -33,6 +34,8 @@
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;
@@ -42,6 +45,22 @@
import java.util.stream.Collectors;

public class ResultsPersisterService {
/**
* List of rest statuses that we consider irrecoverable
*/
public static final Set<RestStatus> IRRECOVERABLE_REST_STATUSES = Collections.unmodifiableSet(new HashSet<>(
Arrays.asList(
RestStatus.GONE,
RestStatus.NOT_IMPLEMENTED,
RestStatus.NOT_FOUND,
RestStatus.BAD_REQUEST,
RestStatus.UNAUTHORIZED,
RestStatus.FORBIDDEN,
RestStatus.METHOD_NOT_ALLOWED,
RestStatus.NOT_ACCEPTABLE
)
));

private static final Logger LOGGER = LogManager.getLogger(ResultsPersisterService.class);

public static final Setting<Integer> PERSIST_RESULTS_MAX_RETRIES = Setting.intSetting(
@@ -124,9 +143,23 @@ private BulkResponse bulkIndexWithRetry(BulkRequest bulkRequest,
if (bulkResponse.hasFailures() == false) {
return bulkResponse;
}

for (BulkItemResponse itemResponse : bulkResponse.getItems()) {
if (itemResponse.isFailed()) {
if (isIrrecoverable(itemResponse.getFailure().getCause())) {
Throwable unwrappedParticular = ExceptionsHelper.unwrapCause(itemResponse.getFailure().getCause());
LOGGER.warn(new ParameterizedMessage(
"[{}] experienced failure that cannot be automatically retried. Bulk failure message [{}]",
jobId,
bulkResponse.buildFailureMessage()),
unwrappedParticular);
throw new ElasticsearchException(
"{} experienced failure that cannot be automatically retried. See logs for bulk failures",
unwrappedParticular,
jobId);
}
}
}
retryContext.nextIteration("index", bulkResponse.buildFailureMessage());

// We should only retry the docs that failed.
bulkRequest = buildNewRequestFromFailures(bulkRequest, bulkResponse);
}
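buildNewRequestFromFailures is called above but is not part of this diff; a hedged sketch of what such a helper could look like, reusing the BulkRequest/BulkItemResponse types already imported in this file (the body below is an assumption, not the actual implementation):

    private static BulkRequest buildNewRequestFromFailures(BulkRequest previous, BulkResponse response) {
        // Keep only the requests whose bulk item failed, so the next attempt
        // does not re-index documents that were already persisted successfully.
        BulkRequest retryRequest = new BulkRequest();
        for (BulkItemResponse item : response.getItems()) {
            if (item.isFailed()) {
                retryRequest.add(previous.requests().get(item.getItemId()));
            }
        }
        return retryRequest;
    }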
@@ -148,12 +181,28 @@ public SearchResponse searchWithRetry(SearchRequest searchRequest,
} catch (ElasticsearchException e) {
LOGGER.warn("[" + jobId + "] Exception while executing search action", e);
failureMessage = e.getDetailedMessage();
if (isIrrecoverable(e)) {
LOGGER.warn(new ParameterizedMessage("[{}] experienced failure that cannot be automatically retried", jobId), e);
throw new ElasticsearchException("{} experienced failure that cannot be automatically retried", e, jobId);
}
}

retryContext.nextIteration("search", failureMessage);
}
}

/**
* @param ex The exception to check
* @return true when the failure will persist no matter how many times we retry.
*/
private static boolean isIrrecoverable(Exception ex) {
Throwable t = ExceptionsHelper.unwrapCause(ex);
if (t instanceof ElasticsearchException) {
return IRRECOVERABLE_REST_STATUSES.contains(((ElasticsearchException) t).status());
}
return false;
}
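A plain-Java illustration of the classification performed by isIrrecoverable above. The StatusException type, the crude cause-walking loop, and the hard-coded status codes are stand-ins for ElasticsearchException, ExceptionsHelper.unwrapCause, and IRRECOVERABLE_REST_STATUSES, not the real implementations:

    import java.util.Set;

    // Stand-alone sketch: a failure is "irrecoverable" when its root cause carries
    // a status that retrying can never fix (mapping problems, auth failures, ...).
    final class IrrecoverableCheckSketch {

        static final class StatusException extends RuntimeException {
            final int status;
            StatusException(String message, int status, Throwable cause) {
                super(message, cause);
                this.status = status;
            }
        }

        // A subset of the statuses listed in the diff above, for brevity.
        private static final Set<Integer> IRRECOVERABLE = Set.of(400, 401, 403, 404);

        static boolean isIrrecoverable(Throwable t) {
            // Crude unwrapping for this sketch; the real code delegates to ExceptionsHelper.unwrapCause.
            while (t.getCause() != null && t.getCause() != t) {
                t = t.getCause();
            }
            return t instanceof StatusException && IRRECOVERABLE.contains(((StatusException) t).status);
        }

        public static void main(String[] args) {
            // A wrapped mapping problem (400) will never succeed, so retrying is pointless ...
            System.out.println(isIrrecoverable(new RuntimeException(new StatusException("bad mapping", 400, null))));
            // ... while a transient 503 is worth retrying.
            System.out.println(isIrrecoverable(new StatusException("no shard copies available", 503, null)));
        }
    }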

/**
* {@link RetryContext} object handles logic that is executed between consecutive retries of an action.
*
@@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.utils.persistence;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkAction;
@@ -28,8 +29,10 @@
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexPrimaryShardNotAllocatedException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ClientHelper;
@@ -133,7 +136,8 @@ public void testSearchWithRetries_SuccessAfterRetry() {
}

public void testSearchWithRetries_SuccessAfterRetryDueToException() {
doThrow(new IndexNotFoundException("my-index")).doAnswer(withResponse(SEARCH_RESPONSE_SUCCESS))
doThrow(new IndexPrimaryShardNotAllocatedException(new Index("my-index", "UUID")))
.doAnswer(withResponse(SEARCH_RESPONSE_SUCCESS))
.when(client).execute(eq(SearchAction.INSTANCE), eq(SEARCH_REQUEST), any());

List<String> messages = new ArrayList<>();
@@ -208,6 +212,21 @@ public void testSearchWithRetries_Failure_ShouldNotRetryAfterRandomNumberOfRetri
verify(client, times(maxRetries + 1)).execute(eq(SearchAction.INSTANCE), eq(SEARCH_REQUEST), any());
}

public void testSearchWithRetries_FailureOnIrrecoverableError() {
resultsPersisterService.setMaxFailureRetries(5);

doAnswer(withFailure(new ElasticsearchStatusException("bad search request", RestStatus.BAD_REQUEST)))
.when(client).execute(eq(SearchAction.INSTANCE), eq(SEARCH_REQUEST), any());

ElasticsearchException e =
expectThrows(
ElasticsearchException.class,
() -> resultsPersisterService.searchWithRetry(SEARCH_REQUEST, JOB_ID, () -> true, (s) -> {}));
assertThat(e.getMessage(), containsString("experienced failure that cannot be automatically retried"));

verify(client).execute(eq(SearchAction.INSTANCE), eq(SEARCH_REQUEST), any());
}

private static Supplier<Boolean> shouldRetryUntil(int maxRetries) {
return new Supplier<Boolean>() {
int retries = 0;
@@ -242,6 +261,29 @@ public void testBulkRequestChangeOnFailures() {
assertThat(lastMessage.get(), containsString("failed to index after [1] attempts. Will attempt again in"));
}

public void testBulkRequestChangeOnIrrecoverableFailures() {
int maxFailureRetries = 10;
resultsPersisterService.setMaxFailureRetries(maxFailureRetries);
BulkItemResponse irrecoverable = new BulkItemResponse(
2,
DocWriteRequest.OpType.INDEX,
new BulkItemResponse.Failure("my-index", "_doc", "fail", new ElasticsearchStatusException("boom", RestStatus.BAD_REQUEST)));
doAnswerWithResponses(
new BulkResponse(new BulkItemResponse[]{irrecoverable, BULK_ITEM_RESPONSE_SUCCESS}, 0L),
new BulkResponse(new BulkItemResponse[0], 0L))
.when(client).execute(eq(BulkAction.INSTANCE), any(), any());

BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(INDEX_REQUEST_FAILURE);
bulkRequest.add(INDEX_REQUEST_SUCCESS);

ElasticsearchException ex = expectThrows(ElasticsearchException.class,
() -> resultsPersisterService.bulkIndexWithRetry(bulkRequest, JOB_ID, () -> true, (s)->{}));

verify(client).execute(eq(BulkAction.INSTANCE), any(), any());
assertThat(ex.getMessage(), containsString("experienced failure that cannot be automatically retried."));
}

public void testBulkRequestDoesNotRetryWhenSupplierIsFalse() {
doAnswerWithResponses(
new BulkResponse(new BulkItemResponse[]{BULK_ITEM_RESPONSE_FAILURE, BULK_ITEM_RESPONSE_SUCCESS}, 0L),
@@ -317,6 +359,15 @@ private static <Response> Answer<Response> withResponse(Response response) {
};
}

@SuppressWarnings("unchecked")
private static <Response> Answer<Response> withFailure(Exception failure) {
return invocationOnMock -> {
ActionListener<Response> listener = (ActionListener<Response>) invocationOnMock.getArguments()[2];
listener.onFailure(failure);
return null;
};
}

private static ResultsPersisterService buildResultsPersisterService(OriginSettingClient client) {
CheckedConsumer<Integer, InterruptedException> sleeper = millis -> {};
ThreadPool tp = mock(ThreadPool.class);
