Skip to content

Commit

Permalink
check bulk indexing error for permanent problems
Browse files Browse the repository at this point in the history
  • Loading branch information
Hendrik Muhs committed Jan 22, 2020
1 parent a6fa577 commit d2b17bf
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ public class TransformMessages {
+ "please simplify job or increase heap size on data nodes.";
public static final String LOG_TRANSFORM_PIVOT_SCRIPT_ERROR =
"Failed to execute script with error: [{0}], stack trace: {1}";
public static final String LOG_TRANSFORM_PIVOT_IRRECOVERABLE_BULK_INDEXING_ERROR =
"Failed to index documents into destination index due to permanent error: [{0}]";

public static final String FAILED_TO_PARSE_TRANSFORM_CHECKPOINTS =
"Failed to parse transform checkpoints for [{0}]";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.transform.transforms;

import org.elasticsearch.ElasticsearchException;

// Wrapper for indexing failures thrown internally in the transform indexer
class BulkIndexingException extends ElasticsearchException {
private final boolean irrecoverable;

/**
* Create a BulkIndexingException
*
* @param msg The message
* @param cause The most important cause of the bulk indexing failure
* @param irrecoverable whether this is a permanent or irrecoverable error (controls retry)
* @param args arguments for formating the message
*/
BulkIndexingException(String msg, Throwable cause, boolean irrecoverable, Object... args) {
super(msg, cause, args);
this.irrecoverable = irrecoverable;
}

public boolean isIrrecoverable() {
return irrecoverable;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
Expand All @@ -36,8 +37,11 @@
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import org.elasticsearch.xpack.transform.utils.ExceptionRootCauseFinder;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -154,31 +158,63 @@ protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> next
ActionListener.wrap(bulkResponse -> {
if (bulkResponse.hasFailures()) {
int failureCount = 0;
// dedup the failures by the type of the exception, as they most likely have the same cause
Map<String, BulkItemResponse> deduplicatedFailures = new LinkedHashMap<>();

for (BulkItemResponse item : bulkResponse.getItems()) {
if (item.isFailed()) {
deduplicatedFailures.putIfAbsent(item.getFailure().getCause().getClass().getSimpleName(), item);
failureCount++;
}
// TODO gather information on irrecoverable failures and update isIrrecoverableFailure
}
if (auditBulkFailures) {
String failureMessage = bulkResponse.buildFailureMessage();
logger.debug("[{}] Bulk index failure encountered: {}", getJobId(), failureMessage);
auditor.warning(

// note: bulk failures are audited/logged in {@link TransformIndexer#handleFailure(Exception)}

// This calls AsyncTwoPhaseIndexer#finishWithIndexingFailure
// Determine whether the failure is irrecoverable (transform should go into failed state) or not (transform increments
// the indexing failure counter
// and possibly retries)
Exception irrecoverableException = ExceptionRootCauseFinder.getFirstIrrecoverableExceptionFromBulkResponses(
deduplicatedFailures.values()
);
if (irrecoverableException == null) {
String failureMessage = getBulkIndexDetailedFailureMessage(deduplicatedFailures);
logger.debug("[{}] Bulk index experienced [{}] failures. Significant falures: {}", getJobId(), failureMessage);

Exception firstException = deduplicatedFailures.values().iterator().next().getFailure().getCause();
nextPhase.onFailure(
new BulkIndexingException(
"Bulk index experienced [{}] failures. Significant falures: {}",
firstException,
false,
failureCount,
failureMessage
)
);
} else {
deduplicatedFailures.remove(irrecoverableException.getClass().getSimpleName());
String failureMessage = getBulkIndexDetailedFailureMessage(deduplicatedFailures);
irrecoverableException = decorateBulkIndexException(irrecoverableException);

logger.debug(
"[{}] Bulk index experienced [{}] failures and at least 1 irrecoverable [{}]. Other failures: {}",
getJobId(),
"Experienced at least ["
+ failureCount
+ "] bulk index failures. See the logs of the node running the transform for details. "
+ failureMessage
failureCount,
ExceptionRootCauseFinder.getDetailedMessage(irrecoverableException),
failureMessage
);

nextPhase.onFailure(
new BulkIndexingException(
"Bulk index experienced [{}] failures and at least 1 irrecoverable [{}]. Other failures: {}",
irrecoverableException,
true,
failureCount,
ExceptionRootCauseFinder.getDetailedMessage(irrecoverableException),
failureMessage
)
);
auditBulkFailures = false;
}
// This calls AsyncTwoPhaseIndexer#finishWithIndexingFailure
// It increments the indexing failure, and then calls the `onFailure` logic
nextPhase.onFailure(
new BulkIndexingException(
"Bulk index experienced failures. " + "See the logs of the node running the transform for details."
)
);
} else {
auditBulkFailures = true;
nextPhase.onResponse(bulkResponse);
Expand Down Expand Up @@ -320,11 +356,27 @@ SeqNoPrimaryTermAndIndex getSeqNoPrimaryTermAndIndex() {
return seqNoPrimaryTermAndIndex.get();
}

// Considered a recoverable indexing failure
private static class BulkIndexingException extends ElasticsearchException {
BulkIndexingException(String msg, Object... args) {
super(msg, args);
private static String getBulkIndexDetailedFailureMessage(Map<String, BulkItemResponse> failures) {
StringBuilder failureMessageBuilder = new StringBuilder();
for (Entry<String, BulkItemResponse> failure : failures.entrySet()) {
failureMessageBuilder.append("\n[")
.append(failure.getKey())
.append("] message [")
.append(failure.getValue().getFailureMessage())
.append("]");
}
String failureMessage = failureMessageBuilder.toString();
return failureMessage;
}

private static Exception decorateBulkIndexException(Exception irrecoverableException) {
if (irrecoverableException instanceof MapperParsingException) {
return new TransformException(
"Destination index mappings are incompatible with the transform configuration.",
irrecoverableException
);
}

return irrecoverableException;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.transform.transforms;

import org.elasticsearch.ElasticsearchException;

class TransformException extends ElasticsearchException {
TransformException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,8 @@ synchronized void handleFailure(Exception e) {
} else if (unwrappedException instanceof ScriptException) {
handleScriptException((ScriptException) unwrappedException);
// irrecoverable error without special handling
} else if (unwrappedException instanceof BulkIndexingException && ((BulkIndexingException) unwrappedException).isIrrecoverable()) {
handleIrrecoverableBulkIndexingException((BulkIndexingException) unwrappedException);
} else if (unwrappedException instanceof IndexNotFoundException
|| unwrappedException instanceof AggregationResultUtils.AggregationExtractionException
|| unwrappedException instanceof TransformConfigReloadingException) {
Expand Down Expand Up @@ -834,9 +836,21 @@ private void handleScriptException(ScriptException scriptException) {
failIndexer(message);
}

/**
* Handle script exception case. This is error is irrecoverable.
*
* @param bulkIndexingException ScriptException thrown
*/
private void handleIrrecoverableBulkIndexingException(BulkIndexingException bulkIndexingException) {
String message = TransformMessages.getMessage(
TransformMessages.LOG_TRANSFORM_PIVOT_IRRECOVERABLE_BULK_INDEXING_ERROR,
bulkIndexingException.getDetailedMessage()
);
failIndexer(message);
}

protected void failIndexer(String failureMessage) {
logger.error("[{}] transform has failed; experienced: [{}].", getJobId(), failureMessage);
auditor.error(getJobId(), failureMessage);
// note: logging and audit is done as part of context.markAsFailed
context.markAsFailed(failureMessage);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,8 @@ public synchronized void fail(String reason, ActionListener<Void> listener) {
listener.onResponse(null);
return;
}

logger.error("[{}] transform has failed; experienced: [{}].", transform.getId(), reason);
auditor.error(transform.getId(), reason);
// We should not keep retrying. Either the task will be stopped, or started
// If it is started again, it is registered again.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@
package org.elasticsearch.xpack.transform.utils;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.index.mapper.MapperParsingException;

import java.util.Collection;

/**
* Set of static utils to find the cause of a search exception.
Expand Down Expand Up @@ -53,6 +57,22 @@ public static String getDetailedMessage(Throwable t) {
return t.getMessage();
}

/**
* Return the first irrecoverableException from a collection of bulk responses if there are any.
*
* @param failures a collection of bulk item responses
* @return The first exception considered irrecoverable if there are any, null if no irrecoverable exception found
*/
public static Exception getFirstIrrecoverableExceptionFromBulkResponses(Collection<BulkItemResponse> failures) {
for (BulkItemResponse failure : failures) {
if (failure.getFailure().getCause() instanceof MapperParsingException) {
return failure.getFailure().getCause();
}
}

return null;
}

private ExceptionRootCauseFinder() {}

}

0 comments on commit d2b17bf

Please sign in to comment.