From 3f30e388fcb85efc4a0eadc30f2030145e883401 Mon Sep 17 00:00:00 2001 From: James Baiera Date: Mon, 29 Jul 2024 17:27:37 -0400 Subject: [PATCH] Fix enrich policy runner exception handling on empty segments response (#111290) (#111371) * Fix enrich segment action listener exception logic * Update docs/changelog/111290.yaml Co-authored-by: Elastic Machine --- docs/changelog/111290.yaml | 5 + .../segments/IndicesSegmentResponse.java | 2 +- .../xpack/enrich/EnrichPolicyRunner.java | 104 +++++--- .../xpack/enrich/EnrichPolicyRunnerTests.java | 251 +++++++++++++++++- 4 files changed, 325 insertions(+), 37 deletions(-) create mode 100644 docs/changelog/111290.yaml diff --git a/docs/changelog/111290.yaml b/docs/changelog/111290.yaml new file mode 100644 index 0000000000000..efcb01a4aedf9 --- /dev/null +++ b/docs/changelog/111290.yaml @@ -0,0 +1,5 @@ +pr: 111290 +summary: Fix enrich policy runner exception handling on empty segments response +area: Ingest Node +type: bug +issues: [] diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponse.java index bd12cfdbc7962..429ebe365bbe1 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponse.java @@ -36,7 +36,7 @@ public class IndicesSegmentResponse extends ChunkedBroadcastResponse { private volatile Map indicesSegments; - IndicesSegmentResponse( + public IndicesSegmentResponse( ShardSegments[] shards, int totalShards, int successfulShards, diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java index 5cb9c0cf9c051..2ff4863a12b6e 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java @@ -25,11 +25,11 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.segments.IndexSegments; import org.elasticsearch.action.admin.indices.segments.IndexShardSegments; -import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse; import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest; import org.elasticsearch.action.admin.indices.segments.ShardSegments; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.support.DefaultShardOperationFailedException; import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.FilterClient; import org.elasticsearch.client.internal.OriginSettingClient; @@ -572,48 +572,82 @@ private void refreshEnrichIndex(final String destinationIndexName, final int att protected void ensureSingleSegment(final String destinationIndexName, final int attempt) { enrichOriginClient().admin() .indices() - .segments(new IndicesSegmentsRequest(destinationIndexName), new DelegatingActionListener<>(listener) { - @Override - public void onResponse(IndicesSegmentResponse indicesSegmentResponse) { - IndexSegments indexSegments = indicesSegmentResponse.getIndices().get(destinationIndexName); - if (indexSegments == null) { + .segments(new IndicesSegmentsRequest(destinationIndexName), listener.delegateFailureAndWrap((l, indicesSegmentResponse) -> { + int failedShards = indicesSegmentResponse.getFailedShards(); + if (failedShards > 0) { + // Encountered a problem while querying the segments for the enrich index. Try and surface the problem in the log. + logger.warn( + "Policy [{}]: Encountered [{}] shard level failures while querying the segments for enrich index [{}]. " + + "Turn on DEBUG logging for details.", + policyName, + failedShards, + enrichIndexName + ); + if (logger.isDebugEnabled()) { + DefaultShardOperationFailedException[] shardFailures = indicesSegmentResponse.getShardFailures(); + int failureNumber = 1; + String logPrefix = "Policy [" + policyName + "]: Encountered shard failure ["; + String logSuffix = " of " + + shardFailures.length + + "] while querying segments for enrich index [" + + enrichIndexName + + "]. Shard ["; + for (DefaultShardOperationFailedException shardFailure : shardFailures) { + logger.debug( + logPrefix + failureNumber + logSuffix + shardFailure.index() + "][" + shardFailure.shardId() + "]", + shardFailure.getCause() + ); + failureNumber++; + } + } + } + IndexSegments indexSegments = indicesSegmentResponse.getIndices().get(destinationIndexName); + if (indexSegments == null) { + if (indicesSegmentResponse.getShardFailures().length == 0) { throw new ElasticsearchException( "Could not locate segment information for newly created index [{}]", destinationIndexName ); + } else { + DefaultShardOperationFailedException shardFailure = indicesSegmentResponse.getShardFailures()[0]; + throw new ElasticsearchException( + "Could not obtain segment information for newly created index [{}]; shard info [{}][{}]", + shardFailure.getCause(), + destinationIndexName, + shardFailure.index(), + shardFailure.shardId() + ); } - Map indexShards = indexSegments.getShards(); - assert indexShards.size() == 1 : "Expected enrich index to contain only one shard"; - ShardSegments[] shardSegments = indexShards.get(0).shards(); - assert shardSegments.length == 1 : "Expected enrich index to contain no replicas at this point"; - ShardSegments primarySegments = shardSegments[0]; - if (primarySegments.getSegments().size() > 1) { - int nextAttempt = attempt + 1; - if (nextAttempt > maxForceMergeAttempts) { - delegate.onFailure( - new ElasticsearchException( - "Force merging index [{}] attempted [{}] times but did not result in one segment.", - destinationIndexName, - attempt, - maxForceMergeAttempts - ) - ); - } else { - logger.debug( - "Policy [{}]: Force merge result contains more than one segment [{}], retrying (attempt {}/{})", - policyName, - primarySegments.getSegments().size(), - nextAttempt, - maxForceMergeAttempts - ); - forceMergeEnrichIndex(destinationIndexName, nextAttempt); - } + } + Map indexShards = indexSegments.getShards(); + assert indexShards.size() == 1 : "Expected enrich index to contain only one shard"; + ShardSegments[] shardSegments = indexShards.get(0).shards(); + assert shardSegments.length == 1 : "Expected enrich index to contain no replicas at this point"; + ShardSegments primarySegments = shardSegments[0]; + if (primarySegments.getSegments().size() > 1) { + int nextAttempt = attempt + 1; + if (nextAttempt > maxForceMergeAttempts) { + throw new ElasticsearchException( + "Force merging index [{}] attempted [{}] times but did not result in one segment.", + destinationIndexName, + attempt, + maxForceMergeAttempts + ); } else { - // Force merge down to one segment successful - setIndexReadOnly(destinationIndexName); + logger.debug( + "Policy [{}]: Force merge result contains more than one segment [{}], retrying (attempt {}/{})", + policyName, + primarySegments.getSegments().size(), + nextAttempt, + maxForceMergeAttempts + ); + forceMergeEnrichIndex(destinationIndexName, nextAttempt); } + } else { + // Force merge down to one segment successful + setIndexReadOnly(destinationIndexName); } - }); + })); } private void setIndexReadOnly(final String destinationIndexName) { diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java index 8ce1e7f350ccb..7ba3b356d6015 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java @@ -33,6 +33,7 @@ import org.elasticsearch.action.admin.indices.settings.put.TransportUpdateSettingsAction; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.DefaultShardOperationFailedException; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.internal.Client; @@ -2048,6 +2049,254 @@ protected void ensureSingleSegment(String destinationIndexName, int attempt) { ensureEnrichIndexIsReadOnly(createdEnrichIndex); } + public void testRunnerWithEmptySegmentsResponse() throws Exception { + final String sourceIndex = "source-index"; + DocWriteResponse indexRequest = client().index(new IndexRequest().index(sourceIndex).id("id").source(""" + { + "field1": "value1", + "field2": 2, + "field3": "ignored", + "field4": "ignored", + "field5": "value5" + }""", XContentType.JSON).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)).actionGet(); + assertEquals(RestStatus.CREATED, indexRequest.status()); + + assertResponse( + client().search(new SearchRequest(sourceIndex).source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()))), + sourceSearchResponse -> { + assertThat(sourceSearchResponse.getHits().getTotalHits().value, equalTo(1L)); + Map sourceDocMap = sourceSearchResponse.getHits().getAt(0).getSourceAsMap(); + assertNotNull(sourceDocMap); + assertThat(sourceDocMap.get("field1"), is(equalTo("value1"))); + assertThat(sourceDocMap.get("field2"), is(equalTo(2))); + assertThat(sourceDocMap.get("field3"), is(equalTo("ignored"))); + assertThat(sourceDocMap.get("field4"), is(equalTo("ignored"))); + assertThat(sourceDocMap.get("field5"), is(equalTo("value5"))); + } + ); + List enrichFields = List.of("field2", "field5"); + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "field1", enrichFields); + String policyName = "test1"; + + final long createTime = randomNonNegativeLong(); + String createdEnrichIndex = ".enrich-test1-" + createTime; + final AtomicReference exception = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + ActionListener listener = createTestListener(latch, exception::set); + ClusterService clusterService = getInstanceFromNode(ClusterService.class); + IndexNameExpressionResolver resolver = getInstanceFromNode(IndexNameExpressionResolver.class); + Task asyncTask = testTaskManager.register("enrich", "policy_execution", new TaskAwareRequest() { + @Override + public void setParentTask(TaskId taskId) {} + + @Override + public void setRequestId(long requestId) {} + + @Override + public TaskId getParentTask() { + return TaskId.EMPTY_TASK_ID; + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new ExecuteEnrichPolicyTask(id, type, action, getDescription(), parentTaskId, headers); + } + + @Override + public String getDescription() { + return policyName; + } + }); + ExecuteEnrichPolicyTask task = ((ExecuteEnrichPolicyTask) asyncTask); + // The executor would wrap the listener in order to clean up the task in the + // task manager, but we're just testing the runner, so we make sure to clean + // up after ourselves. + ActionListener wrappedListener = ActionListener.runBefore( + listener, + () -> testTaskManager.unregister(task) + ); + + // Wrap the client so that when we receive the indices segments action, we intercept the request and complete it on another thread + // with an empty segments response. + Client client = new FilterClient(client()) { + @Override + protected void doExecute( + ActionType action, + Request request, + ActionListener listener + ) { + if (action.equals(IndicesSegmentsAction.INSTANCE)) { + testThreadPool.generic().execute(() -> { + @SuppressWarnings("unchecked") + ActionListener castListener = ((ActionListener) listener); + castListener.onResponse(new IndicesSegmentResponse(new ShardSegments[0], 0, 0, 0, List.of())); + }); + } else { + super.doExecute(action, request, listener); + } + } + }; + + EnrichPolicyRunner enrichPolicyRunner = new EnrichPolicyRunner( + policyName, + policy, + task, + wrappedListener, + clusterService, + getInstanceFromNode(IndicesService.class), + client, + resolver, + createdEnrichIndex, + randomIntBetween(1, 10000), + randomIntBetween(3, 10) + ); + + logger.info("Starting policy run"); + enrichPolicyRunner.run(); + if (latch.await(1, TimeUnit.MINUTES) == false) { + fail("Timeout while waiting for runner to complete"); + } + Exception exceptionThrown = exception.get(); + if (exceptionThrown == null) { + fail("Expected exception to be thrown from segment api"); + } + + // Validate exception information + assertThat(exceptionThrown, instanceOf(ElasticsearchException.class)); + assertThat(exceptionThrown.getMessage(), containsString("Could not locate segment information for newly created index")); + } + + public void testRunnerWithShardFailuresInSegmentResponse() throws Exception { + final String sourceIndex = "source-index"; + DocWriteResponse indexRequest = client().index(new IndexRequest().index(sourceIndex).id("id").source(""" + { + "field1": "value1", + "field2": 2, + "field3": "ignored", + "field4": "ignored", + "field5": "value5" + }""", XContentType.JSON).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)).actionGet(); + assertEquals(RestStatus.CREATED, indexRequest.status()); + + assertResponse( + client().search(new SearchRequest(sourceIndex).source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()))), + sourceSearchResponse -> { + assertThat(sourceSearchResponse.getHits().getTotalHits().value, equalTo(1L)); + Map sourceDocMap = sourceSearchResponse.getHits().getAt(0).getSourceAsMap(); + assertNotNull(sourceDocMap); + assertThat(sourceDocMap.get("field1"), is(equalTo("value1"))); + assertThat(sourceDocMap.get("field2"), is(equalTo(2))); + assertThat(sourceDocMap.get("field3"), is(equalTo("ignored"))); + assertThat(sourceDocMap.get("field4"), is(equalTo("ignored"))); + assertThat(sourceDocMap.get("field5"), is(equalTo("value5"))); + } + ); + List enrichFields = List.of("field2", "field5"); + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "field1", enrichFields); + String policyName = "test1"; + + final long createTime = randomNonNegativeLong(); + String createdEnrichIndex = ".enrich-test1-" + createTime; + final AtomicReference exception = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + ActionListener listener = createTestListener(latch, exception::set); + ClusterService clusterService = getInstanceFromNode(ClusterService.class); + IndexNameExpressionResolver resolver = getInstanceFromNode(IndexNameExpressionResolver.class); + Task asyncTask = testTaskManager.register("enrich", "policy_execution", new TaskAwareRequest() { + @Override + public void setParentTask(TaskId taskId) {} + + @Override + public void setRequestId(long requestId) {} + + @Override + public TaskId getParentTask() { + return TaskId.EMPTY_TASK_ID; + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new ExecuteEnrichPolicyTask(id, type, action, getDescription(), parentTaskId, headers); + } + + @Override + public String getDescription() { + return policyName; + } + }); + ExecuteEnrichPolicyTask task = ((ExecuteEnrichPolicyTask) asyncTask); + // The executor would wrap the listener in order to clean up the task in the + // task manager, but we're just testing the runner, so we make sure to clean + // up after ourselves. + ActionListener wrappedListener = ActionListener.runBefore( + listener, + () -> testTaskManager.unregister(task) + ); + + // Wrap the client so that when we receive the indices segments action, we intercept the request and complete it on another thread + // with an failed segments response. + Client client = new FilterClient(client()) { + @Override + protected void doExecute( + ActionType action, + Request request, + ActionListener listener + ) { + if (action.equals(IndicesSegmentsAction.INSTANCE)) { + testThreadPool.generic().execute(() -> { + @SuppressWarnings("unchecked") + ActionListener castListener = ((ActionListener) listener); + castListener.onResponse( + new IndicesSegmentResponse( + new ShardSegments[0], + 0, + 0, + 3, + List.of( + new DefaultShardOperationFailedException(createdEnrichIndex, 1, new ElasticsearchException("failure1")), + new DefaultShardOperationFailedException(createdEnrichIndex, 2, new ElasticsearchException("failure2")), + new DefaultShardOperationFailedException(createdEnrichIndex, 3, new ElasticsearchException("failure3")) + ) + ) + ); + }); + } else { + super.doExecute(action, request, listener); + } + } + }; + + EnrichPolicyRunner enrichPolicyRunner = new EnrichPolicyRunner( + policyName, + policy, + task, + wrappedListener, + clusterService, + getInstanceFromNode(IndicesService.class), + client, + resolver, + createdEnrichIndex, + randomIntBetween(1, 10000), + randomIntBetween(3, 10) + ); + + logger.info("Starting policy run"); + enrichPolicyRunner.run(); + if (latch.await(1, TimeUnit.MINUTES) == false) { + fail("Timeout while waiting for runner to complete"); + } + Exception exceptionThrown = exception.get(); + if (exceptionThrown == null) { + fail("Expected exception to be thrown from segment api"); + } + + // Validate exception information + assertThat(exceptionThrown, instanceOf(ElasticsearchException.class)); + assertThat(exceptionThrown.getMessage(), containsString("Could not obtain segment information for newly created index")); + assertThat(exceptionThrown.getCause(), instanceOf(ElasticsearchException.class)); + assertThat(exceptionThrown.getCause().getMessage(), containsString("failure1")); + } + public void testRunnerCancel() throws Exception { final String sourceIndex = "source-index"; DocWriteResponse indexRequest = client().index(new IndexRequest().index(sourceIndex).id("id").source(""" @@ -2495,7 +2744,7 @@ private ActionListener createTestListener( final CountDownLatch latch, final Consumer exceptionConsumer ) { - return new LatchedActionListener<>(ActionListener.wrap((r) -> logger.info("Run complete"), exceptionConsumer), latch); + return new LatchedActionListener<>(ActionListener.wrap((r) -> logger.debug("Run complete"), exceptionConsumer), latch); } private void validateMappingMetadata(Map mapping, String policyName, EnrichPolicy policy) {