From b2610852574d62620139f5e9b0420794b43185cb Mon Sep 17 00:00:00 2001 From: Stas Malyshev Date: Mon, 16 Sep 2024 09:07:26 -0600 Subject: [PATCH] Pull feedback --- .../elasticsearch/action/ActionModule.java | 1 - .../stats/TransportClusterStatsAction.java | 30 +++++++++++-------- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 163e7d6ac4865..b4b26f742681b 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -642,7 +642,6 @@ public void reg actions.register(TransportGetDesiredBalanceAction.TYPE, TransportGetDesiredBalanceAction.class); actions.register(TransportDeleteDesiredBalanceAction.TYPE, TransportDeleteDesiredBalanceAction.class); actions.register(TransportClusterStatsAction.TYPE, TransportClusterStatsAction.class); - actions.register(TransportRemoteClusterStatsAction.TYPE, TransportRemoteClusterStatsAction.class); actions.register(ClusterStateAction.INSTANCE, TransportClusterStateAction.class); actions.register(TransportClusterHealthAction.TYPE, TransportClusterHealthAction.class); actions.register(ClusterUpdateSettingsAction.INSTANCE, TransportClusterUpdateSettingsAction.class); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java index ea8be27fac128..8541a65e17ecd 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -64,6 +64,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -105,6 +106,7 @@ public class TransportClusterStatsAction extends TransportNodesAction< private final MetadataStatsCache mappingStatsCache; private final MetadataStatsCache analysisStatsCache; private final RemoteClusterService remoteClusterService; + private final TransportRemoteClusterStatsAction remoteClusterStatsAction; @Inject public TransportClusterStatsAction( @@ -116,7 +118,8 @@ public TransportClusterStatsAction( RepositoriesService repositoriesService, UsageService usageService, ActionFilters actionFilters, - Settings settings + Settings settings, + TransportRemoteClusterStatsAction remoteClusterStatsAction ) { super( TYPE.name(), @@ -135,6 +138,7 @@ public TransportClusterStatsAction( this.analysisStatsCache = new MetadataStatsCache<>(threadPool.getThreadContext(), AnalysisStats::of); this.remoteClusterService = transportService.getRemoteClusterService(); this.settings = settings; + this.remoteClusterStatsAction = remoteClusterStatsAction; } @Override @@ -388,15 +392,14 @@ private class RemoteStatsFanout extends CancellableFanOut remotes; - RemoteStatsFanout(Task task, ClusterStatsRequest request, Executor requestExecutor) { + RemoteStatsFanout(Task task, ClusterStatsRequest request, Executor requestExecutor, Collection remotes) { this.task = task; this.request = request; this.requestExecutor = requestExecutor; - if (task instanceof CancellableTask cancellableTask) { - cancellableTask.addListener(responses::clear); - } this.taskId = new TaskId(clusterService.getNodeName(), task.getId()); + this.remotes = remotes; } @Override @@ -427,6 +430,10 @@ private boolean isCancelled() { return task instanceof CancellableTask cancellableTask && cancellableTask.isCancelled(); } + void start(SubscribableListener> future) { + super.run(task, remotes.iterator(), future); + } + @Override protected Map onCompletion() { if (isCancelled()) { @@ -435,7 +442,7 @@ protected Map onCompletion() { Map remoteClustersStats = new HashMap<>(); - for (String clusterAlias : remoteClusterService.getRegisteredRemoteClusterNames()) { + for (String clusterAlias : remotes) { RemoteClusterConnection remoteConnection = remoteClusterService.getRemoteClusterConnection(clusterAlias); RemoteConnectionInfo remoteConnectionInfo = remoteConnection.getConnectionInfo(); RemoteClusterStatsResponse response = responses.get(clusterAlias); @@ -469,13 +476,10 @@ SubscribableListener> getStatsFromRemotes(Task t if (remotes.isEmpty()) { return SubscribableListener.newSucceeded(Map.of()); } - var remotesFuture = new ListenableFuture>(); - new RemoteStatsFanout(task, request, transportService.getThreadPool().executor(ThreadPool.Names.SEARCH_COORDINATION)).run( - task, - remoteClusterService.getRegisteredRemoteClusterNames().iterator(), - remotesFuture - ); - return remotesFuture; + var remotesListener = new SubscribableListener>(); + new RemoteStatsFanout(task, request, transportService.getThreadPool().executor(ThreadPool.Names.SEARCH_COORDINATION), remotes) + .start(remotesListener); + return remotesListener; } SubscribableListener> getRemoteClusterStats() {