From 783df3f9610a1f29149a10c1024758db566c74e3 Mon Sep 17 00:00:00 2001
From: Benjamin Trent
Date: Thu, 19 Mar 2020 11:15:55 -0400
Subject: [PATCH] [ML] only retry persistence failures when the failure is
 intermittent and stop retrying when analytics job is stopping (#53725)

This fixes two issues:

- The results persister would retry actions even when the failure was not
  intermittent. An example of a persistent failure is a doc mapping problem.
- Data frame analytics would continue retrying to persist results even after
  the job was stopped.

closes https://github.com/elastic/elasticsearch/issues/53687
---
 .../process/AnalyticsResultProcessor.java     |  5 +-
 .../process/DataFrameRowsJoiner.java          | 14 ++++-
 .../persistence/ResultsPersisterService.java  | 53 +++++++++++++++++-
 .../ResultsPersisterServiceTests.java         | 55 ++++++++++++++++++-
 4 files changed, 118 insertions(+), 9 deletions(-)

diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java
index 34be13967b5d2..d5873eb29aa23 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java
@@ -108,6 +108,7 @@ public void awaitForCompletion() {
     }
 
     public void cancel() {
+        dataFrameRowsJoiner.cancel();
         isCancelled = true;
     }
 
@@ -265,12 +266,12 @@ private void indexStatsResult(ToXContentObject result, Function<String, String>
                 new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")),
                 WriteRequest.RefreshPolicy.IMMEDIATE,
                 docIdSupplier.apply(analytics.getId()),
-                () -> true,
+                () -> isCancelled == false,
                 errorMsg -> auditor.error(analytics.getId(),
                     "failed to persist result with id [" + docIdSupplier.apply(analytics.getId()) + "]; " + errorMsg)
             );
         } catch (IOException ioe) {
-            LOGGER.error(() -> new ParameterizedMessage("[{}] Failed indexing stats result", analytics.getId()), ioe);
+            LOGGER.error(() -> new ParameterizedMessage("[{}] Failed serializing stats result", analytics.getId()), ioe);
         } catch (Exception e) {
             LOGGER.error(() -> new ParameterizedMessage("[{}] Failed indexing stats result", analytics.getId()), e);
         }
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/DataFrameRowsJoiner.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/DataFrameRowsJoiner.java
index 93c28c8a8d32b..0296b16e488d2 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/DataFrameRowsJoiner.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/DataFrameRowsJoiner.java
@@ -40,9 +40,9 @@ class DataFrameRowsJoiner implements AutoCloseable {
     private final Iterator<DataFrameDataExtractor.Row> dataFrameRowsIterator;
     private LinkedList<RowResults> currentResults;
     private volatile String failure;
+    private volatile boolean isCancelled;
 
-    DataFrameRowsJoiner(String analyticsId, DataFrameDataExtractor dataExtractor,
-                        ResultsPersisterService resultsPersisterService) {
+    DataFrameRowsJoiner(String analyticsId, DataFrameDataExtractor dataExtractor, ResultsPersisterService resultsPersisterService) {
         this.analyticsId = Objects.requireNonNull(analyticsId);
         this.dataExtractor = Objects.requireNonNull(dataExtractor);
         this.resultsPersisterService = Objects.requireNonNull(resultsPersisterService);
@@ -70,6 +70,10 @@ void processRowResults(RowResults rowResults) {
         }
     }
 
+    void cancel() {
+        isCancelled = true;
+    }
+
     private void addResultAndJoinIfEndOfBatch(RowResults rowResults) {
         currentResults.add(rowResults);
         if (currentResults.size() == RESULTS_BATCH_SIZE) {
@@ -87,7 +91,11 @@ private void joinCurrentResults() {
         }
         if (bulkRequest.numberOfActions() > 0) {
             resultsPersisterService.bulkIndexWithHeadersWithRetry(
-                dataExtractor.getHeaders(), bulkRequest, analyticsId, () -> true, errorMsg -> {});
+                dataExtractor.getHeaders(),
+                bulkRequest,
+                analyticsId,
+                () -> isCancelled == false,
+                errorMsg -> {});
         }
         currentResults = new LinkedList<>();
     }
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterService.java
index 5ae76f9491861..3adab97b7f5f8 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterService.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterService.java
@@ -9,6 +9,7 @@
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.bulk.BulkAction;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequest;
@@ -33,6 +34,8 @@
 import java.io.IOException;
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
@@ -42,6 +45,22 @@
 import java.util.stream.Collectors;
 
 public class ResultsPersisterService {
+    /**
+     * List of rest statuses that we consider irrecoverable
+     */
+    public static final Set<RestStatus> IRRECOVERABLE_REST_STATUSES = Collections.unmodifiableSet(new HashSet<>(
+        Arrays.asList(
+            RestStatus.GONE,
+            RestStatus.NOT_IMPLEMENTED,
+            RestStatus.NOT_FOUND,
+            RestStatus.BAD_REQUEST,
+            RestStatus.UNAUTHORIZED,
+            RestStatus.FORBIDDEN,
+            RestStatus.METHOD_NOT_ALLOWED,
+            RestStatus.NOT_ACCEPTABLE
+        )
+    ));
+
     private static final Logger LOGGER = LogManager.getLogger(ResultsPersisterService.class);
 
     public static final Setting<Integer> PERSIST_RESULTS_MAX_RETRIES = Setting.intSetting(
@@ -124,9 +143,23 @@ private BulkResponse bulkIndexWithRetry(BulkRequest bulkRequest,
             if (bulkResponse.hasFailures() == false) {
                 return bulkResponse;
             }
-
+            for (BulkItemResponse itemResponse : bulkResponse.getItems()) {
+                if (itemResponse.isFailed()) {
+                    if (isIrrecoverable(itemResponse.getFailure().getCause())) {
+                        Throwable unwrappedParticular = ExceptionsHelper.unwrapCause(itemResponse.getFailure().getCause());
+                        LOGGER.warn(new ParameterizedMessage(
+                            "[{}] experienced failure that cannot be automatically retried. Bulk failure message [{}]",
+                            jobId,
+                            bulkResponse.buildFailureMessage()),
+                            unwrappedParticular);
+                        throw new ElasticsearchException(
+                            "{} experienced failure that cannot be automatically retried. See logs for bulk failures",
+                            unwrappedParticular,
+                            jobId);
+                    }
+                }
+            }
             retryContext.nextIteration("index", bulkResponse.buildFailureMessage());
-
             // We should only retry the docs that failed.
             bulkRequest = buildNewRequestFromFailures(bulkRequest, bulkResponse);
         }
@@ -148,12 +181,28 @@ public SearchResponse searchWithRetry(SearchRequest searchRequest,
             } catch (ElasticsearchException e) {
                 LOGGER.warn("[" + jobId + "] Exception while executing search action", e);
                 failureMessage = e.getDetailedMessage();
+                if (isIrrecoverable(e)) {
+                    LOGGER.warn(new ParameterizedMessage("[{}] experienced failure that cannot be automatically retried", jobId), e);
+                    throw new ElasticsearchException("{} experienced failure that cannot be automatically retried", e, jobId);
+                }
             }
 
             retryContext.nextIteration("search", failureMessage);
         }
     }
 
+    /**
+     * @param ex The exception to check
+     * @return true when the failure will persist no matter how many times we retry.
+     */
+    private static boolean isIrrecoverable(Exception ex) {
+        Throwable t = ExceptionsHelper.unwrapCause(ex);
+        if (t instanceof ElasticsearchException) {
+            return IRRECOVERABLE_REST_STATUSES.contains(((ElasticsearchException) t).status());
+        }
+        return false;
+    }
+
     /**
      * {@link RetryContext} object handles logic that is executed between consecutive retries of an action.
      *
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterServiceTests.java
index 8e5a71f2fb2eb..af47efeef7d2b 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterServiceTests.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterServiceTests.java
@@ -6,6 +6,7 @@
 package org.elasticsearch.xpack.ml.utils.persistence;
 
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.bulk.BulkAction;
@@ -27,8 +28,10 @@
 import org.elasticsearch.common.CheckedConsumer;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.index.Index;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.indices.IndexPrimaryShardNotAllocatedException;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.ClientHelper;
@@ -131,7 +134,8 @@ public void testSearchWithRetries_SuccessAfterRetry() {
     }
 
     public void testSearchWithRetries_SuccessAfterRetryDueToException() {
-        doThrow(new IndexNotFoundException("my-index")).doAnswer(withResponse(SEARCH_RESPONSE_SUCCESS))
+        doThrow(new IndexPrimaryShardNotAllocatedException(new Index("my-index", "UUID")))
+            .doAnswer(withResponse(SEARCH_RESPONSE_SUCCESS))
             .when(client).execute(eq(SearchAction.INSTANCE), eq(SEARCH_REQUEST), any());
 
         List<String> messages = new ArrayList<>();
@@ -206,6 +210,21 @@ public void testSearchWithRetries_Failure_ShouldNotRetryAfterRandomNumberOfRetri
         verify(client, times(maxRetries + 1)).execute(eq(SearchAction.INSTANCE), eq(SEARCH_REQUEST), any());
     }
 
+    public void testSearchWithRetries_FailureOnIrrecoverableError() {
+        resultsPersisterService.setMaxFailureRetries(5);
+
+        doAnswer(withFailure(new ElasticsearchStatusException("bad search request", RestStatus.BAD_REQUEST)))
+            .when(client).execute(eq(SearchAction.INSTANCE), eq(SEARCH_REQUEST), any());
+
+        ElasticsearchException e =
+            expectThrows(
+                ElasticsearchException.class,
+                () -> resultsPersisterService.searchWithRetry(SEARCH_REQUEST, JOB_ID, () -> true, (s) -> {}));
+        assertThat(e.getMessage(), containsString("experienced failure that cannot be automatically retried"));
+
+        verify(client).execute(eq(SearchAction.INSTANCE), eq(SEARCH_REQUEST), any());
+    }
+
     private static Supplier<Boolean> shouldRetryUntil(int maxRetries) {
         return new Supplier<>() {
             int retries = 0;
@@ -240,6 +259,29 @@ public void testBulkRequestChangeOnFailures() {
         assertThat(lastMessage.get(), containsString("failed to index after [1] attempts. Will attempt again in"));
     }
 
+    public void testBulkRequestChangeOnIrrecoverableFailures() {
+        int maxFailureRetries = 10;
+        resultsPersisterService.setMaxFailureRetries(maxFailureRetries);
+        BulkItemResponse irrecoverable = new BulkItemResponse(
+            2,
+            DocWriteRequest.OpType.INDEX,
+            new BulkItemResponse.Failure("my-index", "fail", new ElasticsearchStatusException("boom", RestStatus.BAD_REQUEST)));
+        doAnswerWithResponses(
+            new BulkResponse(new BulkItemResponse[]{irrecoverable, BULK_ITEM_RESPONSE_SUCCESS}, 0L),
+            new BulkResponse(new BulkItemResponse[0], 0L))
+            .when(client).execute(eq(BulkAction.INSTANCE), any(), any());
+
+        BulkRequest bulkRequest = new BulkRequest();
+        bulkRequest.add(INDEX_REQUEST_FAILURE);
+        bulkRequest.add(INDEX_REQUEST_SUCCESS);
+
+        ElasticsearchException ex = expectThrows(ElasticsearchException.class,
+            () -> resultsPersisterService.bulkIndexWithRetry(bulkRequest, JOB_ID, () -> true, (s)->{}));
+
+        verify(client).execute(eq(BulkAction.INSTANCE), any(), any());
+        assertThat(ex.getMessage(), containsString("experienced failure that cannot be automatically retried."));
+    }
+
     public void testBulkRequestDoesNotRetryWhenSupplierIsFalse() {
         doAnswerWithResponses(
             new BulkResponse(new BulkItemResponse[]{BULK_ITEM_RESPONSE_FAILURE, BULK_ITEM_RESPONSE_SUCCESS}, 0L),
@@ -315,6 +357,15 @@ private static <Response> Answer<Response> withResponse(Response response) {
         };
     }
 
+    @SuppressWarnings("unchecked")
+    private static <Response> Answer<Response> withFailure(Exception failure) {
+        return invocationOnMock -> {
+            ActionListener<Response> listener = (ActionListener<Response>) invocationOnMock.getArguments()[2];
+            listener.onFailure(failure);
+            return null;
+        };
+    }
+
     private static ResultsPersisterService buildResultsPersisterService(OriginSettingClient client) {
         CheckedConsumer<Long, InterruptedException> sleeper = millis -> {};
         ThreadPool tp = mock(ThreadPool.class);
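
Note for reviewers, not part of the patch: the retry policy above, reduced to a
minimal, self-contained Java sketch. Plain JDK types stand in for the
Elasticsearch ones (int HTTP status codes for RestStatus, StatusException for
ElasticsearchException, withRetry for the service's retry loop), and all names
in the sketch are illustrative. It shows the two behaviors this change adds: a
failure with an irrecoverable status short-circuits the loop immediately, and
the shouldRetry supplier (wired to "() -> isCancelled == false" in the patch)
stops further retries once the job is stopping.

import java.util.Set;
import java.util.function.Supplier;

public final class RetrySketch {

    // Mirrors IRRECOVERABLE_REST_STATUSES: statuses for which a retry can never succeed.
    private static final Set<Integer> IRRECOVERABLE_HTTP_STATUSES =
        Set.of(400, 401, 403, 404, 405, 406, 410, 501);

    /** A fallible action whose failures carry an HTTP-like status code. */
    interface Action<T> {
        T run() throws StatusException;
    }

    static final class StatusException extends Exception {
        final int status;

        StatusException(int status, String message) {
            super(message);
            this.status = status;
        }
    }

    /**
     * Retries the action until it succeeds, the failure proves irrecoverable,
     * the caller stops the job (shouldRetry returns false), or the retry
     * budget is exhausted.
     */
    static <T> T withRetry(Action<T> action, Supplier<Boolean> shouldRetry, int maxRetries)
        throws StatusException, InterruptedException {
        int attempt = 0;
        while (true) {
            try {
                return action.run();
            } catch (StatusException e) {
                // A doc mapping problem surfaces as 400 here: no retry can fix it.
                if (IRRECOVERABLE_HTTP_STATUSES.contains(e.status)) {
                    throw e;
                }
                // Stop retrying once the job is stopping or retries are used up.
                if (shouldRetry.get() == false || ++attempt > maxRetries) {
                    throw e;
                }
                Thread.sleep(Math.min(100L << attempt, 5_000L)); // simplified capped backoff
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // A 503 is treated as intermittent, so the third attempt succeeds.
        String result = withRetry(() -> {
            if (++calls[0] < 3) {
                throw new StatusException(503, "shard not yet available");
            }
            return "indexed";
        }, () -> true, 10);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}

In the patch itself these pieces are ResultsPersisterService.IRRECOVERABLE_REST_STATUSES,
isIrrecoverable(), and the cancellation-aware suppliers passed by DataFrameRowsJoiner and
AnalyticsResultProcessor; the real service also logs each failure and waits a randomized,
capped interval between attempts rather than the fixed backoff sketched here.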