Skip to content

Commit

Permalink
Finalized refactorings from closed PR elastic#115976
Browse files Browse the repository at this point in the history
  • Loading branch information
quux00 committed Oct 31, 2024
1 parent 1c644cc commit 9350626
Show file tree
Hide file tree
Showing 7 changed files with 284 additions and 197 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
b.field(SKIPPED_FIELD.getPreferredName(), getClusterStateCount(Cluster.Status.SKIPPED));
b.field(PARTIAL_FIELD.getPreferredName(), getClusterStateCount(Cluster.Status.PARTIAL));
b.field(FAILED_FIELD.getPreferredName(), getClusterStateCount(Cluster.Status.FAILED));
// each clusterinfo defines its own field object name
// each Cluster object defines its own field object name
b.xContentObject("details", clusterInfo.values().iterator());
});
}
Expand Down Expand Up @@ -352,11 +352,7 @@ public Cluster(
this.successfulShards = successfulShards;
this.skippedShards = skippedShards;
this.failedShards = failedShards;
if (failures == null) {
this.failures = List.of();
} else {
this.failures = failures;
}
this.failures = failures == null ? Collections.emptyList() : failures;
this.took = took;
}

Expand All @@ -373,7 +369,7 @@ public Cluster(StreamInput in) throws IOException {
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_CCS_EXEC_INFO_WITH_FAILURES)) {
this.failures = Collections.unmodifiableList(in.readCollectionAsList(ShardSearchFailure::readShardSearchFailure));
} else {
this.failures = List.of();
this.failures = Collections.emptyList();
}
}

Expand Down Expand Up @@ -475,7 +471,7 @@ public Cluster.Builder setTook(TimeValue took) {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
String name = clusterAlias;
if (clusterAlias.equals("")) {
if (clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
name = LOCAL_CLUSTER_NAME_REPRESENTATION;
}
builder.startObject(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,24 +289,17 @@ public void onResponse(Transport.Connection connection) {
RESOLVE_ACTION_NAME,
new LookupRequest(cluster, remotePolicies),
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(lookupListener.delegateResponse((l, e) -> {
if (ExceptionsHelper.isRemoteUnavailableException(e)
&& remoteClusterService.isSkipUnavailable(cluster)) {
l.onResponse(new LookupResponse(e));
} else {
l.onFailure(e);
}
}), LookupResponse::new, threadPool.executor(ThreadPool.Names.SEARCH))
new ActionListenerResponseHandler<>(
lookupListener.delegateResponse((l, e) -> failIfSkipUnavailableFalse(e, cluster, l)),
LookupResponse::new,
threadPool.executor(ThreadPool.Names.SEARCH)
)
);
}

@Override
public void onFailure(Exception e) {
if (ExceptionsHelper.isRemoteUnavailableException(e) && remoteClusterService.isSkipUnavailable(cluster)) {
lookupListener.onResponse(new LookupResponse(e));
} else {
lookupListener.onFailure(e);
}
failIfSkipUnavailableFalse(e, cluster, lookupListener);
}
});
}
Expand All @@ -331,6 +324,14 @@ public void onFailure(Exception e) {
}
}

private void failIfSkipUnavailableFalse(Exception e, String cluster, ActionListener<LookupResponse> lookupListener) {
if (ExceptionsHelper.isRemoteUnavailableException(e) && remoteClusterService.isSkipUnavailable(cluster)) {
lookupListener.onResponse(new LookupResponse(e));
} else {
lookupListener.onFailure(e);
}
}

