Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Harden field-caps request dispatcher #108736

Merged
merged 9 commits into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/108736.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 108736
summary: Harden field-caps request dispatcher
area: Search
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,13 @@ final class RequestDispatcher {
this.onComplete = new RunOnce(onComplete);
this.indexSelectors = ConcurrentCollections.newConcurrentMap();
for (String index : indices) {
final GroupShardsIterator<ShardIterator> shardIts = clusterService.operationRouting()
.searchShards(clusterState, new String[] { index }, null, null, null, null);
final GroupShardsIterator<ShardIterator> shardIts;
try {
shardIts = clusterService.operationRouting().searchShards(clusterState, new String[] { index }, null, null, null, null);
} catch (Exception e) {
onIndexFailure.accept(index, e);
continue;
}
final IndexSelector indexResult = new IndexSelector(shardIts);
if (indexResult.nodeToShards.isEmpty()) {
onIndexFailure.accept(index, new NoShardAvailableActionException(null, "index [" + index + "] has no active shard copy"));
Expand Down Expand Up @@ -168,7 +173,7 @@ private void sendRequestToNode(String nodeId, List<ShardId> shardIds) {
assert node != null;
LOGGER.debug("round {} sends field caps node request to node {} for shardIds {}", executionRound, node, shardIds);
final ActionListener<FieldCapabilitiesNodeResponse> listener = ActionListener.wrap(
r -> onRequestResponse(shardIds, r),
this::onRequestResponse,
failure -> onRequestFailure(shardIds, failure)
);
final FieldCapabilitiesNodeRequest nodeRequest = new FieldCapabilitiesNodeRequest(
Expand All @@ -188,7 +193,11 @@ private void sendRequestToNode(String nodeId, List<ShardId> shardIds) {
nodeRequest,
parentTask,
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(listener, FieldCapabilitiesNodeResponse::new, executor)
new ActionListenerResponseHandler<>(
ActionListener.runAfter(listener, () -> afterRequestsCompleted(shardIds.size())),
FieldCapabilitiesNodeResponse::new,
executor
)
);
}

Expand All @@ -201,7 +210,7 @@ private void afterRequestsCompleted(int numRequests) {
}
}

private void onRequestResponse(List<ShardId> shardIds, FieldCapabilitiesNodeResponse nodeResponse) {
private void onRequestResponse(FieldCapabilitiesNodeResponse nodeResponse) {
for (FieldCapabilitiesIndexResponse indexResponse : nodeResponse.getIndexResponses()) {
if (indexResponse.canMatch()) {
if (fieldCapsRequest.includeEmptyFields() == false) {
Expand All @@ -224,7 +233,6 @@ private void onRequestResponse(List<ShardId> shardIds, FieldCapabilitiesNodeResp
indexSelector.setFailure(e.getKey(), e.getValue());
}
}
afterRequestsCompleted(shardIds.size());
}

private void onRequestFailure(List<ShardId> shardIds, Exception e) {
Expand All @@ -234,7 +242,6 @@ private void onRequestFailure(List<ShardId> shardIds, Exception e) {
indexSelector.setFailure(shardId, e);
}
}
afterRequestsCompleted(shardIds.size());
}

private static class IndexSelector {
Expand All @@ -253,14 +260,23 @@ private static class IndexSelector {
synchronized Exception getFailure() {
Exception first = null;
for (Exception e : failures.values()) {
first = ExceptionsHelper.useOrSuppress(first, e);
first = useOrSuppressIfDifferent(first, e);
}
return first;
}

/**
 * Merges two failures into one, skipping the suppression step when both
 * resolve to the same object via {@code ExceptionsHelper.unwrap}.
 *
 * @param first  the failure recorded so far; may be {@code null}
 * @param second the newly observed failure
 * @return {@code second} when {@code first} is {@code null}; otherwise {@code first},
 *         with {@code second} attached as a suppressed exception only when the two
 *         unwrapped values are not the same reference
 */
static Exception useOrSuppressIfDifferent(Exception first, Exception second) {
// Nothing recorded yet: the new failure becomes the primary one.
if (first == null) {
return second;
}
// Identity (not equals) comparison of whatever ExceptionsHelper.unwrap yields for each side.
final boolean sameUnwrapped = ExceptionsHelper.unwrap(first) == ExceptionsHelper.unwrap(second);
if (sameUnwrapped == false) {
first.addSuppressed(second);
}
return first;
}

synchronized void setFailure(ShardId shardId, Exception failure) {
assert unmatchedShardIds.contains(shardId) == false : "Shard " + shardId + " was unmatched already";
failures.compute(shardId, (k, curr) -> ExceptionsHelper.useOrSuppress(curr, failure));
failures.compute(shardId, (k, curr) -> useOrSuppressIfDifferent(curr, failure));
}

synchronized void addUnmatchedShardId(ShardId shardId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
Expand Down Expand Up @@ -599,6 +600,63 @@ public <T extends TransportResponse> void sendRequest(
}
}

/**
 * Dispatches a field-caps request where every node request fails, and where the very same
 * exception instance may be handed back for several requests. Verifies that the dispatcher
 * still completes and reports a failure for each targeted index.
 */
public void testFailWithSameException() throws Exception {
// Five fixed index names: index_1 .. index_5.
final List<String> allIndices = IntStream.rangeClosed(1, 5).mapToObj(n -> "index_" + n).toList();
final ClusterState clusterState;
{
// Random cluster: 1-10 nodes with random versions.
DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder();
int numNodes = randomIntBetween(1, 10);
for (int i = 0; i < numNodes; i++) {
discoNodes.add(newNode("node_" + i, VersionUtils.randomVersion(random()), IndexVersionUtils.randomVersion()));
}
// Register every index using randomized values for the two numeric arguments of indexSettings
// (presumably shard and replica counts -- confirm against the indexSettings helper).
Metadata.Builder metadata = Metadata.builder();
for (String index : allIndices) {
metadata.put(
IndexMetadata.builder(index).settings(indexSettings(IndexVersions.MINIMUM_COMPATIBLE, between(1, 10), between(0, 3)))
);
}
clusterState = newClusterState(metadata.build(), discoNodes.build());
}
try (TestTransportService transportService = TestTransportService.newTestTransportService()) {
// Target a non-empty random subset of the indices.
final List<String> targetIndices = randomSubsetOf(between(1, allIndices.size()), allIndices);
final ResponseCollector responseCollector = new ResponseCollector();
boolean withFilter = randomBoolean();
final RequestDispatcher dispatcher = new RequestDispatcher(
mockClusterService(clusterState),
transportService,
newRandomParentTask(),
randomFieldCapRequest(withFilter),
OriginalIndices.NONE,
randomNonNegativeLong(),
targetIndices.toArray(new String[0]),
transportService.threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION),
responseCollector::addIndexResponse,
responseCollector::addIndexFailure,
responseCollector::onComplete
);
final RequestTracker requestTracker = new RequestTracker(dispatcher, clusterState.routingTable(), withFilter);
transportService.requestTracker.set(requestTracker);

// Created once, outside the sender, so multiple requests can fail with the identical instance.
RuntimeException ex = new RuntimeException("shared");
transportService.setTransportInterceptor(new TransportInterceptor.AsyncSender() {
@Override
public <T extends TransportResponse> void sendRequest(
Transport.Connection connection,
String action,
TransportRequest request,
TransportRequestOptions options,
TransportResponseHandler<T> handler
) {
// Never sends anything: each request is failed on the handler's executor with a
// TransportException wrapping either the shared instance or a freshly created exception.
Exception failure = randomFrom(ex, new RuntimeException("second"), new IllegalStateException("third"));
handler.executor().execute(() -> handler.handleException(new TransportException(failure)));
}
});
dispatcher.execute();
responseCollector.awaitCompletion();
// Every targeted index, and only those, must have a recorded failure.
assertThat(responseCollector.failures.keySet(), equalTo(Sets.newHashSet(targetIndices)));
}
}

private static class NodeRequest {
final int round;
final DiscoveryNode node;
Expand Down