diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java index 447df09942ca8..e67c406e26929 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java @@ -25,6 +25,7 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterAware; +import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportRequest; @@ -72,12 +73,14 @@ public class EnrichPolicyResolver { private final IndexResolver indexResolver; private final TransportService transportService; private final ThreadPool threadPool; + private final RemoteClusterService remoteClusterService; public EnrichPolicyResolver(ClusterService clusterService, TransportService transportService, IndexResolver indexResolver) { this.clusterService = clusterService; this.transportService = transportService; this.indexResolver = indexResolver; this.threadPool = transportService.getThreadPool(); + this.remoteClusterService = transportService.getRemoteClusterService(); transportService.registerRequestHandler( RESOLVE_ACTION_NAME, threadPool.executor(ThreadPool.Names.SEARCH), @@ -257,22 +260,21 @@ private void lookupPolicies( // remote clusters if (remotePolicies.isEmpty() == false) { for (String cluster : remoteClusters) { - final Transport.Connection connection; - try { - connection = getRemoteConnection(cluster); - } catch (Exception e) { - refs.acquire().onFailure(e); - return; - } - transportService.sendRequest( - connection, - RESOLVE_ACTION_NAME, - new LookupRequest(cluster, remotePolicies), - TransportRequestOptions.EMPTY, - new ActionListenerResponseHandler<>( - refs.acquire(resp -> lookupResponses.put(cluster, resp)), - LookupResponse::new, - threadPool.executor(ThreadPool.Names.SEARCH) + ActionListener lookupListener = refs.acquire(resp -> lookupResponses.put(cluster, resp)); + getRemoteConnection( + cluster, + lookupListener.delegateFailureAndWrap( + (delegate, connection) -> transportService.sendRequest( + connection, + RESOLVE_ACTION_NAME, + new LookupRequest(cluster, remotePolicies), + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>( + delegate, + LookupResponse::new, + threadPool.executor(ThreadPool.Names.SEARCH) + ) + ) ) ); } @@ -389,13 +391,16 @@ protected Map availablePolicies() { return metadata == null ? Map.of() : metadata.getPolicies(); } - protected Transport.Connection getRemoteConnection(String cluster) { - return transportService.getRemoteClusterService().getConnection(cluster); + protected void getRemoteConnection(String cluster, ActionListener listener) { + remoteClusterService.maybeEnsureConnectedAndGetConnection( + cluster, + remoteClusterService.isSkipUnavailable(cluster) == false, + listener + ); } public Map> groupIndicesPerCluster(String[] indices) { - return transportService.getRemoteClusterService() - .groupIndices(SearchRequest.DEFAULT_INDICES_OPTIONS, indices) + return remoteClusterService.groupIndices(SearchRequest.DEFAULT_INDICES_OPTIONS, indices) .entrySet() .stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> Arrays.asList(e.getValue().indices()))); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java index 05a7486a18068..39170f1a305df 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java @@ -446,9 +446,9 @@ EnrichResolution resolvePolicies(Collection clusters, Collection listener) { assertThat("Must only called on the local cluster", cluster, equalTo(LOCAL_CLUSTER_GROUP_KEY)); - return transports.get("").getConnection(transports.get(remoteCluster).getLocalNode()); + listener.onResponse(transports.get("").getConnection(transports.get(remoteCluster).getLocalNode())); } static ClusterService mockClusterService(Map policies) {