Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor ESQL CcsUtils #116121

Merged
merged 4 commits into from
Nov 2, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
b.field(SKIPPED_FIELD.getPreferredName(), getClusterStateCount(Cluster.Status.SKIPPED));
b.field(PARTIAL_FIELD.getPreferredName(), getClusterStateCount(Cluster.Status.PARTIAL));
b.field(FAILED_FIELD.getPreferredName(), getClusterStateCount(Cluster.Status.FAILED));
// each clusterinfo defines its own field object name
// each Cluster object defines its own field object name
b.xContentObject("details", clusterInfo.values().iterator());
});
}
Expand Down Expand Up @@ -352,11 +352,7 @@ public Cluster(
this.successfulShards = successfulShards;
this.skippedShards = skippedShards;
this.failedShards = failedShards;
if (failures == null) {
this.failures = List.of();
} else {
this.failures = failures;
}
this.failures = failures == null ? Collections.emptyList() : failures;
this.took = took;
}

Expand All @@ -373,7 +369,7 @@ public Cluster(StreamInput in) throws IOException {
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_CCS_EXEC_INFO_WITH_FAILURES)) {
this.failures = Collections.unmodifiableList(in.readCollectionAsList(ShardSearchFailure::readShardSearchFailure));
} else {
this.failures = List.of();
this.failures = Collections.emptyList();
}
}

Expand Down Expand Up @@ -475,7 +471,7 @@ public Cluster.Builder setTook(TimeValue took) {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
String name = clusterAlias;
if (clusterAlias.equals("")) {
if (clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
name = LOCAL_CLUSTER_NAME_REPRESENTATION;
}
builder.startObject(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,24 +289,17 @@ public void onResponse(Transport.Connection connection) {
RESOLVE_ACTION_NAME,
new LookupRequest(cluster, remotePolicies),
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(lookupListener.delegateResponse((l, e) -> {
if (ExceptionsHelper.isRemoteUnavailableException(e)
&& remoteClusterService.isSkipUnavailable(cluster)) {
l.onResponse(new LookupResponse(e));
} else {
l.onFailure(e);
}
}), LookupResponse::new, threadPool.executor(ThreadPool.Names.SEARCH))
new ActionListenerResponseHandler<>(
lookupListener.delegateResponse((l, e) -> failIfSkipUnavailableFalse(e, cluster, l)),
LookupResponse::new,
threadPool.executor(ThreadPool.Names.SEARCH)
)
);
}

@Override
public void onFailure(Exception e) {
if (ExceptionsHelper.isRemoteUnavailableException(e) && remoteClusterService.isSkipUnavailable(cluster)) {
lookupListener.onResponse(new LookupResponse(e));
} else {
lookupListener.onFailure(e);
}
failIfSkipUnavailableFalse(e, cluster, lookupListener);
}
});
}
Expand All @@ -331,6 +324,14 @@ public void onFailure(Exception e) {
}
}

private void failIfSkipUnavailableFalse(Exception e, String cluster, ActionListener<LookupResponse> lookupListener) {
if (ExceptionsHelper.isRemoteUnavailableException(e) && remoteClusterService.isSkipUnavailable(cluster)) {
lookupListener.onResponse(new LookupResponse(e));
} else {
lookupListener.onFailure(e);
}
}

