diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index ca2e9001490e3..878c8b1259479 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -7,7 +7,6 @@ package org.elasticsearch.xpack.esql.session; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.search.ShardSearchFailure; @@ -23,9 +22,6 @@ import org.elasticsearch.indices.IndicesExpressionGrouper; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; -import org.elasticsearch.transport.ConnectTransportException; -import org.elasticsearch.transport.RemoteClusterAware; -import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; import org.elasticsearch.xpack.esql.action.EsqlQueryRequest; import org.elasticsearch.xpack.esql.analysis.Analyzer; @@ -147,105 +143,15 @@ public void execute( analyzedPlan( parse(request.query(), request.params()), executionInfo, - new CcsAwarePreAnalysisActionListener(request, executionInfo, runPhase, listener) - ); - } - - /** - * ActionListener that receives LogicalPlan or error from preAnalysis. - * Any Exception sent to onFailure stops processing, but not all are fatal (return a 4xx or 5xx), so - * the onFailure handler determines whether to return an empty successful result or a 4xx/5xx error - * based on the skip_unavailable status of the cluster the error came from. (The local cluster - * is always treated like skip_unavailable=false.) - */ - class CcsAwarePreAnalysisActionListener implements ActionListener { - private final EsqlQueryRequest request; - private final EsqlExecutionInfo executionInfo; - private final BiConsumer> runPhase; - private final ActionListener listener; - - CcsAwarePreAnalysisActionListener( - EsqlQueryRequest request, - EsqlExecutionInfo executionInfo, - BiConsumer> runPhase, - ActionListener listener - ) { - this.request = request; - this.executionInfo = executionInfo; - this.runPhase = runPhase; - this.listener = listener; - } - - @Override - public void onResponse(LogicalPlan analyzedPlan) { - executeOptimizedPlan(request, executionInfo, runPhase, optimizedPlan(analyzedPlan), listener); - } - - /** - * Whether to return an empty result (HTTP status 200) for a CCS rather than a top level 4xx/5xx error. - * - * For cases where field-caps had no indices to search and the remotes were unavailable, we - * return an empty successful response (200) if all remotes are marked with skip_unavailable=true. - * - * Note: a follow-on PR will expand this logic to handle cases where no indices could be found to match - * on any of the requested clusters. - */ - private boolean returnSuccessWithEmptyResult(Exception e) { - if (executionInfo.isCrossClusterSearch() == false) { - return false; - } - - if (e instanceof NoClustersToSearchException || ExceptionsHelper.isRemoteUnavailableException(e)) { - for (String clusterAlias : executionInfo.clusterAliases()) { - if (executionInfo.isSkipUnavailable(clusterAlias) == false - && clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false) { - return false; - } - } - return true; - } - return false; - } - - @Override - public void onFailure(Exception e) { - if (returnSuccessWithEmptyResult(e)) { - executionInfo.markEndQuery(); - Exception exceptionForResponse; - if (e instanceof ConnectTransportException) { - // when field-caps has no field info (since no clusters could be connected to or had matching indices) - // it just throws the first exception in its list, so this odd special handling is here is to avoid - // having one specific remote alias name in all failure lists in the metadata response - exceptionForResponse = new RemoteTransportException( - "connect_transport_exception - unable to connect to remote cluster", - null - ); + ActionListener.wrap(plan -> executeOptimizedPlan(request, executionInfo, runPhase, optimizedPlan(plan), listener), e -> { + if (EsqlSessionCCSUtils.returnSuccessWithEmptyResult(executionInfo, e)) { + EsqlSessionCCSUtils.updateExecutionInfoToReturnEmptyResult(executionInfo, e); + listener.onResponse(new Result(Analyzer.NO_FIELDS, Collections.emptyList(), Collections.emptyList(), executionInfo)); } else { - exceptionForResponse = e; + listener.onFailure(e); } - for (String clusterAlias : executionInfo.clusterAliases()) { - executionInfo.swapCluster(clusterAlias, (k, v) -> { - EsqlExecutionInfo.Cluster.Builder builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook( - executionInfo.overallTook() - ).setTotalShards(0).setSuccessfulShards(0).setSkippedShards(0).setFailedShards(0); - if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) { - // never mark local cluster as skipped - builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL); - } else { - builder.setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED); - // add this exception to the failures list only if there is no failure already recorded there - if (v.getFailures() == null || v.getFailures().size() == 0) { - builder.setFailures(List.of(new ShardSearchFailure(exceptionForResponse))); - } - } - return builder.build(); - }); - } - listener.onResponse(new Result(Analyzer.NO_FIELDS, Collections.emptyList(), Collections.emptyList(), executionInfo)); - } else { - listener.onFailure(e); - } - } + }) + ); } /** diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java index 50df203adca35..b09519420b063 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; @@ -144,4 +145,64 @@ static Map determineUnavailableRemoteClusters( } return unavailableRemotes; } + + /** + * Whether to return an empty result (HTTP status 200) for a CCS rather than a top level 4xx/5xx error. + * + * For cases where field-caps had no indices to search and the remotes were unavailable, we + * return an empty successful response (200) if all remotes are marked with skip_unavailable=true. + * + * Note: a follow-on PR will expand this logic to handle cases where no indices could be found to match + * on any of the requested clusters. + */ + static boolean returnSuccessWithEmptyResult(EsqlExecutionInfo executionInfo, Exception e) { + if (executionInfo.isCrossClusterSearch() == false) { + return false; + } + + if (e instanceof NoClustersToSearchException || ExceptionsHelper.isRemoteUnavailableException(e)) { + for (String clusterAlias : executionInfo.clusterAliases()) { + if (executionInfo.isSkipUnavailable(clusterAlias) == false + && clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false) { + return false; + } + } + return true; + } + return false; + } + + // TODO: write tests for this and method above + static void updateExecutionInfoToReturnEmptyResult(EsqlExecutionInfo executionInfo, Exception e) { + executionInfo.markEndQuery(); + Exception exceptionForResponse; + if (e instanceof ConnectTransportException) { + // when field-caps has no field info (since no clusters could be connected to or had matching indices) + // it just throws the first exception in its list, so this odd special handling is here is to avoid + // having one specific remote alias name in all failure lists in the metadata response + exceptionForResponse = new RemoteTransportException("connect_transport_exception - unable to connect to remote cluster", null); + } else { + exceptionForResponse = e; + } + for (String clusterAlias : executionInfo.clusterAliases()) { + executionInfo.swapCluster(clusterAlias, (k, v) -> { + EsqlExecutionInfo.Cluster.Builder builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook(executionInfo.overallTook()) + .setTotalShards(0) + .setSuccessfulShards(0) + .setSkippedShards(0) + .setFailedShards(0); + if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) { + // never mark local cluster as skipped + builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL); + } else { + builder.setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED); + // add this exception to the failures list only if there is no failure already recorded there + if (v.getFailures() == null || v.getFailures().size() == 0) { + builder.setFailures(List.of(new ShardSearchFailure(exceptionForResponse))); + } + } + return builder.build(); + }); + } + } }