diff --git a/docs/changelog/108736.yaml b/docs/changelog/108736.yaml new file mode 100644 index 0000000000000..41e4084021e00 --- /dev/null +++ b/docs/changelog/108736.yaml @@ -0,0 +1,5 @@ +pr: 108736 +summary: Harden field-caps request dispatcher +area: Search +type: bug +issues: [] diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java index 3a6a2eeb08de8..6eec2f56d52f1 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java @@ -92,8 +92,13 @@ final class RequestDispatcher { this.onComplete = new RunOnce(onComplete); this.indexSelectors = ConcurrentCollections.newConcurrentMap(); for (String index : indices) { - final GroupShardsIterator shardIts = clusterService.operationRouting() - .searchShards(clusterState, new String[] { index }, null, null, null, null); + final GroupShardsIterator shardIts; + try { + shardIts = clusterService.operationRouting().searchShards(clusterState, new String[] { index }, null, null, null, null); + } catch (Exception e) { + onIndexFailure.accept(index, e); + continue; + } final IndexSelector indexResult = new IndexSelector(shardIts); if (indexResult.nodeToShards.isEmpty()) { onIndexFailure.accept(index, new NoShardAvailableActionException(null, "index [" + index + "] has no active shard copy")); @@ -168,7 +173,7 @@ private void sendRequestToNode(String nodeId, List shardIds) { assert node != null; LOGGER.debug("round {} sends field caps node request to node {} for shardIds {}", executionRound, node, shardIds); final ActionListener listener = ActionListener.wrap( - r -> onRequestResponse(shardIds, r), + this::onRequestResponse, failure -> onRequestFailure(shardIds, failure) ); final FieldCapabilitiesNodeRequest nodeRequest = new FieldCapabilitiesNodeRequest( @@ -188,7 +193,11 @@ private void sendRequestToNode(String nodeId, List shardIds) { nodeRequest, parentTask, TransportRequestOptions.EMPTY, - new ActionListenerResponseHandler<>(listener, FieldCapabilitiesNodeResponse::new, executor) + new ActionListenerResponseHandler<>( + ActionListener.runAfter(listener, () -> afterRequestsCompleted(shardIds.size())), + FieldCapabilitiesNodeResponse::new, + executor + ) ); } @@ -201,7 +210,7 @@ private void afterRequestsCompleted(int numRequests) { } } - private void onRequestResponse(List shardIds, FieldCapabilitiesNodeResponse nodeResponse) { + private void onRequestResponse(FieldCapabilitiesNodeResponse nodeResponse) { for (FieldCapabilitiesIndexResponse indexResponse : nodeResponse.getIndexResponses()) { if (indexResponse.canMatch()) { if (fieldCapsRequest.includeEmptyFields() == false) { @@ -224,7 +233,6 @@ private void onRequestResponse(List shardIds, FieldCapabilitiesNodeResp indexSelector.setFailure(e.getKey(), e.getValue()); } } - afterRequestsCompleted(shardIds.size()); } private void onRequestFailure(List shardIds, Exception e) { @@ -234,7 +242,6 @@ private void onRequestFailure(List shardIds, Exception e) { indexSelector.setFailure(shardId, e); } } - afterRequestsCompleted(shardIds.size()); } private static class IndexSelector { @@ -253,14 +260,23 @@ private static class IndexSelector { synchronized Exception getFailure() { Exception first = null; for (Exception e : failures.values()) { - first = ExceptionsHelper.useOrSuppress(first, e); + first = useOrSuppressIfDifferent(first, e); + } + return first; + } + + static Exception useOrSuppressIfDifferent(Exception first, Exception second) { + if (first == null) { + return second; + } else if (ExceptionsHelper.unwrap(first) != ExceptionsHelper.unwrap(second)) { + first.addSuppressed(second); } return first; } synchronized void setFailure(ShardId shardId, Exception failure) { assert unmatchedShardIds.contains(shardId) == false : "Shard " + shardId + " was unmatched already"; - failures.compute(shardId, (k, curr) -> ExceptionsHelper.useOrSuppress(curr, failure)); + failures.compute(shardId, (k, curr) -> useOrSuppressIfDifferent(curr, failure)); } synchronized void addUnmatchedShardId(ShardId shardId) { diff --git a/server/src/test/java/org/elasticsearch/action/fieldcaps/RequestDispatcherTests.java b/server/src/test/java/org/elasticsearch/action/fieldcaps/RequestDispatcherTests.java index 75d5d7fb7c55d..f5f35c52044d7 100644 --- a/server/src/test/java/org/elasticsearch/action/fieldcaps/RequestDispatcherTests.java +++ b/server/src/test/java/org/elasticsearch/action/fieldcaps/RequestDispatcherTests.java @@ -56,6 +56,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportInterceptor; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; @@ -599,6 +600,63 @@ public void sendRequest( } } + public void testFailWithSameException() throws Exception { + final List allIndices = IntStream.rangeClosed(1, 5).mapToObj(n -> "index_" + n).toList(); + final ClusterState clusterState; + { + DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder(); + int numNodes = randomIntBetween(1, 10); + for (int i = 0; i < numNodes; i++) { + discoNodes.add(newNode("node_" + i, VersionUtils.randomVersion(random()), IndexVersionUtils.randomVersion())); + } + Metadata.Builder metadata = Metadata.builder(); + for (String index : allIndices) { + metadata.put( + IndexMetadata.builder(index).settings(indexSettings(IndexVersions.MINIMUM_COMPATIBLE, between(1, 10), between(0, 3))) + ); + } + clusterState = newClusterState(metadata.build(), discoNodes.build()); + } + try (TestTransportService transportService = TestTransportService.newTestTransportService()) { + final List targetIndices = randomSubsetOf(between(1, allIndices.size()), allIndices); + final ResponseCollector responseCollector = new ResponseCollector(); + boolean withFilter = randomBoolean(); + final RequestDispatcher dispatcher = new RequestDispatcher( + mockClusterService(clusterState), + transportService, + newRandomParentTask(), + randomFieldCapRequest(withFilter), + OriginalIndices.NONE, + randomNonNegativeLong(), + targetIndices.toArray(new String[0]), + transportService.threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION), + responseCollector::addIndexResponse, + responseCollector::addIndexFailure, + responseCollector::onComplete + ); + final RequestTracker requestTracker = new RequestTracker(dispatcher, clusterState.routingTable(), withFilter); + transportService.requestTracker.set(requestTracker); + + RuntimeException ex = new RuntimeException("shared"); + transportService.setTransportInterceptor(new TransportInterceptor.AsyncSender() { + @Override + public void sendRequest( + Transport.Connection connection, + String action, + TransportRequest request, + TransportRequestOptions options, + TransportResponseHandler handler + ) { + Exception failure = randomFrom(ex, new RuntimeException("second"), new IllegalStateException("third")); + handler.executor().execute(() -> handler.handleException(new TransportException(failure))); + } + }); + dispatcher.execute(); + responseCollector.awaitCompletion(); + assertThat(responseCollector.failures.keySet(), equalTo(Sets.newHashSet(targetIndices))); + } + } + private static class NodeRequest { final int round; final DiscoveryNode node;