Skip to content

Commit

Permalink
[Transform] Handle permanent bulk indexing errors (#51307)
Browse files Browse the repository at this point in the history
check bulk indexing error for permanent problems and ensure the state goes into failed instead of
retry. Corrects the stats API to show the real error and avoids excessive audit logging.

fixes #50122
  • Loading branch information
Hendrik Muhs committed Jan 23, 2020
1 parent 84664e8 commit 3553f68
Show file tree
Hide file tree
Showing 9 changed files with 183 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ public class TransformMessages {
+ "please simplify job or increase heap size on data nodes.";
public static final String LOG_TRANSFORM_PIVOT_SCRIPT_ERROR =
"Failed to execute script with error: [{0}], stack trace: {1}";
public static final String LOG_TRANSFORM_PIVOT_IRRECOVERABLE_BULK_INDEXING_ERROR =
"Failed to index documents into destination index due to permanent error: [{0}]";

public static final String FAILED_TO_PARSE_TRANSFORM_CHECKPOINTS =
"Failed to parse transform checkpoints for [{0}]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,11 @@ public void testForceStopFailedTransform() throws Exception {
startTransform(transformId);
awaitState(transformId, TransformStats.State.FAILED);
Map<?, ?> fullState = getTransformStateAndStats(transformId);
final String failureReason = "task encountered more than 0 failures; latest failure: "
+ ".*BulkIndexingException: Bulk index experienced failures. See the logs of the node running the transform for details.";
final String failureReason = "Failed to index documents into destination index due to permanent error: "
+ "\\[org.elasticsearch.xpack.transform.transforms.BulkIndexingException: Bulk index experienced \\[7\\] "
+ "failures and at least 1 irrecoverable "
+ "\\[org.elasticsearch.xpack.transform.transforms.TransformException: Destination index mappings are "
+ "incompatible with the transform configuration.;.*";
// Verify we have failed for the expected reason
assertThat((String) XContentMapValues.extractValue("reason", fullState), matchesRegex(failureReason));

Expand Down Expand Up @@ -107,8 +110,11 @@ public void testStartFailedTransform() throws Exception {
startTransform(transformId);
awaitState(transformId, TransformStats.State.FAILED);
Map<?, ?> fullState = getTransformStateAndStats(transformId);
final String failureReason = "task encountered more than 0 failures; latest failure: "
+ ".*BulkIndexingException: Bulk index experienced failures. See the logs of the node running the transform for details.";
final String failureReason = "Failed to index documents into destination index due to permanent error: "
+ "\\[org.elasticsearch.xpack.transform.transforms.BulkIndexingException: Bulk index experienced \\[7\\] "
+ "failures and at least 1 irrecoverable "
+ "\\[org.elasticsearch.xpack.transform.transforms.TransformException: Destination index mappings are "
+ "incompatible with the transform configuration.;.*";
// Verify we have failed for the expected reason
assertThat((String) XContentMapValues.extractValue("reason", fullState), matchesRegex(failureReason));

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.transform.transforms;

import org.elasticsearch.ElasticsearchException;

// Wrapper for indexing failures thrown internally in the transform indexer
class BulkIndexingException extends ElasticsearchException {
private final boolean irrecoverable;

/**
* Create a BulkIndexingException
*
* @param msg The message
* @param cause The most important cause of the bulk indexing failure
* @param irrecoverable whether this is a permanent or irrecoverable error (controls retry)
* @param args arguments for formating the message
*/
BulkIndexingException(String msg, Throwable cause, boolean irrecoverable, Object... args) {
super(msg, cause, args);
this.irrecoverable = irrecoverable;
}

public boolean isIrrecoverable() {
return irrecoverable;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
Expand All @@ -36,8 +37,11 @@
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import org.elasticsearch.xpack.transform.utils.ExceptionRootCauseFinder;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -154,31 +158,63 @@ protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> next
ActionListener.wrap(bulkResponse -> {
if (bulkResponse.hasFailures()) {
int failureCount = 0;
// dedup the failures by the type of the exception, as they most likely have the same cause
Map<String, BulkItemResponse> deduplicatedFailures = new LinkedHashMap<>();

for (BulkItemResponse item : bulkResponse.getItems()) {
if (item.isFailed()) {
deduplicatedFailures.putIfAbsent(item.getFailure().getCause().getClass().getSimpleName(), item);
failureCount++;
}
// TODO gather information on irrecoverable failures and update isIrrecoverableFailure
}
if (auditBulkFailures) {
String failureMessage = bulkResponse.buildFailureMessage();
logger.debug("[{}] Bulk index failure encountered: {}", getJobId(), failureMessage);
auditor.warning(

// note: bulk failures are audited/logged in {@link TransformIndexer#handleFailure(Exception)}

// This calls AsyncTwoPhaseIndexer#finishWithIndexingFailure
// Determine whether the failure is irrecoverable (transform should go into failed state) or not (transform increments
// the indexing failure counter
// and possibly retries)
Exception irrecoverableException = ExceptionRootCauseFinder.getFirstIrrecoverableExceptionFromBulkResponses(
deduplicatedFailures.values()
);
if (irrecoverableException == null) {
String failureMessage = getBulkIndexDetailedFailureMessage(" Significant failures: ", deduplicatedFailures);
logger.debug("[{}] Bulk index experienced [{}] failures.{}", getJobId(), failureCount, failureMessage);

Exception firstException = deduplicatedFailures.values().iterator().next().getFailure().getCause();
nextPhase.onFailure(
new BulkIndexingException(
"Bulk index experienced [{}] failures. Significant falures: {}",
firstException,
false,
failureCount,
failureMessage
)
);
} else {
deduplicatedFailures.remove(irrecoverableException.getClass().getSimpleName());
String failureMessage = getBulkIndexDetailedFailureMessage(" Other failures: ", deduplicatedFailures);
irrecoverableException = decorateBulkIndexException(irrecoverableException);

logger.debug(
"[{}] Bulk index experienced [{}] failures and at least 1 irrecoverable [{}].{}",
getJobId(),
"Experienced at least ["
+ failureCount
+ "] bulk index failures. See the logs of the node running the transform for details. "
+ failureMessage
failureCount,
ExceptionRootCauseFinder.getDetailedMessage(irrecoverableException),
failureMessage
);

nextPhase.onFailure(
new BulkIndexingException(
"Bulk index experienced [{}] failures and at least 1 irrecoverable [{}]. Other failures: {}",
irrecoverableException,
true,
failureCount,
ExceptionRootCauseFinder.getDetailedMessage(irrecoverableException),
failureMessage
)
);
auditBulkFailures = false;
}
// This calls AsyncTwoPhaseIndexer#finishWithIndexingFailure
// It increments the indexing failure, and then calls the `onFailure` logic
nextPhase.onFailure(
new BulkIndexingException(
"Bulk index experienced failures. " + "See the logs of the node running the transform for details."
)
);
} else {
auditBulkFailures = true;
nextPhase.onResponse(bulkResponse);
Expand Down Expand Up @@ -320,11 +356,31 @@ SeqNoPrimaryTermAndIndex getSeqNoPrimaryTermAndIndex() {
return seqNoPrimaryTermAndIndex.get();
}

// Considered a recoverable indexing failure
private static class BulkIndexingException extends ElasticsearchException {
BulkIndexingException(String msg, Object... args) {
super(msg, args);
private static String getBulkIndexDetailedFailureMessage(String prefix, Map<String, BulkItemResponse> failures) {
if (failures.isEmpty()) {
return "";
}

StringBuilder failureMessageBuilder = new StringBuilder(prefix);
for (Entry<String, BulkItemResponse> failure : failures.entrySet()) {
failureMessageBuilder.append("\n[")
.append(failure.getKey())
.append("] message [")
.append(failure.getValue().getFailureMessage())
.append("]");
}
String failureMessage = failureMessageBuilder.toString();
return failureMessage;
}

private static Exception decorateBulkIndexException(Exception irrecoverableException) {
if (irrecoverableException instanceof MapperParsingException) {
return new TransformException(
"Destination index mappings are incompatible with the transform configuration.",
irrecoverableException
);
}

return irrecoverableException;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.transform.transforms;

import org.elasticsearch.ElasticsearchException;

class TransformException extends ElasticsearchException {
TransformException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,8 @@ synchronized void handleFailure(Exception e) {
} else if (unwrappedException instanceof ScriptException) {
handleScriptException((ScriptException) unwrappedException);
// irrecoverable error without special handling
} else if (unwrappedException instanceof BulkIndexingException && ((BulkIndexingException) unwrappedException).isIrrecoverable()) {
handleIrrecoverableBulkIndexingException((BulkIndexingException) unwrappedException);
} else if (unwrappedException instanceof IndexNotFoundException
|| unwrappedException instanceof AggregationResultUtils.AggregationExtractionException
|| unwrappedException instanceof TransformConfigReloadingException) {
Expand Down Expand Up @@ -834,9 +836,21 @@ private void handleScriptException(ScriptException scriptException) {
failIndexer(message);
}

/**
* Handle permanent bulk indexing exception case. This is error is irrecoverable.
*
* @param bulkIndexingException BulkIndexingException thrown
*/
private void handleIrrecoverableBulkIndexingException(BulkIndexingException bulkIndexingException) {
String message = TransformMessages.getMessage(
TransformMessages.LOG_TRANSFORM_PIVOT_IRRECOVERABLE_BULK_INDEXING_ERROR,
bulkIndexingException.getDetailedMessage()
);
failIndexer(message);
}

protected void failIndexer(String failureMessage) {
logger.error("[{}] transform has failed; experienced: [{}].", getJobId(), failureMessage);
auditor.error(getJobId(), failureMessage);
// note: logging and audit is done as part of context.markAsFailed
context.markAsFailed(failureMessage);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,8 @@ public synchronized void fail(String reason, ActionListener<Void> listener) {
listener.onResponse(null);
return;
}

logger.error("[{}] transform has failed; experienced: [{}].", transform.getId(), reason);
auditor.error(transform.getId(), reason);
// We should not keep retrying. Either the task will be stopped, or started
// If it is started again, it is registered again.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@
package org.elasticsearch.xpack.transform.utils;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.index.mapper.MapperParsingException;

import java.util.Collection;

/**
* Set of static utils to find the cause of a search exception.
Expand Down Expand Up @@ -53,6 +57,22 @@ public static String getDetailedMessage(Throwable t) {
return t.getMessage();
}

/**
* Return the first irrecoverableException from a collection of bulk responses if there are any.
*
* @param failures a collection of bulk item responses
* @return The first exception considered irrecoverable if there are any, null if no irrecoverable exception found
*/
public static Exception getFirstIrrecoverableExceptionFromBulkResponses(Collection<BulkItemResponse> failures) {
for (BulkItemResponse failure : failures) {
if (failure.getFailure().getCause() instanceof MapperParsingException) {
return failure.getFailure().getCause();
}
}

return null;
}

private ExceptionRootCauseFinder() {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.matchesRegex;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.matches;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -398,7 +400,8 @@ public void testScriptError() throws Exception {
final ExecutorService executor = Executors.newFixedThreadPool(1);
try {
MockTransformAuditor auditor = new MockTransformAuditor();
TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class));
TransformContext.Listener contextListener = mock(TransformContext.Listener.class);
TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener);

MockedTransformIndexer indexer = createMockIndexer(
config,
Expand All @@ -412,14 +415,7 @@ public void testScriptError() throws Exception {
);

final CountDownLatch latch = indexer.newLatch(1);
auditor.addExpectation(
new MockTransformAuditor.SeenAuditExpectation(
"fail indexer due to script error",
org.elasticsearch.xpack.core.common.notifications.Level.ERROR,
transformId,
"Failed to execute script with error: [*ArithmeticException: / by zero], stack trace: [stack]"
)
);

indexer.start();
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
Expand All @@ -428,11 +424,15 @@ public void testScriptError() throws Exception {
latch.countDown();
assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS);
assertTrue(failIndexerCalled.get());
verify(contextListener, times(1)).fail(
matches("Failed to execute script with error: \\[.*ArithmeticException: / by zero\\], stack trace: \\[stack\\]"),
any()
);

assertThat(
failureMessage.get(),
matchesRegex("Failed to execute script with error: \\[.*ArithmeticException: / by zero\\], stack trace: \\[stack\\]")
);
auditor.assertAllExpectationsMatched();
} finally {
executor.shutdownNow();
}
Expand Down

0 comments on commit 3553f68

Please sign in to comment.