Skip to content

Commit

Permalink
[Transform] Don't fail a transform due to ILM closing an index (elast…
Browse files Browse the repository at this point in the history
…ic#90396) (elastic#90402)

A transform can fail due to a ClusterBlockException that is reported as non-retryable. This is a special kind of race condition where the initial checks pass, but something changes between the check and the action. In this particular case, a wildcard index pattern was resolved to concrete index names, and one of those indices was closed (by ILM) before the transform ran the search operation. Pragmatically, we should handle a cluster block exception as a retryable error.

fixes elastic#89802

Co-authored-by: Elastic Machine <[email protected]>
  • Loading branch information
Hendrik Muhs and elasticmachine authored Sep 27, 2022
1 parent 0e75bf5 commit 51a3006
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 0 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/90396.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 90396
summary: Don't fail a transform on a ClusterBlockException, this may be due to ILM closing an index
area: Transform
type: bug
issues:
- 89802
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.script.ScriptException;
import org.elasticsearch.xpack.core.transform.TransformMessages;
Expand Down Expand Up @@ -65,6 +66,9 @@ void handleIndexerFailure(Exception e, SettingsConfig settingsConfig) {
handleScriptException(scriptException, unattended);
} else if (unwrappedException instanceof BulkIndexingException bulkIndexingException) {
handleBulkIndexingException(bulkIndexingException, unattended, getNumFailureRetries(settingsConfig));
} else if (unwrappedException instanceof ClusterBlockException clusterBlockException) {
// gh#89802 always retry for a cluster block exception, because a cluster block should be temporary.
retry(clusterBlockException, clusterBlockException.getDetailedMessage(), unattended, getNumFailureRetries(settingsConfig));
} else if (unwrappedException instanceof ElasticsearchException elasticsearchException) {
handleElasticsearchException(elasticsearchException, unattended, getNumFailureRetries(settingsConfig));
} else if (unwrappedException instanceof IllegalArgumentException illegalArgumentException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.MetadataIndexStateService;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.rest.RestStatus;
Expand All @@ -20,6 +22,9 @@
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor;

import java.util.Map;
import java.util.Set;

import static java.util.Collections.singletonList;

public class TransformFailureHandlerTests extends ESTestCase {
Expand Down Expand Up @@ -113,6 +118,34 @@ public void testUnattended() {
assertNoFailure(handler, new NullPointerException("NPE"), contextListener, settings);
}

/**
 * Verifies that a {@link ClusterBlockException} is handled as a retryable error: with
 * {@code num_failure_retries} set to 2, the first two occurrences only bump the failure
 * counter, and the third occurrence marks the transform as failed.
 */
public void testClusterBlock() {
    final int numFailureRetries = 2;
    final String id = randomAlphaOfLength(10);
    final SettingsConfig settingsConfig = new SettingsConfig.Builder().setNumFailureRetries(numFailureRetries).build();

    final MockTransformAuditor mockAuditor = MockTransformAuditor.createMockAuditor();
    final MockTransformContextListener listener = new MockTransformContextListener();
    final TransformContext transformContext = new TransformContext(TransformTaskState.STARTED, "", 0, listener);
    transformContext.setPageSize(500);

    final TransformFailureHandler failureHandler = new TransformFailureHandler(mockAuditor, transformContext, id);

    // Simulates the block reported when an index has been closed underneath the transform.
    final ClusterBlockException indexClosedException = new ClusterBlockException(
        Map.of("test-index", Set.of(MetadataIndexStateService.INDEX_CLOSED_BLOCK))
    );

    // One more attempt than the retry budget: only the last one may fail the transform.
    for (int attempt = 1; attempt <= numFailureRetries + 1; attempt++) {
        failureHandler.handleIndexerFailure(indexClosedException, settingsConfig);

        if (attempt <= numFailureRetries) {
            assertFalse(listener.getFailed());
        } else {
            assertTrue(listener.getFailed());
        }
        assertEquals(attempt, listener.getFailureCountChangedCounter());
    }
}

private void assertNoFailure(
TransformFailureHandler handler,
Exception e,
Expand Down

0 comments on commit 51a3006

Please sign in to comment.