Skip to content

Commit

Permalink
Removed CcsAwarePreAnalysisActionListener, pushing functionality to E…
Browse files Browse the repository at this point in the history
…sqlSessionCCSUtils
  • Loading branch information
quux00 committed Oct 30, 2024
1 parent 56ea833 commit b9d6aef
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

package org.elasticsearch.xpack.esql.session;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.ShardSearchFailure;
Expand All @@ -23,9 +22,6 @@
import org.elasticsearch.indices.IndicesExpressionGrouper;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
import org.elasticsearch.xpack.esql.action.EsqlQueryRequest;
import org.elasticsearch.xpack.esql.analysis.Analyzer;
Expand Down Expand Up @@ -147,105 +143,15 @@ public void execute(
analyzedPlan(
parse(request.query(), request.params()),
executionInfo,
new CcsAwarePreAnalysisActionListener(request, executionInfo, runPhase, listener)
);
}

/**
* ActionListener that receives LogicalPlan or error from preAnalysis.
* Any Exception sent to onFailure stops processing, but not all are fatal (return a 4xx or 5xx), so
* the onFailure handler determines whether to return an empty successful result or a 4xx/5xx error
* based on the skip_unavailable status of the cluster the error came from. (The local cluster
* is always treated like skip_unavailable=false.)
*/
class CcsAwarePreAnalysisActionListener implements ActionListener<LogicalPlan> {
private final EsqlQueryRequest request;
private final EsqlExecutionInfo executionInfo;
private final BiConsumer<PhysicalPlan, ActionListener<Result>> runPhase;
private final ActionListener<Result> listener;

/**
 * Stores the collaborators this listener needs once pre-analysis has finished.
 *
 * @param request       the query request, forwarded when the optimized plan is executed
 * @param executionInfo per-cluster execution metadata, consulted and updated on failure
 * @param runPhase      callback forwarded when the optimized plan is executed
 * @param listener      downstream listener that receives the final {@code Result} or the failure
 */
CcsAwarePreAnalysisActionListener(
    EsqlQueryRequest request,
    EsqlExecutionInfo executionInfo,
    BiConsumer<PhysicalPlan, ActionListener<Result>> runPhase,
    ActionListener<Result> listener
) {
    this.listener = listener;
    this.runPhase = runPhase;
    this.executionInfo = executionInfo;
    this.request = request;
}

/**
 * Pre-analysis succeeded: optimize the analyzed plan and pass it on for execution.
 *
 * @param analyzedPlan the logical plan produced by pre-analysis
 */
@Override
public void onResponse(LogicalPlan analyzedPlan) {
    LogicalPlan optimized = optimizedPlan(analyzedPlan);
    executeOptimizedPlan(request, executionInfo, runPhase, optimized, listener);
}

/**
 * Decides whether a pre-analysis failure should be answered with an empty, successful
 * result (HTTP 200) instead of a top level 4xx/5xx error.
 *
 * That only happens for a cross-cluster search where the failure is either a
 * {@code NoClustersToSearchException} or a "remote unavailable" exception, and every
 * cluster other than the local one is marked skip_unavailable=true.
 *
 * Note: a follow-on PR will expand this logic to handle cases where no indices could be
 * found to match on any of the requested clusters.
 *
 * @param e the failure reported by pre-analysis
 * @return true if an empty successful response should be returned, false to propagate the error
 */
private boolean returnSuccessWithEmptyResult(Exception e) {
    if (executionInfo.isCrossClusterSearch() == false) {
        return false;
    }

    boolean skippableFailure = e instanceof NoClustersToSearchException || ExceptionsHelper.isRemoteUnavailableException(e);
    if (skippableFailure == false) {
        return false;
    }

    for (String alias : executionInfo.clusterAliases()) {
        // a cluster is acceptable if it is skip_unavailable=true or it is the local cluster
        boolean acceptable = executionInfo.isSkipUnavailable(alias) || alias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
        if (acceptable == false) {
            return false;
        }
    }
    return true;
}

@Override
public void onFailure(Exception e) {
if (returnSuccessWithEmptyResult(e)) {
executionInfo.markEndQuery();
Exception exceptionForResponse;
if (e instanceof ConnectTransportException) {
// when field-caps has no field info (since no clusters could be connected to or had matching indices)
// it just throws the first exception in its list, so this odd special handling is here to avoid
// having one specific remote alias name in all failure lists in the metadata response
exceptionForResponse = new RemoteTransportException(
"connect_transport_exception - unable to connect to remote cluster",
null
);
ActionListener.wrap(plan -> executeOptimizedPlan(request, executionInfo, runPhase, optimizedPlan(plan), listener), e -> {
if (EsqlSessionCCSUtils.returnSuccessWithEmptyResult(executionInfo, e)) {
EsqlSessionCCSUtils.updateExecutionInfoToReturnEmptyResult(executionInfo, e);
listener.onResponse(new Result(Analyzer.NO_FIELDS, Collections.emptyList(), Collections.emptyList(), executionInfo));
} else {
exceptionForResponse = e;
listener.onFailure(e);
}
for (String clusterAlias : executionInfo.clusterAliases()) {
executionInfo.swapCluster(clusterAlias, (k, v) -> {
EsqlExecutionInfo.Cluster.Builder builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook(
executionInfo.overallTook()
).setTotalShards(0).setSuccessfulShards(0).setSkippedShards(0).setFailedShards(0);
if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) {
// never mark local cluster as skipped
builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL);
} else {
builder.setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED);
// add this exception to the failures list only if there is no failure already recorded there
if (v.getFailures() == null || v.getFailures().size() == 0) {
builder.setFailures(List.of(new ShardSearchFailure(exceptionForResponse)));
}
}
return builder.build();
});
}
listener.onResponse(new Result(Analyzer.NO_FIELDS, Collections.emptyList(), Collections.emptyList(), executionInfo));
} else {
listener.onFailure(e);
}
}
})
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
Expand Down Expand Up @@ -144,4 +145,64 @@ static Map<String, FieldCapabilitiesFailure> determineUnavailableRemoteClusters(
}
return unavailableRemotes;
}

