Skip to content

Commit

Permalink
[Transform] improve irrecoverable error detection - part 2 (#52003)
Browse files Browse the repository at this point in the history
base error handling on rest status instead of listing individual exception types

relates to #51820
  • Loading branch information
Hendrik Muhs committed Feb 12, 2020
1 parent 3f151d1 commit 5d35eaa
Show file tree
Hide file tree
Showing 3 changed files with 240 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.script.ScriptException;
Expand All @@ -42,7 +41,6 @@
import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import org.elasticsearch.xpack.transform.transforms.pivot.AggregationResultUtils;
import org.elasticsearch.xpack.transform.transforms.pivot.Pivot;
import org.elasticsearch.xpack.transform.utils.ExceptionRootCauseFinder;

Expand Down Expand Up @@ -287,7 +285,7 @@ protected void onStart(long now, ActionListener<Boolean> listener) {
// If the transform config index or the transform config is gone, something serious occurred
// We are in an unknown state and should fail out
if (failure instanceof ResourceNotFoundException) {
updateConfigListener.onFailure(new TransformConfigReloadingException(msg, failure));
updateConfigListener.onFailure(new TransformConfigLostOnReloadException(msg, failure));
} else {
auditor.warning(getJobId(), msg);
updateConfigListener.onResponse(null);
Expand Down Expand Up @@ -477,37 +475,54 @@ synchronized void handleFailure(Exception e) {

if (unwrappedException instanceof CircuitBreakingException) {
handleCircuitBreakingException((CircuitBreakingException) unwrappedException);
} else if (unwrappedException instanceof ScriptException) {
return;
}

if (unwrappedException instanceof ScriptException) {
handleScriptException((ScriptException) unwrappedException);
// irrecoverable error without special handling
} else if (unwrappedException instanceof BulkIndexingException && ((BulkIndexingException) unwrappedException).isIrrecoverable()) {
return;
}

if (unwrappedException instanceof BulkIndexingException && ((BulkIndexingException) unwrappedException).isIrrecoverable()) {
handleIrrecoverableBulkIndexingException((BulkIndexingException) unwrappedException);
} else if (unwrappedException instanceof IndexNotFoundException
|| unwrappedException instanceof AggregationResultUtils.AggregationExtractionException
|| unwrappedException instanceof TransformConfigReloadingException
|| unwrappedException instanceof ResourceNotFoundException
|| unwrappedException instanceof IllegalArgumentException) {
failIndexer("task encountered irrecoverable failure: " + e.getMessage());
} else if (context.getAndIncrementFailureCount() > context.getNumFailureRetries()) {
failIndexer(
"task encountered more than "
+ context.getNumFailureRetries()
+ " failures; latest failure: "
+ ExceptionRootCauseFinder.getDetailedMessage(unwrappedException)
);
} else {
// Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous
// times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one
if (e.getMessage().equals(lastAuditedExceptionMessage) == false) {
String message = ExceptionRootCauseFinder.getDetailedMessage(unwrappedException);
return;
}

auditor.warning(
getJobId(),
"Transform encountered an exception: " + message + " Will attempt again at next scheduled trigger."
);
lastAuditedExceptionMessage = message;
}
// irrecoverable error without special handling
if (unwrappedException instanceof ElasticsearchException) {
ElasticsearchException elasticsearchException = (ElasticsearchException) unwrappedException;
if (ExceptionRootCauseFinder.IRRECOVERABLE_REST_STATUSES.contains(elasticsearchException.status())) {
failIndexer("task encountered irrecoverable failure: " + elasticsearchException.getDetailedMessage());
return;
}
}

if (unwrappedException instanceof IllegalArgumentException) {
failIndexer("task encountered irrecoverable failure: " + e.getMessage());
return;
}

if (context.getAndIncrementFailureCount() > context.getNumFailureRetries()) {
failIndexer(
"task encountered more than "
+ context.getNumFailureRetries()
+ " failures; latest failure: "
+ ExceptionRootCauseFinder.getDetailedMessage(unwrappedException)
);
return;
}

// Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous
// times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one
if (e.getMessage().equals(lastAuditedExceptionMessage) == false) {
String message = ExceptionRootCauseFinder.getDetailedMessage(unwrappedException);

auditor.warning(
getJobId(),
"Transform encountered an exception: " + message + " Will attempt again at next scheduled trigger."
);
lastAuditedExceptionMessage = message;
}
}

/**
Expand Down Expand Up @@ -901,8 +916,12 @@ private RunState determineRunStateAtStart() {
return RunState.PARTIAL_RUN_IDENTIFY_CHANGES;
}

static class TransformConfigReloadingException extends ElasticsearchException {
TransformConfigReloadingException(String msg, Throwable cause, Object... args) {
/**
* Thrown when the transform configuration disappeared permanently.
* (not if reloading failed due to an intermittent problem)
*/
static class TransformConfigLostOnReloadException extends ResourceNotFoundException {
TransformConfigLostOnReloadException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,37 @@
package org.elasticsearch.xpack.transform.utils;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.rest.RestStatus;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

/**
* Set of static utils to find the cause of a search exception.
*/
public final class ExceptionRootCauseFinder {

/**
* List of rest statuses that we consider irrecoverable
*/
public static final Set<RestStatus> IRRECOVERABLE_REST_STATUSES = new HashSet<>(
Arrays.asList(
RestStatus.GONE,
RestStatus.NOT_IMPLEMENTED,
RestStatus.NOT_FOUND,
RestStatus.BAD_REQUEST,
RestStatus.UNAUTHORIZED,
RestStatus.FORBIDDEN,
RestStatus.METHOD_NOT_ALLOWED,
RestStatus.NOT_ACCEPTABLE
)
);

/**
* Unwrap the exception stack and return the most likely cause.
*
Expand Down Expand Up @@ -61,17 +79,22 @@ public static String getDetailedMessage(Throwable t) {
/**
* Return the first irrecoverableException from a collection of bulk responses if there are any.
*
* @param failures a collection of bulk item responses
* @param failures a collection of bulk item responses with failures
* @return The first exception considered irrecoverable if there are any, null if no irrecoverable exception found
*/
public static Throwable getFirstIrrecoverableExceptionFromBulkResponses(Collection<BulkItemResponse> failures) {
for (BulkItemResponse failure : failures) {
Throwable unwrappedThrowable = org.elasticsearch.ExceptionsHelper.unwrapCause(failure.getFailure().getCause());
if (unwrappedThrowable instanceof MapperParsingException
|| unwrappedThrowable instanceof IllegalArgumentException
|| unwrappedThrowable instanceof ResourceNotFoundException) {
if (unwrappedThrowable instanceof IllegalArgumentException) {
return unwrappedThrowable;
}

if (unwrappedThrowable instanceof ElasticsearchException) {
ElasticsearchException elasticsearchException = (ElasticsearchException) unwrappedThrowable;
if (IRRECOVERABLE_REST_STATUSES.contains(elasticsearchException.status())) {
return elasticsearchException;
}
}
}

return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.transform.utils;

import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.DocWriteRequest.OpType;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.TranslogException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

public class ExceptionRootCauseFinderTests extends ESTestCase {
public void testFetFirstIrrecoverableExceptionFromBulkResponses() {
Map<Integer, BulkItemResponse> bulkItemResponses = new HashMap<>();

int id = 1;
// 1
bulkItemResponses.put(
id,
new BulkItemResponse(
id++,
OpType.INDEX,
new BulkItemResponse.Failure("the_index", "type", "id", new MapperParsingException("mapper parsing error"))
)
);
// 2
bulkItemResponses.put(
id,
new BulkItemResponse(
id++,
OpType.INDEX,
new BulkItemResponse.Failure("the_index", "type", "id", new ResourceNotFoundException("resource not found error"))
)
);
// 3
bulkItemResponses.put(
id,
new BulkItemResponse(
id++,
OpType.INDEX,
new BulkItemResponse.Failure("the_index", "type", "id", new IllegalArgumentException("illegal argument error"))
)
);
// 4 not irrecoverable
bulkItemResponses.put(
id,
new BulkItemResponse(
id++,
OpType.INDEX,
new BulkItemResponse.Failure("the_index", "type", "id", new EsRejectedExecutionException("es rejected execution"))
)
);
// 5 not irrecoverable
bulkItemResponses.put(
id,
new BulkItemResponse(
id++,
OpType.INDEX,
new BulkItemResponse.Failure(
"the_index",
"type",
"id",
new TranslogException(new ShardId("the_index", "uid", 0), "translog error")
)
)
);
// 6
bulkItemResponses.put(
id,
new BulkItemResponse(
id++,
OpType.INDEX,
new BulkItemResponse.Failure(
"the_index",
"type",
"id",
new ElasticsearchSecurityException("Authentication required", RestStatus.UNAUTHORIZED)
)
)
);
// 7
bulkItemResponses.put(
id,
new BulkItemResponse(
id++,
OpType.INDEX,
new BulkItemResponse.Failure(
"the_index",
"type",
"id",
new ElasticsearchSecurityException("current license is non-compliant for [transform]", RestStatus.FORBIDDEN)
)
)
);
// 8 not irrecoverable
bulkItemResponses.put(
id,
new BulkItemResponse(
id++,
OpType.INDEX,
new BulkItemResponse.Failure(
"the_index",
"type",
"id",
new ElasticsearchSecurityException("overloaded, to many requests", RestStatus.TOO_MANY_REQUESTS)
)
)
);
// 9 not irrecoverable
bulkItemResponses.put(
id,
new BulkItemResponse(
id++,
OpType.INDEX,
new BulkItemResponse.Failure(
"the_index",
"type",
"id",
new ElasticsearchSecurityException("internal error", RestStatus.INTERNAL_SERVER_ERROR)
)
)
);

assertFirstException(bulkItemResponses.values(), MapperParsingException.class, "mapper parsing error");
bulkItemResponses.remove(1);
assertFirstException(bulkItemResponses.values(), ResourceNotFoundException.class, "resource not found error");
bulkItemResponses.remove(2);
assertFirstException(bulkItemResponses.values(), IllegalArgumentException.class, "illegal argument error");
bulkItemResponses.remove(3);
assertFirstException(bulkItemResponses.values(), ElasticsearchSecurityException.class, "Authentication required");
bulkItemResponses.remove(6);
assertFirstException(
bulkItemResponses.values(),
ElasticsearchSecurityException.class,
"current license is non-compliant for [transform]"
);
bulkItemResponses.remove(7);

assertNull(ExceptionRootCauseFinder.getFirstIrrecoverableExceptionFromBulkResponses(bulkItemResponses.values()));
}

private static void assertFirstException(Collection<BulkItemResponse> bulkItemResponses, Class<?> expectedClass, String message) {
Throwable t = ExceptionRootCauseFinder.getFirstIrrecoverableExceptionFromBulkResponses(bulkItemResponses);
assertNotNull(t);
assertEquals(t.getClass(), expectedClass);
assertEquals(t.getMessage(), message);
}
}

0 comments on commit 5d35eaa

Please sign in to comment.