diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java index f2ab0355304b3..7153be543d2c2 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java @@ -182,6 +182,7 @@ public boolean isSkipUnavailable(String clusterAlias) { if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) { return false; } + // TODO: should first check the skipUn status of each cluster if present in clusterInfo (simplifies testing) return skipUnavailablePredicate.test(clusterAlias); } @@ -229,7 +230,7 @@ public Iterator toXContentChunked(ToXContent.Params params b.field(SKIPPED_FIELD.getPreferredName(), getClusterStateCount(Cluster.Status.SKIPPED)); b.field(PARTIAL_FIELD.getPreferredName(), getClusterStateCount(Cluster.Status.PARTIAL)); b.field(FAILED_FIELD.getPreferredName(), getClusterStateCount(Cluster.Status.FAILED)); - // each clusterinfo defines its own field object name + // each Cluster object defines its own field object name b.xContentObject("details", clusterInfo.values().iterator()); }); } @@ -352,11 +353,7 @@ public Cluster( this.successfulShards = successfulShards; this.skippedShards = skippedShards; this.failedShards = failedShards; - if (failures == null) { - this.failures = List.of(); - } else { - this.failures = failures; - } + this.failures = failures == null ? Collections.emptyList() : failures; this.took = took; } @@ -373,7 +370,7 @@ public Cluster(StreamInput in) throws IOException { if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_CCS_EXEC_INFO_WITH_FAILURES)) { this.failures = Collections.unmodifiableList(in.readCollectionAsList(ShardSearchFailure::readShardSearchFailure)); } else { - this.failures = List.of(); + this.failures = Collections.emptyList(); } } @@ -475,7 +472,7 @@ public Cluster.Builder setTook(TimeValue took) { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { String name = clusterAlias; - if (clusterAlias.equals("")) { + if (clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { name = LOCAL_CLUSTER_NAME_REPRESENTATION; } builder.startObject(name); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java index 77ef5ef597bb5..c8a7a6bcc4e98 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java @@ -289,24 +289,17 @@ public void onResponse(Transport.Connection connection) { RESOLVE_ACTION_NAME, new LookupRequest(cluster, remotePolicies), TransportRequestOptions.EMPTY, - new ActionListenerResponseHandler<>(lookupListener.delegateResponse((l, e) -> { - if (ExceptionsHelper.isRemoteUnavailableException(e) - && remoteClusterService.isSkipUnavailable(cluster)) { - l.onResponse(new LookupResponse(e)); - } else { - l.onFailure(e); - } - }), LookupResponse::new, threadPool.executor(ThreadPool.Names.SEARCH)) + new ActionListenerResponseHandler<>( + lookupListener.delegateResponse((l, e) -> failIfSkipUnavailableFalse(e, cluster, l)), + LookupResponse::new, + threadPool.executor(ThreadPool.Names.SEARCH) + ) ); } @Override public void onFailure(Exception e) { - if (ExceptionsHelper.isRemoteUnavailableException(e) && remoteClusterService.isSkipUnavailable(cluster)) { - lookupListener.onResponse(new LookupResponse(e)); - } else { - lookupListener.onFailure(e); - } + failIfSkipUnavailableFalse(e, cluster, lookupListener); } }); } @@ -331,6 +324,14 @@ public void onFailure(Exception e) { } } + private void failIfSkipUnavailableFalse(Exception e, String cluster, ActionListener lookupListener) { + if (ExceptionsHelper.isRemoteUnavailableException(e) && remoteClusterService.isSkipUnavailable(cluster)) { + lookupListener.onResponse(new LookupResponse(e)); + } else { + lookupListener.onFailure(e); + } + } + private static class LookupRequest extends TransportRequest { private final String clusterAlias; private final Collection policyNames; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 1e78f454b7531..878c8b1259479 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -7,16 +7,13 @@ package org.elasticsearch.xpack.esql.session; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; -import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.regex.Regex; -import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; @@ -25,9 +22,6 @@ import org.elasticsearch.indices.IndicesExpressionGrouper; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; -import org.elasticsearch.transport.ConnectTransportException; -import org.elasticsearch.transport.RemoteClusterAware; -import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; import org.elasticsearch.xpack.esql.action.EsqlQueryRequest; import org.elasticsearch.xpack.esql.analysis.Analyzer; @@ -75,7 +69,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -150,103 +143,15 @@ public void execute( analyzedPlan( parse(request.query(), request.params()), executionInfo, - new LogicalPlanActionListener(request, executionInfo, runPhase, listener) - ); - } - - /** - * ActionListener that receives LogicalPlan or error from logical planning. - * Any Exception sent to onFailure stops processing, but not all are fatal (return a 4xx or 5xx), so - * the onFailure handler determines whether to return an empty successful result or a 4xx/5xx error. - */ - class LogicalPlanActionListener implements ActionListener { - private final EsqlQueryRequest request; - private final EsqlExecutionInfo executionInfo; - private final BiConsumer> runPhase; - private final ActionListener listener; - - LogicalPlanActionListener( - EsqlQueryRequest request, - EsqlExecutionInfo executionInfo, - BiConsumer> runPhase, - ActionListener listener - ) { - this.request = request; - this.executionInfo = executionInfo; - this.runPhase = runPhase; - this.listener = listener; - } - - @Override - public void onResponse(LogicalPlan analyzedPlan) { - executeOptimizedPlan(request, executionInfo, runPhase, optimizedPlan(analyzedPlan), listener); - } - - /** - * Whether to return an empty result (HTTP status 200) for a CCS rather than a top level 4xx/5xx error. - * - * For cases where field-caps had no indices to search and the remotes were unavailable, we - * return an empty successful response (200) if all remotes are marked with skip_unavailable=true. - * - * Note: a follow-on PR will expand this logic to handle cases where no indices could be found to match - * on any of the requested clusters. - */ - private boolean returnSuccessWithEmptyResult(Exception e) { - if (executionInfo.isCrossClusterSearch() == false) { - return false; - } - - if (e instanceof NoClustersToSearchException || ExceptionsHelper.isRemoteUnavailableException(e)) { - for (String clusterAlias : executionInfo.clusterAliases()) { - if (executionInfo.isSkipUnavailable(clusterAlias) == false - && clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false) { - return false; - } - } - return true; - } - return false; - } - - @Override - public void onFailure(Exception e) { - if (returnSuccessWithEmptyResult(e)) { - executionInfo.markEndQuery(); - Exception exceptionForResponse; - if (e instanceof ConnectTransportException) { - // when field-caps has no field info (since no clusters could be connected to or had matching indices) - // it just throws the first exception in its list, so this odd special handling is here is to avoid - // having one specific remote alias name in all failure lists in the metadata response - exceptionForResponse = new RemoteTransportException( - "connect_transport_exception - unable to connect to remote cluster", - null - ); + ActionListener.wrap(plan -> executeOptimizedPlan(request, executionInfo, runPhase, optimizedPlan(plan), listener), e -> { + if (EsqlSessionCCSUtils.returnSuccessWithEmptyResult(executionInfo, e)) { + EsqlSessionCCSUtils.updateExecutionInfoToReturnEmptyResult(executionInfo, e); + listener.onResponse(new Result(Analyzer.NO_FIELDS, Collections.emptyList(), Collections.emptyList(), executionInfo)); } else { - exceptionForResponse = e; - } - for (String clusterAlias : executionInfo.clusterAliases()) { - executionInfo.swapCluster(clusterAlias, (k, v) -> { - EsqlExecutionInfo.Cluster.Builder builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook( - executionInfo.overallTook() - ).setTotalShards(0).setSuccessfulShards(0).setSkippedShards(0).setFailedShards(0); - if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) { - // never mark local cluster as skipped - builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL); - } else { - builder.setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED); - // add this exception to the failures list only if there is no failure already recorded there - if (v.getFailures() == null || v.getFailures().size() == 0) { - builder.setFailures(List.of(new ShardSearchFailure(exceptionForResponse))); - } - } - return builder.build(); - }); + listener.onFailure(e); } - listener.onResponse(new Result(Analyzer.NO_FIELDS, Collections.emptyList(), Collections.emptyList(), executionInfo)); - } else { - listener.onFailure(e); - } - } + }) + ); } /** @@ -261,7 +166,7 @@ public void executeOptimizedPlan( ActionListener listener ) { LogicalPlan firstPhase = Phased.extractFirstPhase(optimizedPlan); - updateExecutionInfoAtEndOfPlanning(executionInfo); + EsqlSessionCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo); if (firstPhase == null) { runPhase.accept(logicalPlanToPhysicalPlan(optimizedPlan, request), listener); } else { @@ -344,15 +249,15 @@ private void preAnalyze( .collect(Collectors.toSet()); Map unavailableClusters = enrichResolution.getUnavailableClusters(); preAnalyzeIndices(parsed, executionInfo, unavailableClusters, l.delegateFailureAndWrap((ll, indexResolution) -> { - // TODO in follow-PR (for skip_unavailble handling of missing concrete indexes) add some tests for invalid index + // TODO in follow-PR (for skip_unavailable handling of missing indexes) add some tests for invalid index // resolution to updateExecutionInfo if (indexResolution.isValid()) { - updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); - updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.getUnavailableClusters()); + EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); + EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.getUnavailableClusters()); if (executionInfo.isCrossClusterSearch() && executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) == 0) { // for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel - // Exception to let the LogicalPlanActionListener decide how to proceed + // Exception to let the CcsAwarePreAnalysisActionListener decide how to proceed ll.onFailure(new NoClustersToSearchException()); return; } @@ -422,14 +327,14 @@ private void preAnalyzeIndices( } // if the preceding call to the enrich policy API found unavailable clusters, recreate the index expression to search // based only on available clusters (which could now be an empty list) - String indexExpressionToResolve = createIndexExpressionFromAvailableClusters(executionInfo); + String indexExpressionToResolve = EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo); if (indexExpressionToResolve.isEmpty()) { // if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution listener.onResponse(IndexResolution.valid(new EsIndex(table.index(), Map.of(), Map.of()))); - } else { - // call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types - indexResolver.resolveAsMergedMapping(indexExpressionToResolve, fieldNames, listener); + return; } + // call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types + indexResolver.resolveAsMergedMapping(indexExpressionToResolve, fieldNames, listener); } else { try { // occurs when dealing with local relations (row a = 1) @@ -440,30 +345,6 @@ private void preAnalyzeIndices( } } - // visible for testing - static String createIndexExpressionFromAvailableClusters(EsqlExecutionInfo executionInfo) { - StringBuilder sb = new StringBuilder(); - for (String clusterAlias : executionInfo.clusterAliases()) { - EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(clusterAlias); - if (cluster.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) { - if (cluster.getClusterAlias().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { - sb.append(executionInfo.getCluster(clusterAlias).getIndexExpression()).append(','); - } else { - String indexExpression = executionInfo.getCluster(clusterAlias).getIndexExpression(); - for (String index : indexExpression.split(",")) { - sb.append(clusterAlias).append(':').append(index).append(','); - } - } - } - } - - if (sb.length() > 0) { - return sb.substring(0, sb.length() - 1); - } else { - return ""; - } - } - static Set fieldNames(LogicalPlan parsed, Set enrichPolicyMatchFields) { if (false == parsed.anyMatch(plan -> plan instanceof Aggregate || plan instanceof Project)) { // no explicit columns selection, for example "from employees" @@ -607,86 +488,4 @@ public PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan) { LOGGER.debug("Optimized physical plan:\n{}", plan); return plan; } - - static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo execInfo, Map unavailable) { - for (Map.Entry entry : unavailable.entrySet()) { - String clusterAlias = entry.getKey(); - boolean skipUnavailable = execInfo.getCluster(clusterAlias).isSkipUnavailable(); - RemoteTransportException e = new RemoteTransportException( - Strings.format("Remote cluster [%s] (with setting skip_unavailable=%s) is not available", clusterAlias, skipUnavailable), - entry.getValue().getException() - ); - if (skipUnavailable) { - execInfo.swapCluster( - clusterAlias, - (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED) - .setTotalShards(0) - .setSuccessfulShards(0) - .setSkippedShards(0) - .setFailedShards(0) - .setFailures(List.of(new ShardSearchFailure(e))) - .build() - ); - } else { - throw e; - } - } - } - - // visible for testing - static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionInfo executionInfo, IndexResolution indexResolution) { - Set clustersWithResolvedIndices = new HashSet<>(); - // determine missing clusters - for (String indexName : indexResolution.get().indexNameWithModes().keySet()) { - clustersWithResolvedIndices.add(RemoteClusterAware.parseClusterAlias(indexName)); - } - Set clustersRequested = executionInfo.clusterAliases(); - Set clustersWithNoMatchingIndices = Sets.difference(clustersRequested, clustersWithResolvedIndices); - clustersWithNoMatchingIndices.removeAll(indexResolution.getUnavailableClusters().keySet()); - /* - * These are clusters in the original request that are not present in the field-caps response. They were - * specified with an index or indices that do not exist, so the search on that cluster is done. - * Mark it as SKIPPED with 0 shards searched and took=0. - */ - for (String c : clustersWithNoMatchingIndices) { - // TODO: in a follow-on PR, throw a Verification(400 status code) for local and remotes with skip_unavailable=false if - // they were requested with one or more concrete indices - // for now we never mark the local cluster as SKIPPED - final var status = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(c) - ? EsqlExecutionInfo.Cluster.Status.SUCCESSFUL - : EsqlExecutionInfo.Cluster.Status.SKIPPED; - executionInfo.swapCluster( - c, - (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(status) - .setTook(new TimeValue(0)) - .setTotalShards(0) - .setSuccessfulShards(0) - .setSkippedShards(0) - .setFailedShards(0) - .build() - ); - } - } - - // visible for testing - static void updateExecutionInfoAtEndOfPlanning(EsqlExecutionInfo execInfo) { - // TODO: this logic assumes a single phase execution model, so it may need to altered once INLINESTATS is made CCS compatible - if (execInfo.isCrossClusterSearch()) { - execInfo.markEndPlanning(); - for (String clusterAlias : execInfo.clusterAliases()) { - EsqlExecutionInfo.Cluster cluster = execInfo.getCluster(clusterAlias); - if (cluster.getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED) { - execInfo.swapCluster( - clusterAlias, - (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTook(execInfo.planningTookTime()) - .setTotalShards(0) - .setSuccessfulShards(0) - .setSkippedShards(0) - .setFailedShards(0) - .build() - ); - } - } - } - } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java new file mode 100644 index 0000000000000..cd2a99fc3c509 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java @@ -0,0 +1,207 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.session; + +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; +import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.transport.ConnectTransportException; +import org.elasticsearch.transport.RemoteClusterAware; +import org.elasticsearch.transport.RemoteTransportException; +import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; +import org.elasticsearch.xpack.esql.index.IndexResolution; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class EsqlSessionCCSUtils { + + private EsqlSessionCCSUtils() {} + + static String createIndexExpressionFromAvailableClusters(EsqlExecutionInfo executionInfo) { + StringBuilder sb = new StringBuilder(); + for (String clusterAlias : executionInfo.clusterAliases()) { + EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(clusterAlias); + if (cluster.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) { + if (cluster.getClusterAlias().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { + sb.append(executionInfo.getCluster(clusterAlias).getIndexExpression()).append(','); + } else { + String indexExpression = executionInfo.getCluster(clusterAlias).getIndexExpression(); + for (String index : indexExpression.split(",")) { + sb.append(clusterAlias).append(':').append(index).append(','); + } + } + } + } + + if (sb.length() > 0) { + return sb.substring(0, sb.length() - 1); + } else { + return ""; + } + } + + static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo execInfo, Map unavailable) { + for (Map.Entry entry : unavailable.entrySet()) { + String clusterAlias = entry.getKey(); + boolean skipUnavailable = execInfo.getCluster(clusterAlias).isSkipUnavailable(); + RemoteTransportException e = new RemoteTransportException( + Strings.format("Remote cluster [%s] (with setting skip_unavailable=%s) is not available", clusterAlias, skipUnavailable), + entry.getValue().getException() + ); + if (skipUnavailable) { + execInfo.swapCluster( + clusterAlias, + (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED) + .setTotalShards(0) + .setSuccessfulShards(0) + .setSkippedShards(0) + .setFailedShards(0) + .setFailures(List.of(new ShardSearchFailure(e))) + .build() + ); + } else { + throw e; + } + } + } + + static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionInfo executionInfo, IndexResolution indexResolution) { + Set clustersWithResolvedIndices = new HashSet<>(); + // determine missing clusters + for (String indexName : indexResolution.get().indexNameWithModes().keySet()) { + clustersWithResolvedIndices.add(RemoteClusterAware.parseClusterAlias(indexName)); + } + Set clustersRequested = executionInfo.clusterAliases(); + Set clustersWithNoMatchingIndices = Sets.difference(clustersRequested, clustersWithResolvedIndices); + clustersWithNoMatchingIndices.removeAll(indexResolution.getUnavailableClusters().keySet()); + /* + * These are clusters in the original request that are not present in the field-caps response. They were + * specified with an index or indices that do not exist, so the search on that cluster is done. + * Mark it as SKIPPED with 0 shards searched and took=0. + */ + for (String c : clustersWithNoMatchingIndices) { + // TODO: in a follow-on PR, throw a Verification(400 status code) for local and remotes with skip_unavailable=false if + // they were requested with one or more concrete indices + // for now we never mark the local cluster as SKIPPED + final var status = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(c) + ? EsqlExecutionInfo.Cluster.Status.SUCCESSFUL + : EsqlExecutionInfo.Cluster.Status.SKIPPED; + executionInfo.swapCluster( + c, + (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(status) + .setTook(new TimeValue(0)) + .setTotalShards(0) + .setSuccessfulShards(0) + .setSkippedShards(0) + .setFailedShards(0) + .build() + ); + } + } + + static void updateExecutionInfoAtEndOfPlanning(EsqlExecutionInfo execInfo) { + // TODO: this logic assumes a single phase execution model, so it may need to altered once INLINESTATS is made CCS compatible + if (execInfo.isCrossClusterSearch()) { + execInfo.markEndPlanning(); + for (String clusterAlias : execInfo.clusterAliases()) { + EsqlExecutionInfo.Cluster cluster = execInfo.getCluster(clusterAlias); + if (cluster.getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED) { + execInfo.swapCluster( + clusterAlias, + (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTook(execInfo.planningTookTime()) + .setTotalShards(0) + .setSuccessfulShards(0) + .setSkippedShards(0) + .setFailedShards(0) + .build() + ); + } + } + } + } + + static Map determineUnavailableRemoteClusters(List failures) { + Map unavailableRemotes = new HashMap<>(); + for (FieldCapabilitiesFailure failure : failures) { + if (ExceptionsHelper.isRemoteUnavailableException(failure.getException())) { + for (String indexExpression : failure.getIndices()) { + if (indexExpression.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR) > 0) { + unavailableRemotes.put(RemoteClusterAware.parseClusterAlias(indexExpression), failure); + } + } + } + } + return unavailableRemotes; + } + + /** + * Whether to return an empty result (HTTP status 200) for a CCS rather than a top level 4xx/5xx error. + * + * For cases where field-caps had no indices to search and the remotes were unavailable, we + * return an empty successful response (200) if all remotes are marked with skip_unavailable=true. + * + * Note: a follow-on PR will expand this logic to handle cases where no indices could be found to match + * on any of the requested clusters. + */ + static boolean returnSuccessWithEmptyResult(EsqlExecutionInfo executionInfo, Exception e) { + if (executionInfo.isCrossClusterSearch() == false) { + return false; + } + + if (e instanceof NoClustersToSearchException || ExceptionsHelper.isRemoteUnavailableException(e)) { + for (String clusterAlias : executionInfo.clusterAliases()) { + if (executionInfo.isSkipUnavailable(clusterAlias) == false + && clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false) { + return false; + } + } + return true; + } + return false; + } + + static void updateExecutionInfoToReturnEmptyResult(EsqlExecutionInfo executionInfo, Exception e) { + executionInfo.markEndQuery(); + Exception exceptionForResponse; + if (e instanceof ConnectTransportException) { + // when field-caps has no field info (since no clusters could be connected to or had matching indices) + // it just throws the first exception in its list, so this odd special handling is here is to avoid + // having one specific remote alias name in all failure lists in the metadata response + exceptionForResponse = new RemoteTransportException("connect_transport_exception - unable to connect to remote cluster", null); + } else { + exceptionForResponse = e; + } + for (String clusterAlias : executionInfo.clusterAliases()) { + executionInfo.swapCluster(clusterAlias, (k, v) -> { + EsqlExecutionInfo.Cluster.Builder builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook(executionInfo.overallTook()) + .setTotalShards(0) + .setSuccessfulShards(0) + .setSkippedShards(0) + .setFailedShards(0); + if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) { + // never mark local cluster as skipped + builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL); + } else { + builder.setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED); + // add this exception to the failures list only if there is no failure already recorded there + if (v.getFailures() == null || v.getFailures().size() == 0) { + builder.setFailures(List.of(new ShardSearchFailure(exceptionForResponse))); + } + } + return builder.build(); + }); + } + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java index f76f7798dece8..b05928fbba576 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java @@ -6,7 +6,6 @@ */ package org.elasticsearch.xpack.esql.session; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesIndexResponse; @@ -20,7 +19,6 @@ import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.mapper.TimeSeriesParams; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.xpack.esql.action.EsqlResolveFieldsAction; import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.core.type.DateEsField; @@ -159,25 +157,12 @@ public IndexResolution mergedMappings(String indexPattern, FieldCapabilitiesResp for (FieldCapabilitiesIndexResponse ir : fieldCapsResponse.getIndexResponses()) { concreteIndices.put(ir.getIndexName(), ir.getIndexMode()); } - Map unavailableRemotes = determineUnavailableRemoteClusters(fieldCapsResponse.getFailures()); + Map unavailableRemotes = EsqlSessionCCSUtils.determineUnavailableRemoteClusters( + fieldCapsResponse.getFailures() + ); return IndexResolution.valid(new EsIndex(indexPattern, rootFields, concreteIndices), unavailableRemotes); } - // visible for testing - static Map determineUnavailableRemoteClusters(List failures) { - Map unavailableRemotes = new HashMap<>(); - for (FieldCapabilitiesFailure failure : failures) { - if (ExceptionsHelper.isRemoteUnavailableException(failure.getException())) { - for (String indexExpression : failure.getIndices()) { - if (indexExpression.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR) > 0) { - unavailableRemotes.put(RemoteClusterAware.parseClusterAlias(indexExpression), failure); - } - } - } - } - return unavailableRemotes; - } - private boolean allNested(List caps) { for (IndexFieldCapabilities cap : caps) { if (false == cap.type().equalsIgnoreCase("nested")) { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java similarity index 66% rename from x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionTests.java rename to x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java index dddfa67338419..5c89f612f5603 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java @@ -7,11 +7,15 @@ package org.elasticsearch.xpack.esql.session; +import org.apache.lucene.index.CorruptIndexException; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; +import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.common.Strings; import org.elasticsearch.index.IndexMode; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.NoSeedNodeLeftException; +import org.elasticsearch.transport.NoSuchRemoteClusterException; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; @@ -20,18 +24,20 @@ import org.elasticsearch.xpack.esql.index.IndexResolution; import org.elasticsearch.xpack.esql.type.EsFieldTests; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Predicate; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; -public class EsqlSessionTests extends ESTestCase { +public class EsqlSessionCCSUtilsTests extends ESTestCase { public void testCreateIndexExpressionFromAvailableClusters() { final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; @@ -45,7 +51,7 @@ public void testCreateIndexExpressionFromAvailableClusters() { executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", true)); - String indexExpr = EsqlSession.createIndexExpressionFromAvailableClusters(executionInfo); + String indexExpr = EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo); List list = Arrays.stream(Strings.splitStringByCommaToArray(indexExpr)).toList(); assertThat(list.size(), equalTo(5)); assertThat( @@ -69,7 +75,7 @@ public void testCreateIndexExpressionFromAvailableClusters() { ) ); - String indexExpr = EsqlSession.createIndexExpressionFromAvailableClusters(executionInfo); + String indexExpr = EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo); List list = Arrays.stream(Strings.splitStringByCommaToArray(indexExpr)).toList(); assertThat(list.size(), equalTo(3)); assertThat(new HashSet<>(list), equalTo(Strings.commaDelimitedListToSet("logs*,remote1:*,remote1:foo"))); @@ -93,7 +99,7 @@ public void testCreateIndexExpressionFromAvailableClusters() { ) ); - assertThat(EsqlSession.createIndexExpressionFromAvailableClusters(executionInfo), equalTo("logs*")); + assertThat(EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo), equalTo("logs*")); } // only remotes present and all marked as skipped, so in revised index expression should be empty string @@ -113,7 +119,7 @@ public void testCreateIndexExpressionFromAvailableClusters() { ) ); - assertThat(EsqlSession.createIndexExpressionFromAvailableClusters(executionInfo), equalTo("")); + assertThat(EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo), equalTo("")); } } @@ -131,7 +137,7 @@ public void testUpdateExecutionInfoWithUnavailableClusters() { var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); var unvailableClusters = Map.of(remote1Alias, failure, remote2Alias, failure); - EsqlSession.updateExecutionInfoWithUnavailableClusters(executionInfo, unvailableClusters); + EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, unvailableClusters); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(localClusterAlias, remote1Alias, remote2Alias))); assertNull(executionInfo.overallTook()); @@ -159,7 +165,7 @@ public void testUpdateExecutionInfoWithUnavailableClusters() { var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); RemoteTransportException e = expectThrows( RemoteTransportException.class, - () -> EsqlSession.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of(remote2Alias, failure)) + () -> EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of(remote2Alias, failure)) ); assertThat(e.status().getStatus(), equalTo(500)); assertThat( @@ -176,7 +182,7 @@ public void testUpdateExecutionInfoWithUnavailableClusters() { executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false)); - EsqlSession.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of()); + EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of()); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(localClusterAlias, remote1Alias, remote2Alias))); assertNull(executionInfo.overallTook()); @@ -224,7 +230,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { ); IndexResolution indexResolution = IndexResolution.valid(esIndex, Map.of()); - EsqlSession.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); + EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); assertThat(localCluster.getIndexExpression(), equalTo("logs*")); @@ -262,7 +268,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { ); IndexResolution indexResolution = IndexResolution.valid(esIndex, Map.of()); - EsqlSession.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); + EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); assertThat(localCluster.getIndexExpression(), equalTo("logs*")); @@ -298,7 +304,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); IndexResolution indexResolution = IndexResolution.valid(esIndex, Map.of(remote1Alias, failure)); - EsqlSession.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); + EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); assertThat(localCluster.getIndexExpression(), equalTo("logs*")); @@ -336,7 +342,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); IndexResolution indexResolution = IndexResolution.valid(esIndex, Map.of(remote1Alias, failure)); - EsqlSession.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); + EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); } } @@ -358,7 +364,7 @@ public void testUpdateExecutionInfoAtEndOfPlanning() { Thread.sleep(1); } catch (InterruptedException e) {} - EsqlSession.updateExecutionInfoAtEndOfPlanning(executionInfo); + EsqlSessionCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo); assertThat(executionInfo.planningTookTime().millis(), greaterThanOrEqualTo(0L)); assertNull(executionInfo.overallTook()); @@ -410,4 +416,167 @@ private static Map randomMapping() { } return result; } + + public void testDetermineUnavailableRemoteClusters() { + // two clusters, both "remote unavailable" type exceptions + { + List failures = new ArrayList<>(); + failures.add(new FieldCapabilitiesFailure(new String[] { "remote2:mylogs1" }, new NoSuchRemoteClusterException("remote2"))); + failures.add( + new FieldCapabilitiesFailure( + new String[] { "remote1:foo", "remote1:bar" }, + new IllegalStateException("Unable to open any connections") + ) + ); + + Map unavailableClusters = EsqlSessionCCSUtils.determineUnavailableRemoteClusters(failures); + assertThat(unavailableClusters.keySet(), equalTo(Set.of("remote1", "remote2"))); + } + + // one cluster with "remote unavailable" with two failures + { + List failures = new ArrayList<>(); + failures.add(new FieldCapabilitiesFailure(new String[] { "remote2:mylogs1" }, new NoSuchRemoteClusterException("remote2"))); + failures.add(new FieldCapabilitiesFailure(new String[] { "remote2:mylogs1" }, new NoSeedNodeLeftException("no seed node"))); + + Map unavailableClusters = EsqlSessionCCSUtils.determineUnavailableRemoteClusters(failures); + assertThat(unavailableClusters.keySet(), equalTo(Set.of("remote2"))); + } + + // two clusters, one "remote unavailable" type exceptions and one with another type + { + List failures = new ArrayList<>(); + failures.add(new FieldCapabilitiesFailure(new String[] { "remote1:mylogs1" }, new CorruptIndexException("foo", "bar"))); + failures.add( + new FieldCapabilitiesFailure( + new String[] { "remote2:foo", "remote2:bar" }, + new IllegalStateException("Unable to open any connections") + ) + ); + Map unavailableClusters = EsqlSessionCCSUtils.determineUnavailableRemoteClusters(failures); + assertThat(unavailableClusters.keySet(), equalTo(Set.of("remote2"))); + } + + // one cluster1 with exception not known to indicate "remote unavailable" + { + List failures = new ArrayList<>(); + failures.add(new FieldCapabilitiesFailure(new String[] { "remote1:mylogs1" }, new RuntimeException("foo"))); + Map unavailableClusters = EsqlSessionCCSUtils.determineUnavailableRemoteClusters(failures); + assertThat(unavailableClusters.keySet(), equalTo(Set.of())); + } + + // empty failures list + { + List failures = new ArrayList<>(); + Map unavailableClusters = EsqlSessionCCSUtils.determineUnavailableRemoteClusters(failures); + assertThat(unavailableClusters.keySet(), equalTo(Set.of())); + } + } + + public void testReturnSuccessWithEmptyResult() { + String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; + String remote1Alias = "remote1"; + String remote2Alias = "remote2"; + String remote3Alias = "remote3"; + NoClustersToSearchException noClustersException = new NoClustersToSearchException(); + Predicate skipUnPredicate = s -> { + if (s.equals("remote2") || s.equals("remote3")) { + return true; + } + return false; + }; + + EsqlExecutionInfo.Cluster localCluster = new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false); + EsqlExecutionInfo.Cluster remote1 = new EsqlExecutionInfo.Cluster(remote1Alias, "logs*", false); + EsqlExecutionInfo.Cluster remote2 = new EsqlExecutionInfo.Cluster(remote2Alias, "logs*", true); + EsqlExecutionInfo.Cluster remote3 = new EsqlExecutionInfo.Cluster(remote3Alias, "logs*", true); + + // not a cross-cluster cluster search, so do not return empty result + { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean()); + executionInfo.swapCluster(localClusterAlias, (k, v) -> localCluster); + assertFalse(EsqlSessionCCSUtils.returnSuccessWithEmptyResult(executionInfo, noClustersException)); + } + + // local cluster is present, so do not return empty result + { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean()); + executionInfo.swapCluster(localClusterAlias, (k, v) -> localCluster); + executionInfo.swapCluster(remote1Alias, (k, v) -> remote1); + // TODO: this logic will be added in the follow-on PR that handles missing indices + // assertFalse(EsqlSessionCCSUtils.returnSuccessWithEmptyResult(executionInfo, noClustersException)); + } + + // remote-only, one cluster is skip_unavailable=false, so do not return empty result + { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean()); + executionInfo.swapCluster(remote1Alias, (k, v) -> remote1); + executionInfo.swapCluster(remote2Alias, (k, v) -> remote2); + assertFalse(EsqlSessionCCSUtils.returnSuccessWithEmptyResult(executionInfo, noClustersException)); + } + + // remote-only, all clusters are skip_unavailable=true, so should return empty result with + // NoSuchClustersException or "remote unavailable" type exception + { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean()); + executionInfo.swapCluster(remote2Alias, (k, v) -> remote2); + executionInfo.swapCluster(remote3Alias, (k, v) -> remote3); + Exception e = randomFrom( + new NoSuchRemoteClusterException("foo"), + noClustersException, + new NoSeedNodeLeftException("foo"), + new IllegalStateException("unknown host") + ); + assertTrue(EsqlSessionCCSUtils.returnSuccessWithEmptyResult(executionInfo, e)); + } + + // remote-only, all clusters are skip_unavailable=true, but exception is not "remote unavailable" so return false + // Note: this functionality may change in follow-on PRs, so remove this test in that case + { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean()); + executionInfo.swapCluster(remote2Alias, (k, v) -> remote2); + executionInfo.swapCluster(remote3Alias, (k, v) -> remote3); + assertFalse(EsqlSessionCCSUtils.returnSuccessWithEmptyResult(executionInfo, new NullPointerException())); + } + } + + public void testUpdateExecutionInfoToReturnEmptyResult() { + String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; + String remote1Alias = "remote1"; + String remote2Alias = "remote2"; + String remote3Alias = "remote3"; + ConnectTransportException transportEx = new ConnectTransportException(null, "foo"); + Predicate skipUnPredicate = s -> { + if (s.startsWith("remote")) { + return true; + } + return false; + }; + + EsqlExecutionInfo.Cluster localCluster = new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false); + EsqlExecutionInfo.Cluster remote1 = new EsqlExecutionInfo.Cluster(remote1Alias, "logs*", true); + EsqlExecutionInfo.Cluster remote2 = new EsqlExecutionInfo.Cluster(remote2Alias, "logs*", true); + EsqlExecutionInfo.Cluster remote3 = new EsqlExecutionInfo.Cluster(remote3Alias, "logs*", true); + + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean()); + executionInfo.swapCluster(localCluster.getClusterAlias(), (k, v) -> localCluster); + executionInfo.swapCluster(remote1.getClusterAlias(), (k, v) -> remote1); + executionInfo.swapCluster(remote2.getClusterAlias(), (k, v) -> remote2); + executionInfo.swapCluster(remote3.getClusterAlias(), (k, v) -> remote3); + + assertNull(executionInfo.overallTook()); + + EsqlSessionCCSUtils.updateExecutionInfoToReturnEmptyResult(executionInfo, transportEx); + + assertNotNull(executionInfo.overallTook()); + assertThat(executionInfo.getCluster(localClusterAlias).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(executionInfo.getCluster(localClusterAlias).getFailures().size(), equalTo(0)); + + for (String remoteAlias : Set.of(remote1Alias, remote2Alias, remote3Alias)) { + assertThat(executionInfo.getCluster(remoteAlias).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + List remoteFailures = executionInfo.getCluster(remoteAlias).getFailures(); + assertThat(remoteFailures.size(), equalTo(1)); + assertThat(remoteFailures.get(0).reason(), containsString("unable to connect to remote cluster")); + } + } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverTests.java deleted file mode 100644 index d6e410305afaa..0000000000000 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverTests.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.esql.session; - -import org.apache.lucene.index.CorruptIndexException; -import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.transport.NoSeedNodeLeftException; -import org.elasticsearch.transport.NoSuchRemoteClusterException; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import static org.hamcrest.Matchers.equalTo; - -public class IndexResolverTests extends ESTestCase { - - public void testDetermineUnavailableRemoteClusters() { - // two clusters, both "remote unavailable" type exceptions - { - List failures = new ArrayList<>(); - failures.add(new FieldCapabilitiesFailure(new String[] { "remote2:mylogs1" }, new NoSuchRemoteClusterException("remote2"))); - failures.add( - new FieldCapabilitiesFailure( - new String[] { "remote1:foo", "remote1:bar" }, - new IllegalStateException("Unable to open any connections") - ) - ); - - Map unavailableClusters = IndexResolver.determineUnavailableRemoteClusters(failures); - assertThat(unavailableClusters.keySet(), equalTo(Set.of("remote1", "remote2"))); - } - - // one cluster with "remote unavailable" with two failures - { - List failures = new ArrayList<>(); - failures.add(new FieldCapabilitiesFailure(new String[] { "remote2:mylogs1" }, new NoSuchRemoteClusterException("remote2"))); - failures.add(new FieldCapabilitiesFailure(new String[] { "remote2:mylogs1" }, new NoSeedNodeLeftException("no seed node"))); - - Map unavailableClusters = IndexResolver.determineUnavailableRemoteClusters(failures); - assertThat(unavailableClusters.keySet(), equalTo(Set.of("remote2"))); - } - - // two clusters, one "remote unavailable" type exceptions and one with another type - { - List failures = new ArrayList<>(); - failures.add(new FieldCapabilitiesFailure(new String[] { "remote1:mylogs1" }, new CorruptIndexException("foo", "bar"))); - failures.add( - new FieldCapabilitiesFailure( - new String[] { "remote2:foo", "remote2:bar" }, - new IllegalStateException("Unable to open any connections") - ) - ); - Map unavailableClusters = IndexResolver.determineUnavailableRemoteClusters(failures); - assertThat(unavailableClusters.keySet(), equalTo(Set.of("remote2"))); - } - - // one cluster1 with exception not known to indicate "remote unavailable" - { - List failures = new ArrayList<>(); - failures.add(new FieldCapabilitiesFailure(new String[] { "remote1:mylogs1" }, new RuntimeException("foo"))); - Map unavailableClusters = IndexResolver.determineUnavailableRemoteClusters(failures); - assertThat(unavailableClusters.keySet(), equalTo(Set.of())); - } - - // empty failures list - { - List failures = new ArrayList<>(); - Map unavailableClusters = IndexResolver.determineUnavailableRemoteClusters(failures); - assertThat(unavailableClusters.keySet(), equalTo(Set.of())); - } - } -}