Skip to content

Commit

Permalink
Harden field-caps request dispatcher (elastic#108736)
Browse files Browse the repository at this point in the history
ExceptionHelper#useAndSuppress can throw exceptions if both input 
exceptions having the same root cause. If this happens, the field-caps
request dispatcher might fail to notify the completion to the caller. I
found this while running ES|QL with disruptions.

Relates elastic#107347
  • Loading branch information
dnhatn authored May 20, 2024
1 parent 60777cf commit 05a2046
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 9 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/108736.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 108736
summary: Harden field-caps request dispatcher
area: Search
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,13 @@ final class RequestDispatcher {
this.onComplete = new RunOnce(onComplete);
this.indexSelectors = ConcurrentCollections.newConcurrentMap();
for (String index : indices) {
final GroupShardsIterator<ShardIterator> shardIts = clusterService.operationRouting()
.searchShards(clusterState, new String[] { index }, null, null, null, null);
final GroupShardsIterator<ShardIterator> shardIts;
try {
shardIts = clusterService.operationRouting().searchShards(clusterState, new String[] { index }, null, null, null, null);
} catch (Exception e) {
onIndexFailure.accept(index, e);
continue;
}
final IndexSelector indexResult = new IndexSelector(shardIts);
if (indexResult.nodeToShards.isEmpty()) {
onIndexFailure.accept(index, new NoShardAvailableActionException(null, "index [" + index + "] has no active shard copy"));
Expand Down Expand Up @@ -168,7 +173,7 @@ private void sendRequestToNode(String nodeId, List<ShardId> shardIds) {
assert node != null;
LOGGER.debug("round {} sends field caps node request to node {} for shardIds {}", executionRound, node, shardIds);
final ActionListener<FieldCapabilitiesNodeResponse> listener = ActionListener.wrap(
r -> onRequestResponse(shardIds, r),
this::onRequestResponse,
failure -> onRequestFailure(shardIds, failure)
);
final FieldCapabilitiesNodeRequest nodeRequest = new FieldCapabilitiesNodeRequest(
Expand All @@ -188,7 +193,11 @@ private void sendRequestToNode(String nodeId, List<ShardId> shardIds) {
nodeRequest,
parentTask,
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(listener, FieldCapabilitiesNodeResponse::new, executor)
new ActionListenerResponseHandler<>(
ActionListener.runAfter(listener, () -> afterRequestsCompleted(shardIds.size())),
FieldCapabilitiesNodeResponse::new,
executor
)
);
}

Expand All @@ -201,7 +210,7 @@ private void afterRequestsCompleted(int numRequests) {
}
}

private void onRequestResponse(List<ShardId> shardIds, FieldCapabilitiesNodeResponse nodeResponse) {
private void onRequestResponse(FieldCapabilitiesNodeResponse nodeResponse) {
for (FieldCapabilitiesIndexResponse indexResponse : nodeResponse.getIndexResponses()) {
if (indexResponse.canMatch()) {
if (fieldCapsRequest.includeEmptyFields() == false) {
Expand All @@ -224,7 +233,6 @@ private void onRequestResponse(List<ShardId> shardIds, FieldCapabilitiesNodeResp
indexSelector.setFailure(e.getKey(), e.getValue());
}
}
afterRequestsCompleted(shardIds.size());
}

private void onRequestFailure(List<ShardId> shardIds, Exception e) {
Expand All @@ -234,7 +242,6 @@ private void onRequestFailure(List<ShardId> shardIds, Exception e) {
indexSelector.setFailure(shardId, e);
}
}
afterRequestsCompleted(shardIds.size());
}

private static class IndexSelector {
Expand All @@ -253,14 +260,23 @@ private static class IndexSelector {
synchronized Exception getFailure() {
Exception first = null;
for (Exception e : failures.values()) {
first = ExceptionsHelper.useOrSuppress(first, e);
first = useOrSuppressIfDifferent(first, e);
}
return first;
}

static Exception useOrSuppressIfDifferent(Exception first, Exception second) {
if (first == null) {
return second;
} else if (ExceptionsHelper.unwrap(first) != ExceptionsHelper.unwrap(second)) {
first.addSuppressed(second);
}
return first;
}

synchronized void setFailure(ShardId shardId, Exception failure) {
assert unmatchedShardIds.contains(shardId) == false : "Shard " + shardId + " was unmatched already";
failures.compute(shardId, (k, curr) -> ExceptionsHelper.useOrSuppress(curr, failure));
failures.compute(shardId, (k, curr) -> useOrSuppressIfDifferent(curr, failure));
}

synchronized void addUnmatchedShardId(ShardId shardId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
Expand Down Expand Up @@ -599,6 +600,63 @@ public <T extends TransportResponse> void sendRequest(
}
}

public void testFailWithSameException() throws Exception {
final List<String> allIndices = IntStream.rangeClosed(1, 5).mapToObj(n -> "index_" + n).toList();
final ClusterState clusterState;
{
DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder();
int numNodes = randomIntBetween(1, 10);
for (int i = 0; i < numNodes; i++) {
discoNodes.add(newNode("node_" + i, VersionUtils.randomVersion(random()), IndexVersionUtils.randomVersion()));
}
Metadata.Builder metadata = Metadata.builder();
for (String index : allIndices) {
metadata.put(
IndexMetadata.builder(index).settings(indexSettings(IndexVersions.MINIMUM_COMPATIBLE, between(1, 10), between(0, 3)))
);
}
clusterState = newClusterState(metadata.build(), discoNodes.build());
}
try (TestTransportService transportService = TestTransportService.newTestTransportService()) {
final List<String> targetIndices = randomSubsetOf(between(1, allIndices.size()), allIndices);
final ResponseCollector responseCollector = new ResponseCollector();
boolean withFilter = randomBoolean();
final RequestDispatcher dispatcher = new RequestDispatcher(
mockClusterService(clusterState),
transportService,
newRandomParentTask(),
randomFieldCapRequest(withFilter),
OriginalIndices.NONE,
randomNonNegativeLong(),
targetIndices.toArray(new String[0]),
transportService.threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION),
responseCollector::addIndexResponse,
responseCollector::addIndexFailure,
responseCollector::onComplete
);
final RequestTracker requestTracker = new RequestTracker(dispatcher, clusterState.routingTable(), withFilter);
transportService.requestTracker.set(requestTracker);

RuntimeException ex = new RuntimeException("shared");
transportService.setTransportInterceptor(new TransportInterceptor.AsyncSender() {
@Override
public <T extends TransportResponse> void sendRequest(
Transport.Connection connection,
String action,
TransportRequest request,
TransportRequestOptions options,
TransportResponseHandler<T> handler
) {
Exception failure = randomFrom(ex, new RuntimeException("second"), new IllegalStateException("third"));
handler.executor().execute(() -> handler.handleException(new TransportException(failure)));
}
});
dispatcher.execute();
responseCollector.awaitCompletion();
assertThat(responseCollector.failures.keySet(), equalTo(Sets.newHashSet(targetIndices)));
}
}

private static class NodeRequest {
final int round;
final DiscoveryNode node;
Expand Down

0 comments on commit 05a2046

Please sign in to comment.