diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java index 912f7032fe3b0..b5c4c908bab76 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java @@ -79,6 +79,8 @@ public class TransformMessages { + "please simplify job or increase heap size on data nodes."; public static final String LOG_TRANSFORM_PIVOT_SCRIPT_ERROR = "Failed to execute script with error: [{0}], stack trace: {1}"; + public static final String LOG_TRANSFORM_PIVOT_IRRECOVERABLE_BULK_INDEXING_ERROR = + "Failed to index documents into destination index due to permanent error: [{0}]"; public static final String FAILED_TO_PARSE_TRANSFORM_CHECKPOINTS = "Failed to parse transform checkpoints for [{0}]"; diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/BulkIndexingException.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/BulkIndexingException.java new file mode 100644 index 0000000000000..364c368c43679 --- /dev/null +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/BulkIndexingException.java @@ -0,0 +1,31 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.transform.transforms; + +import org.elasticsearch.ElasticsearchException; + +// Wrapper for indexing failures thrown internally in the transform indexer +class BulkIndexingException extends ElasticsearchException { + private final boolean irrecoverable; + + /** + * Create a BulkIndexingException + * + * @param msg The message + * @param cause The most important cause of the bulk indexing failure + * @param irrecoverable whether this is a permanent or irrecoverable error (controls retry) + * @param args arguments for formating the message + */ + BulkIndexingException(String msg, Throwable cause, boolean irrecoverable, Object... args) { + super(msg, cause, args); + this.irrecoverable = irrecoverable; + } + + public boolean isIrrecoverable() { + return irrecoverable; + } +} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java index c0919d8791105..dd6906011d427 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java @@ -21,6 +21,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.logging.LoggerMessageFormat; +import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; @@ -36,8 +37,11 @@ import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; +import org.elasticsearch.xpack.transform.utils.ExceptionRootCauseFinder; +import java.util.LinkedHashMap; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -154,31 +158,63 @@ protected void doNextBulk(BulkRequest request, ActionListener next ActionListener.wrap(bulkResponse -> { if (bulkResponse.hasFailures()) { int failureCount = 0; + // dedup the failures by the type of the exception, as they most likely have the same cause + Map deduplicatedFailures = new LinkedHashMap<>(); + for (BulkItemResponse item : bulkResponse.getItems()) { if (item.isFailed()) { + deduplicatedFailures.putIfAbsent(item.getFailure().getCause().getClass().getSimpleName(), item); failureCount++; } - // TODO gather information on irrecoverable failures and update isIrrecoverableFailure } - if (auditBulkFailures) { - String failureMessage = bulkResponse.buildFailureMessage(); - logger.debug("[{}] Bulk index failure encountered: {}", getJobId(), failureMessage); - auditor.warning( + + // note: bulk failures are audited/logged in {@link TransformIndexer#handleFailure(Exception)} + + // This calls AsyncTwoPhaseIndexer#finishWithIndexingFailure + // Determine whether the failure is irrecoverable (transform should go into failed state) or not (transform increments + // the indexing failure counter + // and possibly retries) + Exception irrecoverableException = ExceptionRootCauseFinder.getFirstIrrecoverableExceptionFromBulkResponses( + deduplicatedFailures.values() + ); + if (irrecoverableException == null) { + String failureMessage = getBulkIndexDetailedFailureMessage(deduplicatedFailures); + logger.debug("[{}] Bulk index experienced [{}] failures. Significant falures: {}", getJobId(), failureMessage); + + Exception firstException = deduplicatedFailures.values().iterator().next().getFailure().getCause(); + nextPhase.onFailure( + new BulkIndexingException( + "Bulk index experienced [{}] failures. Significant falures: {}", + firstException, + false, + failureCount, + failureMessage + ) + ); + } else { + deduplicatedFailures.remove(irrecoverableException.getClass().getSimpleName()); + String failureMessage = getBulkIndexDetailedFailureMessage(deduplicatedFailures); + irrecoverableException = decorateBulkIndexException(irrecoverableException); + + logger.debug( + "[{}] Bulk index experienced [{}] failures and at least 1 irrecoverable [{}]. Other failures: {}", getJobId(), - "Experienced at least [" - + failureCount - + "] bulk index failures. See the logs of the node running the transform for details. " - + failureMessage + failureCount, + ExceptionRootCauseFinder.getDetailedMessage(irrecoverableException), + failureMessage + ); + + nextPhase.onFailure( + new BulkIndexingException( + "Bulk index experienced [{}] failures and at least 1 irrecoverable [{}]. Other failures: {}", + irrecoverableException, + true, + failureCount, + ExceptionRootCauseFinder.getDetailedMessage(irrecoverableException), + failureMessage + ) ); - auditBulkFailures = false; } - // This calls AsyncTwoPhaseIndexer#finishWithIndexingFailure - // It increments the indexing failure, and then calls the `onFailure` logic - nextPhase.onFailure( - new BulkIndexingException( - "Bulk index experienced failures. " + "See the logs of the node running the transform for details." - ) - ); } else { auditBulkFailures = true; nextPhase.onResponse(bulkResponse); @@ -320,11 +356,27 @@ SeqNoPrimaryTermAndIndex getSeqNoPrimaryTermAndIndex() { return seqNoPrimaryTermAndIndex.get(); } - // Considered a recoverable indexing failure - private static class BulkIndexingException extends ElasticsearchException { - BulkIndexingException(String msg, Object... args) { - super(msg, args); + private static String getBulkIndexDetailedFailureMessage(Map failures) { + StringBuilder failureMessageBuilder = new StringBuilder(); + for (Entry failure : failures.entrySet()) { + failureMessageBuilder.append("\n[") + .append(failure.getKey()) + .append("] message [") + .append(failure.getValue().getFailureMessage()) + .append("]"); } + String failureMessage = failureMessageBuilder.toString(); + return failureMessage; } + private static Exception decorateBulkIndexException(Exception irrecoverableException) { + if (irrecoverableException instanceof MapperParsingException) { + return new TransformException( + "Destination index mappings are incompatible with the transform configuration.", + irrecoverableException + ); + } + + return irrecoverableException; + } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformException.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformException.java new file mode 100644 index 0000000000000..ec70ce0626660 --- /dev/null +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformException.java @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.transform.transforms; + +import org.elasticsearch.ElasticsearchException; + +class TransformException extends ElasticsearchException { + TransformException(String msg, Throwable cause, Object... args) { + super(msg, cause, args); + } +} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 0848fc21217e5..e361ab079f896 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -480,6 +480,8 @@ synchronized void handleFailure(Exception e) { } else if (unwrappedException instanceof ScriptException) { handleScriptException((ScriptException) unwrappedException); // irrecoverable error without special handling + } else if (unwrappedException instanceof BulkIndexingException && ((BulkIndexingException) unwrappedException).isIrrecoverable()) { + handleIrrecoverableBulkIndexingException((BulkIndexingException) unwrappedException); } else if (unwrappedException instanceof IndexNotFoundException || unwrappedException instanceof AggregationResultUtils.AggregationExtractionException || unwrappedException instanceof TransformConfigReloadingException) { @@ -834,9 +836,21 @@ private void handleScriptException(ScriptException scriptException) { failIndexer(message); } + /** + * Handle script exception case. This is error is irrecoverable. + * + * @param bulkIndexingException ScriptException thrown + */ + private void handleIrrecoverableBulkIndexingException(BulkIndexingException bulkIndexingException) { + String message = TransformMessages.getMessage( + TransformMessages.LOG_TRANSFORM_PIVOT_IRRECOVERABLE_BULK_INDEXING_ERROR, + bulkIndexingException.getDetailedMessage() + ); + failIndexer(message); + } + protected void failIndexer(String failureMessage) { - logger.error("[{}] transform has failed; experienced: [{}].", getJobId(), failureMessage); - auditor.error(getJobId(), failureMessage); + // note: logging and audit is done as part of context.markAsFailed context.markAsFailed(failureMessage); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java index d977d6b99f996..66e1c10ef8e63 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java @@ -455,6 +455,8 @@ public synchronized void fail(String reason, ActionListener listener) { listener.onResponse(null); return; } + + logger.error("[{}] transform has failed; experienced: [{}].", transform.getId(), reason); auditor.error(transform.getId(), reason); // We should not keep retrying. Either the task will be stopped, or started // If it is started again, it is registered again. diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinder.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinder.java index f3a7e7cf0f7b9..00e46f0b10e1b 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinder.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinder.java @@ -7,8 +7,12 @@ package org.elasticsearch.xpack.transform.utils; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.index.mapper.MapperParsingException; + +import java.util.Collection; /** * Set of static utils to find the cause of a search exception. @@ -53,6 +57,22 @@ public static String getDetailedMessage(Throwable t) { return t.getMessage(); } + /** + * Return the first irrecoverableException from a collection of bulk responses if there are any. + * + * @param failures a collection of bulk item responses + * @return The first exception considered irrecoverable if there are any, null if no irrecoverable exception found + */ + public static Exception getFirstIrrecoverableExceptionFromBulkResponses(Collection failures) { + for (BulkItemResponse failure : failures) { + if (failure.getFailure().getCause() instanceof MapperParsingException) { + return failure.getFailure().getCause(); + } + } + + return null; + } + private ExceptionRootCauseFinder() {} }