From 3553f68f5ac4beb4977b5a1185350b0b365d2110 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 23 Jan 2020 16:16:27 +0100 Subject: [PATCH] [Transform] Handle permanent bulk indexing errors (#51307) check bulk indexing error for permanent problems and ensure the state goes into failed instead of retry. Corrects the stats API to show the real error and avoids excessive audit logging. fixes #50122 --- .../core/transform/TransformMessages.java | 2 + .../TransformTaskFailedStateIT.java | 14 ++- .../transforms/BulkIndexingException.java | 31 ++++++ .../transforms/ClientTransformIndexer.java | 98 +++++++++++++++---- .../transforms/TransformException.java | 15 +++ .../transforms/TransformIndexer.java | 18 +++- .../transform/transforms/TransformTask.java | 2 + .../utils/ExceptionRootCauseFinder.java | 20 ++++ .../transforms/TransformIndexerTests.java | 20 ++-- 9 files changed, 183 insertions(+), 37 deletions(-) create mode 100644 x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/BulkIndexingException.java create mode 100644 x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformException.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java index 912f7032fe3b0..b5c4c908bab76 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java @@ -79,6 +79,8 @@ public class TransformMessages { + "please simplify job or increase heap size on data nodes."; public static final String LOG_TRANSFORM_PIVOT_SCRIPT_ERROR = "Failed to execute script with error: [{0}], stack trace: {1}"; + public static final String LOG_TRANSFORM_PIVOT_IRRECOVERABLE_BULK_INDEXING_ERROR = + "Failed to index documents into destination 
index due to permanent error: [{0}]"; public static final String FAILED_TO_PARSE_TRANSFORM_CHECKPOINTS = "Failed to parse transform checkpoints for [{0}]"; diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformTaskFailedStateIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformTaskFailedStateIT.java index 809064df4c039..cd01266ae8efc 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformTaskFailedStateIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformTaskFailedStateIT.java @@ -72,8 +72,11 @@ public void testForceStopFailedTransform() throws Exception { startTransform(transformId); awaitState(transformId, TransformStats.State.FAILED); Map fullState = getTransformStateAndStats(transformId); - final String failureReason = "task encountered more than 0 failures; latest failure: " - + ".*BulkIndexingException: Bulk index experienced failures. 
See the logs of the node running the transform for details."; + final String failureReason = "Failed to index documents into destination index due to permanent error: " + + "\\[org.elasticsearch.xpack.transform.transforms.BulkIndexingException: Bulk index experienced \\[7\\] " + + "failures and at least 1 irrecoverable " + + "\\[org.elasticsearch.xpack.transform.transforms.TransformException: Destination index mappings are " + + "incompatible with the transform configuration.;.*"; // Verify we have failed for the expected reason assertThat((String) XContentMapValues.extractValue("reason", fullState), matchesRegex(failureReason)); @@ -107,8 +110,11 @@ public void testStartFailedTransform() throws Exception { startTransform(transformId); awaitState(transformId, TransformStats.State.FAILED); Map fullState = getTransformStateAndStats(transformId); - final String failureReason = "task encountered more than 0 failures; latest failure: " - + ".*BulkIndexingException: Bulk index experienced failures. 
See the logs of the node running the transform for details."; + final String failureReason = "Failed to index documents into destination index due to permanent error: " + "\\[org.elasticsearch.xpack.transform.transforms.BulkIndexingException: Bulk index experienced \\[7\\] " + "failures and at least 1 irrecoverable " + "\\[org.elasticsearch.xpack.transform.transforms.TransformException: Destination index mappings are " + "incompatible with the transform configuration.;.*"; // Verify we have failed for the expected reason assertThat((String) XContentMapValues.extractValue("reason", fullState), matchesRegex(failureReason)); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/BulkIndexingException.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/BulkIndexingException.java new file mode 100644 index 0000000000000..364c368c43679 --- /dev/null +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/BulkIndexingException.java @@ -0,0 +1,31 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.transform.transforms; + +import org.elasticsearch.ElasticsearchException; + +// Wrapper for indexing failures thrown internally in the transform indexer +class BulkIndexingException extends ElasticsearchException { + private final boolean irrecoverable; + + /** + * Create a BulkIndexingException + * + * @param msg The message + * @param cause The most important cause of the bulk indexing failure + * @param irrecoverable whether this is a permanent or irrecoverable error (controls retry) + * @param args arguments for formatting the message + */ + BulkIndexingException(String msg, Throwable cause, boolean irrecoverable, Object... 
args) { + super(msg, cause, args); + this.irrecoverable = irrecoverable; + } + + public boolean isIrrecoverable() { + return irrecoverable; + } +} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java index c0919d8791105..60cfdaf658024 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java @@ -21,6 +21,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.logging.LoggerMessageFormat; +import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; @@ -36,8 +37,11 @@ import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; +import org.elasticsearch.xpack.transform.utils.ExceptionRootCauseFinder; +import java.util.LinkedHashMap; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -154,31 +158,63 @@ protected void doNextBulk(BulkRequest request, ActionListener next ActionListener.wrap(bulkResponse -> { if (bulkResponse.hasFailures()) { int failureCount = 0; + // dedup the failures by the type of the exception, as they most likely have the same cause + Map deduplicatedFailures = new LinkedHashMap<>(); + for (BulkItemResponse item : 
bulkResponse.getItems()) { if (item.isFailed()) { + deduplicatedFailures.putIfAbsent(item.getFailure().getCause().getClass().getSimpleName(), item); failureCount++; } - // TODO gather information on irrecoverable failures and update isIrrecoverableFailure } - if (auditBulkFailures) { - String failureMessage = bulkResponse.buildFailureMessage(); - logger.debug("[{}] Bulk index failure encountered: {}", getJobId(), failureMessage); - auditor.warning( + + // note: bulk failures are audited/logged in {@link TransformIndexer#handleFailure(Exception)} + + // This calls AsyncTwoPhaseIndexer#finishWithIndexingFailure + // Determine whether the failure is irrecoverable (transform should go into failed state) or not (transform increments + // the indexing failure counter + // and possibly retries) + Exception irrecoverableException = ExceptionRootCauseFinder.getFirstIrrecoverableExceptionFromBulkResponses( + deduplicatedFailures.values() + ); + if (irrecoverableException == null) { + String failureMessage = getBulkIndexDetailedFailureMessage(" Significant failures: ", deduplicatedFailures); + logger.debug("[{}] Bulk index experienced [{}] failures.{}", getJobId(), failureCount, failureMessage); + + Exception firstException = deduplicatedFailures.values().iterator().next().getFailure().getCause(); + nextPhase.onFailure( + new BulkIndexingException( + "Bulk index experienced [{}] failures. Significant falures: {}", + firstException, + false, + failureCount, + failureMessage + ) + ); + } else { + deduplicatedFailures.remove(irrecoverableException.getClass().getSimpleName()); + String failureMessage = getBulkIndexDetailedFailureMessage(" Other failures: ", deduplicatedFailures); + irrecoverableException = decorateBulkIndexException(irrecoverableException); + + logger.debug( + "[{}] Bulk index experienced [{}] failures and at least 1 irrecoverable [{}].{}", getJobId(), - "Experienced at least [" - + failureCount - + "] bulk index failures. 
See the logs of the node running the transform for details. " - + failureMessage + failureCount, + ExceptionRootCauseFinder.getDetailedMessage(irrecoverableException), + failureMessage + ); + + nextPhase.onFailure( + new BulkIndexingException( + "Bulk index experienced [{}] failures and at least 1 irrecoverable [{}]. Other failures: {}", + irrecoverableException, + true, + failureCount, + ExceptionRootCauseFinder.getDetailedMessage(irrecoverableException), + failureMessage + ) ); - auditBulkFailures = false; } - // This calls AsyncTwoPhaseIndexer#finishWithIndexingFailure - // It increments the indexing failure, and then calls the `onFailure` logic - nextPhase.onFailure( - new BulkIndexingException( - "Bulk index experienced failures. " + "See the logs of the node running the transform for details." - ) - ); } else { auditBulkFailures = true; nextPhase.onResponse(bulkResponse); @@ -320,11 +356,31 @@ SeqNoPrimaryTermAndIndex getSeqNoPrimaryTermAndIndex() { return seqNoPrimaryTermAndIndex.get(); } - // Considered a recoverable indexing failure - private static class BulkIndexingException extends ElasticsearchException { - BulkIndexingException(String msg, Object... 
args) { - super(msg, args); + private static String getBulkIndexDetailedFailureMessage(String prefix, Map failures) { + if (failures.isEmpty()) { + return ""; + } + + StringBuilder failureMessageBuilder = new StringBuilder(prefix); + for (Entry failure : failures.entrySet()) { + failureMessageBuilder.append("\n[") + .append(failure.getKey()) + .append("] message [") + .append(failure.getValue().getFailureMessage()) + .append("]"); } + String failureMessage = failureMessageBuilder.toString(); + return failureMessage; } + private static Exception decorateBulkIndexException(Exception irrecoverableException) { + if (irrecoverableException instanceof MapperParsingException) { + return new TransformException( + "Destination index mappings are incompatible with the transform configuration.", + irrecoverableException + ); + } + + return irrecoverableException; + } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformException.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformException.java new file mode 100644 index 0000000000000..ec70ce0626660 --- /dev/null +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformException.java @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.transform.transforms; + +import org.elasticsearch.ElasticsearchException; + +class TransformException extends ElasticsearchException { + TransformException(String msg, Throwable cause, Object... 
args) { + super(msg, cause, args); + } +} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 0848fc21217e5..cba4e6c179ad0 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -480,6 +480,8 @@ synchronized void handleFailure(Exception e) { } else if (unwrappedException instanceof ScriptException) { handleScriptException((ScriptException) unwrappedException); // irrecoverable error without special handling + } else if (unwrappedException instanceof BulkIndexingException && ((BulkIndexingException) unwrappedException).isIrrecoverable()) { + handleIrrecoverableBulkIndexingException((BulkIndexingException) unwrappedException); } else if (unwrappedException instanceof IndexNotFoundException || unwrappedException instanceof AggregationResultUtils.AggregationExtractionException || unwrappedException instanceof TransformConfigReloadingException) { @@ -834,9 +836,21 @@ private void handleScriptException(ScriptException scriptException) { failIndexer(message); } + /** + * Handle permanent bulk indexing exception case. This error is irrecoverable. 
+ * + * @param bulkIndexingException BulkIndexingException thrown + */ + private void handleIrrecoverableBulkIndexingException(BulkIndexingException bulkIndexingException) { + String message = TransformMessages.getMessage( + TransformMessages.LOG_TRANSFORM_PIVOT_IRRECOVERABLE_BULK_INDEXING_ERROR, + bulkIndexingException.getDetailedMessage() + ); + failIndexer(message); + } + protected void failIndexer(String failureMessage) { - logger.error("[{}] transform has failed; experienced: [{}].", getJobId(), failureMessage); - auditor.error(getJobId(), failureMessage); + // note: logging and audit is done as part of context.markAsFailed context.markAsFailed(failureMessage); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java index d977d6b99f996..66e1c10ef8e63 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java @@ -455,6 +455,8 @@ public synchronized void fail(String reason, ActionListener listener) { listener.onResponse(null); return; } + + logger.error("[{}] transform has failed; experienced: [{}].", transform.getId(), reason); auditor.error(transform.getId(), reason); // We should not keep retrying. Either the task will be stopped, or started // If it is started again, it is registered again. 
diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinder.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinder.java index f3a7e7cf0f7b9..00e46f0b10e1b 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinder.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinder.java @@ -7,8 +7,12 @@ package org.elasticsearch.xpack.transform.utils; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.index.mapper.MapperParsingException; + +import java.util.Collection; /** * Set of static utils to find the cause of a search exception. @@ -53,6 +57,22 @@ public static String getDetailedMessage(Throwable t) { return t.getMessage(); } + /** + * Return the first irrecoverableException from a collection of bulk responses if there are any. 
+ * + * @param failures a collection of bulk item responses + * @return The first exception considered irrecoverable if there are any, null if no irrecoverable exception found + */ + public static Exception getFirstIrrecoverableExceptionFromBulkResponses(Collection failures) { + for (BulkItemResponse failure : failures) { + if (failure.getFailure().getCause() instanceof MapperParsingException) { + return failure.getFailure().getCause(); + } + } + + return null; + } + private ExceptionRootCauseFinder() {} } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java index 21379dc95bb3f..e6ce52fa5b9ca 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java @@ -67,7 +67,9 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.matchesRegex; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.matches; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -398,7 +400,8 @@ public void testScriptError() throws Exception { final ExecutorService executor = Executors.newFixedThreadPool(1); try { MockTransformAuditor auditor = new MockTransformAuditor(); - TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class)); + TransformContext.Listener contextListener = mock(TransformContext.Listener.class); + TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener); MockedTransformIndexer indexer = 
createMockIndexer( config, @@ -412,14 +415,7 @@ public void testScriptError() throws Exception { ); final CountDownLatch latch = indexer.newLatch(1); - auditor.addExpectation( - new MockTransformAuditor.SeenAuditExpectation( - "fail indexer due to script error", - org.elasticsearch.xpack.core.common.notifications.Level.ERROR, - transformId, - "Failed to execute script with error: [*ArithmeticException: / by zero], stack trace: [stack]" - ) - ); + indexer.start(); assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); @@ -428,11 +424,15 @@ public void testScriptError() throws Exception { latch.countDown(); assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS); assertTrue(failIndexerCalled.get()); + verify(contextListener, times(1)).fail( + matches("Failed to execute script with error: \\[.*ArithmeticException: / by zero\\], stack trace: \\[stack\\]"), + any() + ); + assertThat( failureMessage.get(), matchesRegex("Failed to execute script with error: \\[.*ArithmeticException: / by zero\\], stack trace: \\[stack\\]") ); - auditor.assertAllExpectationsMatched(); } finally { executor.shutdownNow(); }