private static class LookupRequest extends TransportRequest {
private final String clusterAlias;
private final Collection<String> policyNames;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P
analyzedPlan(
parse(request.query(), request.params()),
executionInfo,
new CcsUtils.CssPartialErrorsActionListener(executionInfo, listener) {
new EsqlSessionCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) {
@Override
public void onResponse(LogicalPlan analyzedPlan) {
executeOptimizedPlan(request, executionInfo, planRunner, optimizedPlan(analyzedPlan), listener);
Expand All @@ -171,7 +171,7 @@ public void executeOptimizedPlan(
) {
PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request);
// TODO: this could be snuck into the underlying listener
CcsUtils.updateExecutionInfoAtEndOfPlanning(executionInfo);
EsqlSessionCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo);
// execute any potential subplans
executeSubPlans(physicalPlan, planRunner, executionInfo, request, listener);
}
Expand Down Expand Up @@ -308,8 +308,8 @@ private <T> void preAnalyze(
// TODO in follow-PR (for skip_unavailble handling of missing concrete indexes) add some tests for invalid index
// resolution to updateExecutionInfo
if (indexResolution.isValid()) {
CcsUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
CcsUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.getUnavailableClusters());
EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.getUnavailableClusters());
if (executionInfo.isCrossClusterSearch()
&& executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) == 0) {
// for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel
Expand Down Expand Up @@ -383,7 +383,7 @@ private void preAnalyzeIndices(
}
// if the preceding call to the enrich policy API found unavailable clusters, recreate the index expression to search
// based only on available clusters (which could now be an empty list)
String indexExpressionToResolve = CcsUtils.createIndexExpressionFromAvailableClusters(executionInfo);
String indexExpressionToResolve = EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo);
if (indexExpressionToResolve.isEmpty()) {
// if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution
listener.onResponse(IndexResolution.valid(new EsIndex(table.index(), Map.of(), Map.of())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,30 @@
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class CcsUtils {
class EsqlSessionCCSUtils {

private CcsUtils() {}
private EsqlSessionCCSUtils() {}

// visible for testing
static Map<String, FieldCapabilitiesFailure> determineUnavailableRemoteClusters(List<FieldCapabilitiesFailure> failures) {
Map<String, FieldCapabilitiesFailure> unavailableRemotes = new HashMap<>();
for (FieldCapabilitiesFailure failure : failures) {
if (ExceptionsHelper.isRemoteUnavailableException(failure.getException())) {
for (String indexExpression : failure.getIndices()) {
if (indexExpression.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR) > 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are index expressions already parsed here? E.g. remote/index wildcards, etc.?

Copy link
Contributor Author

@quux00 quux00 Nov 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. It is called in IndexResolver after calling field-caps. It sifts through the FieldCapsResponse failure list.
This is not new code. I just moved this method out of IndexResolver into the utils method, as Costin had requested in my earlier PR.

unavailableRemotes.put(RemoteClusterAware.parseClusterAlias(indexExpression), failure);
}
}
}
}
return unavailableRemotes;
}

/**
* ActionListener that receives LogicalPlan or error from logical planning.
Expand All @@ -46,70 +62,73 @@ abstract static class CssPartialErrorsActionListener implements ActionListener<L
this.listener = listener;
}

/**
* Whether to return an empty result (HTTP status 200) for a CCS rather than a top level 4xx/5xx error.
*
* For cases where field-caps had no indices to search and the remotes were unavailable, we
* return an empty successful response (200) if all remotes are marked with skip_unavailable=true.
*
* Note: a follow-on PR will expand this logic to handle cases where no indices could be found to match
* on any of the requested clusters.
*/
private boolean returnSuccessWithEmptyResult(Exception e) {
if (executionInfo.isCrossClusterSearch() == false) {
return false;
@Override
public void onFailure(Exception e) {
if (returnSuccessWithEmptyResult(executionInfo, e)) {
updateExecutionInfoToReturnEmptyResult(executionInfo, e);
listener.onResponse(new Result(Analyzer.NO_FIELDS, Collections.emptyList(), Collections.emptyList(), executionInfo));
} else {
listener.onFailure(e);
}
}
}

if (e instanceof NoClustersToSearchException || ExceptionsHelper.isRemoteUnavailableException(e)) {
for (String clusterAlias : executionInfo.clusterAliases()) {
if (executionInfo.isSkipUnavailable(clusterAlias) == false
&& clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false) {
return false;
}
/**
* Whether to return an empty result (HTTP status 200) for a CCS rather than a top level 4xx/5xx error.
*
* For cases where field-caps had no indices to search and the remotes were unavailable, we
* return an empty successful response (200) if all remotes are marked with skip_unavailable=true.
*
* Note: a follow-on PR will expand this logic to handle cases where no indices could be found to match
* on any of the requested clusters.
*/
static boolean returnSuccessWithEmptyResult(EsqlExecutionInfo executionInfo, Exception e) {
if (executionInfo.isCrossClusterSearch() == false) {
return false;
}

if (e instanceof NoClustersToSearchException || ExceptionsHelper.isRemoteUnavailableException(e)) {
for (String clusterAlias : executionInfo.clusterAliases()) {
if (executionInfo.isSkipUnavailable(clusterAlias) == false
&& clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false) {
return false;
}
return true;
}
return false;
return true;
}
return false;
}

@Override
public void onFailure(Exception e) {
if (returnSuccessWithEmptyResult(e)) {
executionInfo.markEndQuery();
Exception exceptionForResponse;
if (e instanceof ConnectTransportException) {
// when field-caps has no field info (since no clusters could be connected to or had matching indices)
// it just throws the first exception in its list, so this odd special handling is here is to avoid
// having one specific remote alias name in all failure lists in the metadata response
exceptionForResponse = new RemoteTransportException(
"connect_transport_exception - unable to connect to remote cluster",
null
);
static void updateExecutionInfoToReturnEmptyResult(EsqlExecutionInfo executionInfo, Exception e) {
executionInfo.markEndQuery();
Exception exceptionForResponse;
if (e instanceof ConnectTransportException) {
// when field-caps has no field info (since no clusters could be connected to or had matching indices)
// it just throws the first exception in its list, so this odd special handling is here is to avoid
// having one specific remote alias name in all failure lists in the metadata response
exceptionForResponse = new RemoteTransportException("connect_transport_exception - unable to connect to remote cluster", null);
} else {
exceptionForResponse = e;
}
for (String clusterAlias : executionInfo.clusterAliases()) {
executionInfo.swapCluster(clusterAlias, (k, v) -> {
EsqlExecutionInfo.Cluster.Builder builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook(executionInfo.overallTook())
.setTotalShards(0)
.setSuccessfulShards(0)
.setSkippedShards(0)
.setFailedShards(0);
if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) {
// never mark local cluster as skipped
builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL);
} else {
exceptionForResponse = e;
}
for (String clusterAlias : executionInfo.clusterAliases()) {
executionInfo.swapCluster(clusterAlias, (k, v) -> {
EsqlExecutionInfo.Cluster.Builder builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook(
executionInfo.overallTook()
).setTotalShards(0).setSuccessfulShards(0).setSkippedShards(0).setFailedShards(0);
if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) {
// never mark local cluster as skipped
builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL);
} else {
builder.setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED);
// add this exception to the failures list only if there is no failure already recorded there
if (v.getFailures() == null || v.getFailures().size() == 0) {
builder.setFailures(List.of(new ShardSearchFailure(exceptionForResponse)));
}
}
return builder.build();
});
builder.setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED);
// add this exception to the failures list only if there is no failure already recorded there
if (v.getFailures() == null || v.getFailures().size() == 0) {
builder.setFailures(List.of(new ShardSearchFailure(exceptionForResponse)));
}
}
listener.onResponse(new Result(Analyzer.NO_FIELDS, Collections.emptyList(), Collections.emptyList(), executionInfo));
} else {
listener.onFailure(e);
}
return builder.build();
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@
*/
package org.elasticsearch.xpack.esql.session;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesIndexResponse;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
Expand All @@ -20,7 +18,6 @@
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.mapper.TimeSeriesParams;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.xpack.esql.action.EsqlResolveFieldsAction;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.core.type.DateEsField;
Expand Down Expand Up @@ -159,23 +156,8 @@ public IndexResolution mergedMappings(String indexPattern, FieldCapabilitiesResp
for (FieldCapabilitiesIndexResponse ir : fieldCapsResponse.getIndexResponses()) {
concreteIndices.put(ir.getIndexName(), ir.getIndexMode());
}
Map<String, FieldCapabilitiesFailure> unavailableRemotes = determineUnavailableRemoteClusters(fieldCapsResponse.getFailures());
return IndexResolution.valid(new EsIndex(indexPattern, rootFields, concreteIndices), unavailableRemotes);
}

// visible for testing
static Map<String, FieldCapabilitiesFailure> determineUnavailableRemoteClusters(List<FieldCapabilitiesFailure> failures) {
Map<String, FieldCapabilitiesFailure> unavailableRemotes = new HashMap<>();
for (FieldCapabilitiesFailure failure : failures) {
if (ExceptionsHelper.isRemoteUnavailableException(failure.getException())) {
for (String indexExpression : failure.getIndices()) {
if (indexExpression.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR) > 0) {
unavailableRemotes.put(RemoteClusterAware.parseClusterAlias(indexExpression), failure);
}
}
}
}
return unavailableRemotes;
EsIndex esIndex = new EsIndex(indexPattern, rootFields, concreteIndices);
return IndexResolution.valid(esIndex, EsqlSessionCCSUtils.determineUnavailableRemoteClusters(fieldCapsResponse.getFailures()));
}

private boolean allNested(List<IndexFieldCapabilities> caps) {
Expand Down
Loading