From 5d35eaa1cbe008aa18096c1c607c257599d4fc6c Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Wed, 12 Feb 2020 14:14:36 +0100 Subject: [PATCH] [Transform] improve irrecoverable error detection - part 2 (#52003) base error handling on rest status instead of listing individual exception types relates to #51820 --- .../transforms/TransformIndexer.java | 83 +++++---- .../utils/ExceptionRootCauseFinder.java | 35 +++- .../utils/ExceptionRootCauseFinderTests.java | 160 ++++++++++++++++++ 3 files changed, 240 insertions(+), 38 deletions(-) create mode 100644 x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinderTests.java diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 6b282131e9d73..6f1925bdc8172 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -19,7 +19,6 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.script.ScriptException; @@ -42,7 +41,6 @@ import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; -import org.elasticsearch.xpack.transform.transforms.pivot.AggregationResultUtils; import org.elasticsearch.xpack.transform.transforms.pivot.Pivot; import org.elasticsearch.xpack.transform.utils.ExceptionRootCauseFinder; @@ -287,7 +285,7 @@ protected void onStart(long now, ActionListener listener) { // If the transform config index or the transform config is gone, something serious occurred // We are in an unknown state and should fail out if (failure instanceof ResourceNotFoundException) { - updateConfigListener.onFailure(new TransformConfigReloadingException(msg, failure)); + updateConfigListener.onFailure(new TransformConfigLostOnReloadException(msg, failure)); } else { auditor.warning(getJobId(), msg); updateConfigListener.onResponse(null); @@ -477,37 +475,54 @@ synchronized void handleFailure(Exception e) { if (unwrappedException instanceof CircuitBreakingException) { handleCircuitBreakingException((CircuitBreakingException) unwrappedException); - } else if (unwrappedException instanceof ScriptException) { + return; + } + + if (unwrappedException instanceof ScriptException) { handleScriptException((ScriptException) unwrappedException); - // irrecoverable error without special handling - } else if (unwrappedException instanceof BulkIndexingException && ((BulkIndexingException) unwrappedException).isIrrecoverable()) { + return; + } + + if (unwrappedException instanceof BulkIndexingException && ((BulkIndexingException) unwrappedException).isIrrecoverable()) { handleIrrecoverableBulkIndexingException((BulkIndexingException) unwrappedException); - } else if (unwrappedException instanceof IndexNotFoundException - || unwrappedException instanceof AggregationResultUtils.AggregationExtractionException - || unwrappedException instanceof TransformConfigReloadingException - || unwrappedException instanceof ResourceNotFoundException - || unwrappedException instanceof IllegalArgumentException) { - failIndexer("task encountered irrecoverable failure: " + e.getMessage()); - } else if (context.getAndIncrementFailureCount() > context.getNumFailureRetries()) { - failIndexer( - "task encountered more than " - + context.getNumFailureRetries() - + " failures; latest failure: " - + ExceptionRootCauseFinder.getDetailedMessage(unwrappedException) - ); - } else { - // Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous - // times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one - if (e.getMessage().equals(lastAuditedExceptionMessage) == false) { - String message = ExceptionRootCauseFinder.getDetailedMessage(unwrappedException); + return; + } - auditor.warning( - getJobId(), - "Transform encountered an exception: " + message + " Will attempt again at next scheduled trigger." - ); - lastAuditedExceptionMessage = message; - } + // irrecoverable error without special handling + if (unwrappedException instanceof ElasticsearchException) { + ElasticsearchException elasticsearchException = (ElasticsearchException) unwrappedException; + if (ExceptionRootCauseFinder.IRRECOVERABLE_REST_STATUSES.contains(elasticsearchException.status())) { + failIndexer("task encountered irrecoverable failure: " + elasticsearchException.getDetailedMessage()); + return; } + } + + if (unwrappedException instanceof IllegalArgumentException) { + failIndexer("task encountered irrecoverable failure: " + e.getMessage()); + return; + } + + if (context.getAndIncrementFailureCount() > context.getNumFailureRetries()) { + failIndexer( + "task encountered more than " + + context.getNumFailureRetries() + + " failures; latest failure: " + + ExceptionRootCauseFinder.getDetailedMessage(unwrappedException) + ); + return; + } + + // Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous + // times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one + if (e.getMessage().equals(lastAuditedExceptionMessage) == false) { + String message = ExceptionRootCauseFinder.getDetailedMessage(unwrappedException); + + auditor.warning( + getJobId(), + "Transform encountered an exception: " + message + " Will attempt again at next scheduled trigger." + ); + lastAuditedExceptionMessage = message; + } } /** @@ -901,8 +916,12 @@ private RunState determineRunStateAtStart() { return RunState.PARTIAL_RUN_IDENTIFY_CHANGES; } - static class TransformConfigReloadingException extends ElasticsearchException { - TransformConfigReloadingException(String msg, Throwable cause, Object... args) { + /** + * Thrown when the transform configuration disappeared permanently. + * (not if reloading failed due to an intermittent problem) + */ + static class TransformConfigLostOnReloadException extends ResourceNotFoundException { + TransformConfigLostOnReloadException(String msg, Throwable cause, Object... args) { super(msg, cause, args); } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinder.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinder.java index 024114a6cb943..ddfe3dcc31ace 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinder.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinder.java @@ -7,19 +7,37 @@ package org.elasticsearch.xpack.transform.utils; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.ShardSearchFailure; -import org.elasticsearch.index.mapper.MapperParsingException; +import org.elasticsearch.rest.RestStatus; +import java.util.Arrays; import java.util.Collection; +import java.util.HashSet; +import java.util.Set; /** * Set of static utils to find the cause of a search exception. */ public final class ExceptionRootCauseFinder { + /** + * List of rest statuses that we consider irrecoverable + */ + public static final Set IRRECOVERABLE_REST_STATUSES = new HashSet<>( + Arrays.asList( + RestStatus.GONE, + RestStatus.NOT_IMPLEMENTED, + RestStatus.NOT_FOUND, + RestStatus.BAD_REQUEST, + RestStatus.UNAUTHORIZED, + RestStatus.FORBIDDEN, + RestStatus.METHOD_NOT_ALLOWED, + RestStatus.NOT_ACCEPTABLE + ) + ); + /** * Unwrap the exception stack and return the most likely cause. * @@ -61,17 +79,22 @@ public static String getDetailedMessage(Throwable t) { /** * Return the first irrecoverableException from a collection of bulk responses if there are any. * - * @param failures a collection of bulk item responses + * @param failures a collection of bulk item responses with failures * @return The first exception considered irrecoverable if there are any, null if no irrecoverable exception found */ public static Throwable getFirstIrrecoverableExceptionFromBulkResponses(Collection failures) { for (BulkItemResponse failure : failures) { Throwable unwrappedThrowable = org.elasticsearch.ExceptionsHelper.unwrapCause(failure.getFailure().getCause()); - if (unwrappedThrowable instanceof MapperParsingException - || unwrappedThrowable instanceof IllegalArgumentException - || unwrappedThrowable instanceof ResourceNotFoundException) { + if (unwrappedThrowable instanceof IllegalArgumentException) { return unwrappedThrowable; } + + if (unwrappedThrowable instanceof ElasticsearchException) { + ElasticsearchException elasticsearchException = (ElasticsearchException) unwrappedThrowable; + if (IRRECOVERABLE_REST_STATUSES.contains(elasticsearchException.status())) { + return elasticsearchException; + } + } } return null; diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinderTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinderTests.java new file mode 100644 index 0000000000000..7e5cf02ce62fa --- /dev/null +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinderTests.java @@ -0,0 +1,160 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.transform.utils; + +import org.elasticsearch.ElasticsearchSecurityException; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.DocWriteRequest.OpType; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.index.mapper.MapperParsingException; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.translog.TranslogException; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.ESTestCase; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +public class ExceptionRootCauseFinderTests extends ESTestCase { + public void testFetFirstIrrecoverableExceptionFromBulkResponses() { + Map bulkItemResponses = new HashMap<>(); + + int id = 1; + // 1 + bulkItemResponses.put( + id, + new BulkItemResponse( + id++, + OpType.INDEX, + new BulkItemResponse.Failure("the_index", "type", "id", new MapperParsingException("mapper parsing error")) + ) + ); + // 2 + bulkItemResponses.put( + id, + new BulkItemResponse( + id++, + OpType.INDEX, + new BulkItemResponse.Failure("the_index", "type", "id", new ResourceNotFoundException("resource not found error")) + ) + ); + // 3 + bulkItemResponses.put( + id, + new BulkItemResponse( + id++, + OpType.INDEX, + new BulkItemResponse.Failure("the_index", "type", "id", new IllegalArgumentException("illegal argument error")) + ) + ); + // 4 not irrecoverable + bulkItemResponses.put( + id, + new BulkItemResponse( + id++, + OpType.INDEX, + new BulkItemResponse.Failure("the_index", "type", "id", new EsRejectedExecutionException("es rejected execution")) + ) + ); + // 5 not irrecoverable + bulkItemResponses.put( + id, + new BulkItemResponse( + id++, + OpType.INDEX, + new BulkItemResponse.Failure( + "the_index", + "type", + "id", + new TranslogException(new ShardId("the_index", "uid", 0), "translog error") + ) + ) + ); + // 6 + bulkItemResponses.put( + id, + new BulkItemResponse( + id++, + OpType.INDEX, + new BulkItemResponse.Failure( + "the_index", + "type", + "id", + new ElasticsearchSecurityException("Authentication required", RestStatus.UNAUTHORIZED) + ) + ) + ); + // 7 + bulkItemResponses.put( + id, + new BulkItemResponse( + id++, + OpType.INDEX, + new BulkItemResponse.Failure( + "the_index", + "type", + "id", + new ElasticsearchSecurityException("current license is non-compliant for [transform]", RestStatus.FORBIDDEN) + ) + ) + ); + // 8 not irrecoverable + bulkItemResponses.put( + id, + new BulkItemResponse( + id++, + OpType.INDEX, + new BulkItemResponse.Failure( + "the_index", + "type", + "id", + new ElasticsearchSecurityException("overloaded, to many requests", RestStatus.TOO_MANY_REQUESTS) + ) + ) + ); + // 9 not irrecoverable + bulkItemResponses.put( + id, + new BulkItemResponse( + id++, + OpType.INDEX, + new BulkItemResponse.Failure( + "the_index", + "type", + "id", + new ElasticsearchSecurityException("internal error", RestStatus.INTERNAL_SERVER_ERROR) + ) + ) + ); + + assertFirstException(bulkItemResponses.values(), MapperParsingException.class, "mapper parsing error"); + bulkItemResponses.remove(1); + assertFirstException(bulkItemResponses.values(), ResourceNotFoundException.class, "resource not found error"); + bulkItemResponses.remove(2); + assertFirstException(bulkItemResponses.values(), IllegalArgumentException.class, "illegal argument error"); + bulkItemResponses.remove(3); + assertFirstException(bulkItemResponses.values(), ElasticsearchSecurityException.class, "Authentication required"); + bulkItemResponses.remove(6); + assertFirstException( + bulkItemResponses.values(), + ElasticsearchSecurityException.class, + "current license is non-compliant for [transform]" + ); + bulkItemResponses.remove(7); + + assertNull(ExceptionRootCauseFinder.getFirstIrrecoverableExceptionFromBulkResponses(bulkItemResponses.values())); + } + + private static void assertFirstException(Collection bulkItemResponses, Class expectedClass, String message) { + Throwable t = ExceptionRootCauseFinder.getFirstIrrecoverableExceptionFromBulkResponses(bulkItemResponses); + assertNotNull(t); + assertEquals(t.getClass(), expectedClass); + assertEquals(t.getMessage(), message); + } +}