Skip to content

Commit

Permalink
More refactorings.
Browse files Browse the repository at this point in the history
  • Loading branch information
quux00 committed Oct 30, 2024
1 parent fcdea1c commit 3ae7e94
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 653 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,11 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
Expand Down Expand Up @@ -75,7 +73,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -150,22 +147,24 @@ public void execute(
analyzedPlan(
parse(request.query(), request.params()),
executionInfo,
new LogicalPlanActionListener(request, executionInfo, runPhase, listener)
new CcsAwarePreAnalysisActionListener(request, executionInfo, runPhase, listener)
);
}

/**
* ActionListener that receives LogicalPlan or error from logical planning.
* ActionListener that receives LogicalPlan or error from preAnalysis.
* Any Exception sent to onFailure stops processing, but not all are fatal (return a 4xx or 5xx), so
* the onFailure handler determines whether to return an empty successful result or a 4xx/5xx error.
* the onFailure handler determines whether to return an empty successful result or a 4xx/5xx error
* based on the skip_unavailable status of the cluster the error came from. (The local cluster
* is always treated like skip_unavailable=false.)
*/
class LogicalPlanActionListener implements ActionListener<LogicalPlan> {
class CcsAwarePreAnalysisActionListener implements ActionListener<LogicalPlan> {
private final EsqlQueryRequest request;
private final EsqlExecutionInfo executionInfo;
private final BiConsumer<PhysicalPlan, ActionListener<Result>> runPhase;
private final ActionListener<Result> listener;

LogicalPlanActionListener(
CcsAwarePreAnalysisActionListener(
EsqlQueryRequest request,
EsqlExecutionInfo executionInfo,
BiConsumer<PhysicalPlan, ActionListener<Result>> runPhase,
Expand Down Expand Up @@ -344,15 +343,15 @@ private <T> void preAnalyze(
.collect(Collectors.toSet());
Map<String, Exception> unavailableClusters = enrichResolution.getUnavailableClusters();
preAnalyzeIndices(parsed, executionInfo, unavailableClusters, l.delegateFailureAndWrap((ll, indexResolution) -> {
// TODO in follow-PR (for skip_unavailble handling of missing concrete indexes) add some tests for invalid index
// TODO in follow-PR (for skip_unavailable handling of missing indexes) add some tests for invalid index
// resolution to updateExecutionInfo
if (indexResolution.isValid()) {
EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.getUnavailableClusters());
if (executionInfo.isCrossClusterSearch()
&& executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) == 0) {
// for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel
// Exception to let the LogicalPlanActionListener decide how to proceed
// Exception to let the CcsAwarePreAnalysisActionListener decide how to proceed
ll.onFailure(new NoClustersToSearchException());
return;
}
Expand Down Expand Up @@ -426,10 +425,10 @@ private void preAnalyzeIndices(
if (indexExpressionToResolve.isEmpty()) {
// if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution
listener.onResponse(IndexResolution.valid(new EsIndex(table.index(), Map.of(), Map.of())));
} else {
// call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types
indexResolver.resolveAsMergedMapping(indexExpressionToResolve, fieldNames, listener);
return;
}
// call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types
indexResolver.resolveAsMergedMapping(indexExpressionToResolve, fieldNames, listener);
} else {
try {
// occurs when dealing with local relations (row a = 1)
Expand All @@ -440,30 +439,6 @@ private void preAnalyzeIndices(
}
}

// visible for testing
static String createIndexExpressionFromAvailableClusters(EsqlExecutionInfo executionInfo) {
StringBuilder sb = new StringBuilder();
for (String clusterAlias : executionInfo.clusterAliases()) {
EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(clusterAlias);
if (cluster.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) {
if (cluster.getClusterAlias().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
sb.append(executionInfo.getCluster(clusterAlias).getIndexExpression()).append(',');
} else {
String indexExpression = executionInfo.getCluster(clusterAlias).getIndexExpression();
for (String index : indexExpression.split(",")) {
sb.append(clusterAlias).append(':').append(index).append(',');
}
}
}
}

if (sb.length() > 0) {
return sb.substring(0, sb.length() - 1);
} else {
return "";
}
}

static Set<String> fieldNames(LogicalPlan parsed, Set<String> enrichPolicyMatchFields) {
if (false == parsed.anyMatch(plan -> plan instanceof Aggregate || plan instanceof Project)) {
// no explicit columns selection, for example "from employees"
Expand Down Expand Up @@ -607,86 +582,4 @@ public PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan) {
LOGGER.debug("Optimized physical plan:\n{}", plan);
return plan;
}

static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo execInfo, Map<String, FieldCapabilitiesFailure> unavailable) {
for (Map.Entry<String, FieldCapabilitiesFailure> entry : unavailable.entrySet()) {
String clusterAlias = entry.getKey();
boolean skipUnavailable = execInfo.getCluster(clusterAlias).isSkipUnavailable();
RemoteTransportException e = new RemoteTransportException(
Strings.format("Remote cluster [%s] (with setting skip_unavailable=%s) is not available", clusterAlias, skipUnavailable),
entry.getValue().getException()
);
if (skipUnavailable) {
execInfo.swapCluster(
clusterAlias,
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED)
.setTotalShards(0)
.setSuccessfulShards(0)
.setSkippedShards(0)
.setFailedShards(0)
.setFailures(List.of(new ShardSearchFailure(e)))
.build()
);
} else {
throw e;
}
}
}

// visible for testing
static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionInfo executionInfo, IndexResolution indexResolution) {
Set<String> clustersWithResolvedIndices = new HashSet<>();
// determine missing clusters
for (String indexName : indexResolution.get().indexNameWithModes().keySet()) {
clustersWithResolvedIndices.add(RemoteClusterAware.parseClusterAlias(indexName));
}
Set<String> clustersRequested = executionInfo.clusterAliases();
Set<String> clustersWithNoMatchingIndices = Sets.difference(clustersRequested, clustersWithResolvedIndices);
clustersWithNoMatchingIndices.removeAll(indexResolution.getUnavailableClusters().keySet());
/*
* These are clusters in the original request that are not present in the field-caps response. They were
* specified with an index or indices that do not exist, so the search on that cluster is done.
* Mark it as SKIPPED with 0 shards searched and took=0.
*/
for (String c : clustersWithNoMatchingIndices) {
// TODO: in a follow-on PR, throw a Verification(400 status code) for local and remotes with skip_unavailable=false if
// they were requested with one or more concrete indices
// for now we never mark the local cluster as SKIPPED
final var status = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(c)
? EsqlExecutionInfo.Cluster.Status.SUCCESSFUL
: EsqlExecutionInfo.Cluster.Status.SKIPPED;
executionInfo.swapCluster(
c,
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(status)
.setTook(new TimeValue(0))
.setTotalShards(0)
.setSuccessfulShards(0)
.setSkippedShards(0)
.setFailedShards(0)
.build()
);
}
}

// visible for testing
static void updateExecutionInfoAtEndOfPlanning(EsqlExecutionInfo execInfo) {
// TODO: this logic assumes a single phase execution model, so it may need to altered once INLINESTATS is made CCS compatible
if (execInfo.isCrossClusterSearch()) {
execInfo.markEndPlanning();
for (String clusterAlias : execInfo.clusterAliases()) {
EsqlExecutionInfo.Cluster cluster = execInfo.getCluster(clusterAlias);
if (cluster.getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED) {
execInfo.swapCluster(
clusterAlias,
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTook(execInfo.planningTookTime())
.setTotalShards(0)
.setSuccessfulShards(0)
.setSkippedShards(0)
.setFailedShards(0)
.build()
);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,29 +51,6 @@ static String createIndexExpressionFromAvailableClusters(EsqlExecutionInfo execu
}
}

// static String createIndexExpressionFromAvailableClusters(EsqlExecutionInfo executionInfo) {
// StringBuilder sb = new StringBuilder();
// for (String clusterAlias : executionInfo.clusterAliases()) {
// EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(clusterAlias);
// if (cluster.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) {
// if (cluster.getClusterAlias().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
// sb.append(executionInfo.getCluster(clusterAlias).getIndexExpression()).append(',');
// } else {
// String indexExpression = executionInfo.getCluster(clusterAlias).getIndexExpression();
// for (String index : indexExpression.split(",'")) {
// sb.append(clusterAlias).append(':').append(index).append(',');
// }
// }
// }
// }
//
// if (sb.length() > 0) {
// return sb.substring(0, sb.length() - 1);
// } else {
// return "";
// }
// }

static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo execInfo, Map<String, FieldCapabilitiesFailure> unavailable) {
for (Map.Entry<String, FieldCapabilitiesFailure> entry : unavailable.entrySet()) {
String clusterAlias = entry.getKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
*/
package org.elasticsearch.xpack.esql.session;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesIndexResponse;
Expand All @@ -20,7 +19,6 @@
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.mapper.TimeSeriesParams;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.xpack.esql.action.EsqlResolveFieldsAction;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.core.type.DateEsField;
Expand Down Expand Up @@ -159,25 +157,12 @@ public IndexResolution mergedMappings(String indexPattern, FieldCapabilitiesResp
for (FieldCapabilitiesIndexResponse ir : fieldCapsResponse.getIndexResponses()) {
concreteIndices.put(ir.getIndexName(), ir.getIndexMode());
}
Map<String, FieldCapabilitiesFailure> unavailableRemotes = determineUnavailableRemoteClusters(fieldCapsResponse.getFailures());
Map<String, FieldCapabilitiesFailure> unavailableRemotes = EsqlSessionCCSUtils.determineUnavailableRemoteClusters(
fieldCapsResponse.getFailures()
);
return IndexResolution.valid(new EsIndex(indexPattern, rootFields, concreteIndices), unavailableRemotes);
}

// visible for testing
static Map<String, FieldCapabilitiesFailure> determineUnavailableRemoteClusters(List<FieldCapabilitiesFailure> failures) {
Map<String, FieldCapabilitiesFailure> unavailableRemotes = new HashMap<>();
for (FieldCapabilitiesFailure failure : failures) {
if (ExceptionsHelper.isRemoteUnavailableException(failure.getException())) {
for (String indexExpression : failure.getIndices()) {
if (indexExpression.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR) > 0) {
unavailableRemotes.put(RemoteClusterAware.parseClusterAlias(indexExpression), failure);
}
}
}
}
return unavailableRemotes;
}

private boolean allNested(List<IndexFieldCapabilities> caps) {
for (IndexFieldCapabilities cap : caps) {
if (false == cap.type().equalsIgnoreCase("nested")) {
Expand Down
Loading

0 comments on commit 3ae7e94

Please sign in to comment.