/**
 * Decides whether a failure should be answered with an empty, successful result
 * (HTTP status 200) for a CCS rather than a top level 4xx/5xx error.
 *
 * The answer is true only when all of the following hold:
 * the query is a cross-cluster search; the exception is a {@code NoClustersToSearchException}
 * or is classified as "remote unavailable"; and every cluster alias other than the local
 * cluster is marked with skip_unavailable=true.
 *
 * Note: a follow-on PR will expand this logic to handle cases where no indices could be
 * found to match on any of the requested clusters.
 *
 * @param executionInfo execution metadata describing the clusters taking part in the query
 * @param e             the failure being evaluated
 * @return true if an empty successful response should be returned, false to propagate the error
 */
static boolean returnSuccessWithEmptyResult(EsqlExecutionInfo executionInfo, Exception e) {
    if (executionInfo.isCrossClusterSearch() == false) {
        return false;
    }

    boolean skippableFailure = e instanceof NoClustersToSearchException || ExceptionsHelper.isRemoteUnavailableException(e);
    if (skippableFailure == false) {
        return false;
    }

    for (String alias : executionInfo.clusterAliases()) {
        // a cluster is acceptable if it is skip_unavailable=true or it is the local cluster
        boolean acceptable = executionInfo.isSkipUnavailable(alias) || alias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
        if (acceptable == false) {
            return false;
        }
    }
    return true;
}

// TODO: write tests for this and method above
/**
 * Marks the query as finished and rewrites every cluster entry in {@code executionInfo}
 * so that the response describes an empty result: all shard counts are zeroed, the local
 * cluster is reported as SUCCESSFUL and every other cluster as SKIPPED.
 *
 * @param executionInfo execution metadata to update in place
 * @param e             the failure that caused the empty result; recorded on skipped clusters
 *                      that have no failure recorded yet
 */
static void updateExecutionInfoToReturnEmptyResult(EsqlExecutionInfo executionInfo, Exception e) {
    executionInfo.markEndQuery();

    // When field-caps has no field info (since no clusters could be connected to or had matching indices)
    // it just throws the first exception in its list. A ConnectTransportException is therefore swapped for a
    // generic one, to avoid having one specific remote alias name in all failure lists in the metadata response.
    final Exception failureToRecord = e instanceof ConnectTransportException
        ? new RemoteTransportException("connect_transport_exception - unable to connect to remote cluster", null)
        : e;

    for (String clusterAlias : executionInfo.clusterAliases()) {
        executionInfo.swapCluster(clusterAlias, (key, current) -> {
            EsqlExecutionInfo.Cluster.Builder updated = new EsqlExecutionInfo.Cluster.Builder(current).setTook(executionInfo.overallTook())
                .setTotalShards(0)
                .setSuccessfulShards(0)
                .setSkippedShards(0)
                .setFailedShards(0);

            if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) {
                // never mark local cluster as skipped
                updated.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL);
                return updated.build();
            }

            updated.setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED);
            // keep any failure already recorded for this cluster; only fill in when there is none
            boolean noFailureRecorded = current.getFailures() == null || current.getFailures().isEmpty();
            if (noFailureRecorded) {
                updated.setFailures(List.of(new ShardSearchFailure(failureToRecord)));
            }
            return updated.build();
        });
    }
}
}

0 comments on commit b9d6aef

Please sign in to comment.