private static class LookupRequest extends TransportRequest {
private final String clusterAlias;
private final Collection<String> policyNames;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P
analyzedPlan(
parse(request.query(), request.params()),
executionInfo,
new CcsUtils.CssPartialErrorsActionListener(executionInfo, listener) {
new EsqlSessionCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) {
@Override
public void onResponse(LogicalPlan analyzedPlan) {
executeOptimizedPlan(request, executionInfo, planRunner, optimizedPlan(analyzedPlan), listener);
Expand All @@ -171,7 +171,7 @@ public void executeOptimizedPlan(
) {
PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request);
// TODO: this could be snuck into the underlying listener
CcsUtils.updateExecutionInfoAtEndOfPlanning(executionInfo);
EsqlSessionCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo);
// execute any potential subplans
executeSubPlans(physicalPlan, planRunner, executionInfo, request, listener);
}
Expand Down Expand Up @@ -308,8 +308,8 @@ private <T> void preAnalyze(
// TODO in follow-PR (for skip_unavailble handling of missing concrete indexes) add some tests for invalid index
// resolution to updateExecutionInfo
if (indexResolution.isValid()) {
CcsUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
CcsUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.getUnavailableClusters());
EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.getUnavailableClusters());
if (executionInfo.isCrossClusterSearch()
&& executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) == 0) {
// for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel
Expand Down Expand Up @@ -383,7 +383,7 @@ private void preAnalyzeIndices(
}
// if the preceding call to the enrich policy API found unavailable clusters, recreate the index expression to search
// based only on available clusters (which could now be an empty list)
String indexExpressionToResolve = CcsUtils.createIndexExpressionFromAvailableClusters(executionInfo);
String indexExpressionToResolve = EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo);
if (indexExpressionToResolve.isEmpty()) {
// if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution
listener.onResponse(IndexResolution.valid(new EsIndex(table.index(), Map.of(), Map.of())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,30 @@
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class CcsUtils {
class EsqlSessionCCSUtils {

private CcsUtils() {}
private EsqlSessionCCSUtils() {}

// visible for testing
static Map<String, FieldCapabilitiesFailure> determineUnavailableRemoteClusters(List<FieldCapabilitiesFailure> failures) {
Map<String, FieldCapabilitiesFailure> unavailableRemotes = new HashMap<>();
for (FieldCapabilitiesFailure failure : failures) {
if (ExceptionsHelper.isRemoteUnavailableException(failure.getException())) {
for (String indexExpression : failure.getIndices()) {
if (indexExpression.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR) > 0) {
unavailableRemotes.put(RemoteClusterAware.parseClusterAlias(indexExpression), failure);
}
}
}
}
return unavailableRemotes;
}

/**
* ActionListener that receives LogicalPlan or error from logical planning.
Expand All @@ -46,70 +62,73 @@ abstract static class CssPartialErrorsActionListener implements ActionListener<L
this.listener = listener;
}

/**
* Whether to return an empty result (HTTP status 200) for a CCS rather than a top level 4xx/5xx error.
*
* For cases where field-caps had no indices to search and the remotes were unavailable, we
* return an empty successful response (200) if all remotes are marked with skip_unavailable=true.
*
* Note: a follow-on PR will expand this logic to handle cases where no indices could be found to match
* on any of the requested clusters.
*/
private boolean returnSuccessWithEmptyResult(Exception e) {
if (executionInfo.isCrossClusterSearch() == false) {
return false;
@Override
public void onFailure(Exception e) {
if (returnSuccessWithEmptyResult(executionInfo, e)) {
updateExecutionInfoToReturnEmptyResult(executionInfo, e);
listener.onResponse(new Result(Analyzer.NO_FIELDS, Collections.emptyList(), Collections.emptyList(), executionInfo));
} else {
listener.onFailure(e);
}
}
}

if (e instanceof NoClustersToSearchException || ExceptionsHelper.isRemoteUnavailableException(e)) {
for (String clusterAlias : executionInfo.clusterAliases()) {
if (executionInfo.isSkipUnavailable(clusterAlias) == false
&& clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false) {
return false;
}
/**
* Whether to return an empty result (HTTP status 200) for a CCS rather than a top level 4xx/5xx error.
*
* For cases where field-caps had no indices to search and the remotes were unavailable, we
* return an empty successful response (200) if all remotes are marked with skip_unavailable=true.
*
* Note: a follow-on PR will expand this logic to handle cases where no indices could be found to match
* on any of the requested clusters.
*/
static boolean returnSuccessWithEmptyResult(EsqlExecutionInfo executionInfo, Exception e) {
if (executionInfo.isCrossClusterSearch() == false) {
return false;
}

if (e instanceof NoClustersToSearchException || ExceptionsHelper.isRemoteUnavailableException(e)) {
for (String clusterAlias : executionInfo.clusterAliases()) {
if (executionInfo.isSkipUnavailable(clusterAlias) == false
&& clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false) {
return false;
}
return true;
}
return false;
return true;
}
return false;
}

@Override
public void onFailure(Exception e) {
if (returnSuccessWithEmptyResult(e)) {
executionInfo.markEndQuery();
Exception exceptionForResponse;
if (e instanceof ConnectTransportException) {
// when field-caps has no field info (since no clusters could be connected to or had matching indices)
// it just throws the first exception in its list, so this odd special handling is here is to avoid
// having one specific remote alias name in all failure lists in the metadata response
exceptionForResponse = new RemoteTransportException(
"connect_transport_exception - unable to connect to remote cluster",
null
);
static void updateExecutionInfoToReturnEmptyResult(EsqlExecutionInfo executionInfo, Exception e) {
executionInfo.markEndQuery();
Exception exceptionForResponse;
if (e instanceof ConnectTransportException) {
// when field-caps has no field info (since no clusters could be connected to or had matching indices)
// it just throws the first exception in its list, so this odd special handling is here is to avoid
// having one specific remote alias name in all failure lists in the metadata response
exceptionForResponse = new RemoteTransportException("connect_transport_exception - unable to connect to remote cluster", null);
} else {
exceptionForResponse = e;
}
for (String clusterAlias : executionInfo.clusterAliases()) {
executionInfo.swapCluster(clusterAlias, (k, v) -> {
EsqlExecutionInfo.Cluster.Builder builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook(executionInfo.overallTook())
.setTotalShards(0)
.setSuccessfulShards(0)
.setSkippedShards(0)
.setFailedShards(0);
if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) {
// never mark local cluster as skipped
builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL);
} else {
exceptionForResponse = e;
}
for (String clusterAlias : executionInfo.clusterAliases()) {
executionInfo.swapCluster(clusterAlias, (k, v) -> {
EsqlExecutionInfo.Cluster.Builder builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook(
executionInfo.overallTook()
).setTotalShards(0).setSuccessfulShards(0).setSkippedShards(0).setFailedShards(0);
if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) {
// never mark local cluster as skipped
builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL);
} else {
builder.setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED);
// add this exception to the failures list only if there is no failure already recorded there
if (v.getFailures() == null || v.getFailures().size() == 0) {
builder.setFailures(List.of(new ShardSearchFailure(exceptionForResponse)));
}
}
return builder.build();
});
builder.setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED);
// add this exception to the failures list only if there is no failure already recorded there
if (v.getFailures() == null || v.getFailures().size() == 0) {
builder.setFailures(List.of(new ShardSearchFailure(exceptionForResponse)));
}
}
listener.onResponse(new Result(Analyzer.NO_FIELDS, Collections.emptyList(), Collections.emptyList(), executionInfo));
} else {
listener.onFailure(e);
}
return builder.build();
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@
*/
package org.elasticsearch.xpack.esql.session;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesIndexResponse;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
Expand All @@ -20,7 +18,6 @@
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.mapper.TimeSeriesParams;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.xpack.esql.action.EsqlResolveFieldsAction;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.core.type.DateEsField;
Expand Down Expand Up @@ -159,23 +156,8 @@ public IndexResolution mergedMappings(String indexPattern, FieldCapabilitiesResp
for (FieldCapabilitiesIndexResponse ir : fieldCapsResponse.getIndexResponses()) {
concreteIndices.put(ir.getIndexName(), ir.getIndexMode());
}
Map<String, FieldCapabilitiesFailure> unavailableRemotes = determineUnavailableRemoteClusters(fieldCapsResponse.getFailures());
return IndexResolution.valid(new EsIndex(indexPattern, rootFields, concreteIndices), unavailableRemotes);
}

// visible for testing
static Map<String, FieldCapabilitiesFailure> determineUnavailableRemoteClusters(List<FieldCapabilitiesFailure> failures) {
Map<String, FieldCapabilitiesFailure> unavailableRemotes = new HashMap<>();
for (FieldCapabilitiesFailure failure : failures) {
if (ExceptionsHelper.isRemoteUnavailableException(failure.getException())) {
for (String indexExpression : failure.getIndices()) {
if (indexExpression.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR) > 0) {
unavailableRemotes.put(RemoteClusterAware.parseClusterAlias(indexExpression), failure);
}
}
}
}
return unavailableRemotes;
EsIndex esIndex = new EsIndex(indexPattern, rootFields, concreteIndices);
return IndexResolution.valid(esIndex, EsqlSessionCCSUtils.determineUnavailableRemoteClusters(fieldCapsResponse.getFailures()));
}

private boolean allNested(List<IndexFieldCapabilities> caps) {
Expand Down
Loading

0 comments on commit 9350626

Please sign in to comment.