From 212f83e6bcd5d328a385df46cbdb855b935bf30c Mon Sep 17 00:00:00 2001 From: Stas Malyshev Date: Mon, 16 Sep 2024 09:07:26 -0600 Subject: [PATCH] Pull feedback --- .../stats/TransportClusterStatsAction.java | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java index ea8be27fac128..8e568e1d948c0 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -64,6 +64,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -388,15 +389,14 @@ private class RemoteStatsFanout extends CancellableFanOut remotes; - RemoteStatsFanout(Task task, ClusterStatsRequest request, Executor requestExecutor) { + RemoteStatsFanout(Task task, ClusterStatsRequest request, Executor requestExecutor, Collection remotes) { this.task = task; this.request = request; this.requestExecutor = requestExecutor; - if (task instanceof CancellableTask cancellableTask) { - cancellableTask.addListener(responses::clear); - } this.taskId = new TaskId(clusterService.getNodeName(), task.getId()); + this.remotes = remotes; } @Override @@ -427,6 +427,10 @@ private boolean isCancelled() { return task instanceof CancellableTask cancellableTask && cancellableTask.isCancelled(); } + void start(Task task, SubscribableListener> future) { + super.run(task, remotes.iterator(), future); + } + @Override protected Map onCompletion() { if (isCancelled()) { @@ -435,7 +439,7 @@ protected Map onCompletion() { Map remoteClustersStats = new HashMap<>(); - for (String clusterAlias : remoteClusterService.getRegisteredRemoteClusterNames()) { + for (String clusterAlias : remotes) { RemoteClusterConnection remoteConnection = remoteClusterService.getRemoteClusterConnection(clusterAlias); RemoteConnectionInfo remoteConnectionInfo = remoteConnection.getConnectionInfo(); RemoteClusterStatsResponse response = responses.get(clusterAlias); @@ -469,12 +473,9 @@ SubscribableListener> getStatsFromRemotes(Task t if (remotes.isEmpty()) { return SubscribableListener.newSucceeded(Map.of()); } - var remotesFuture = new ListenableFuture>(); - new RemoteStatsFanout(task, request, transportService.getThreadPool().executor(ThreadPool.Names.SEARCH_COORDINATION)).run( - task, - remoteClusterService.getRegisteredRemoteClusterNames().iterator(), - remotesFuture - ); + var remotesFuture = new SubscribableListener>(); + new RemoteStatsFanout(task, request, transportService.getThreadPool().executor(ThreadPool.Names.SEARCH_COORDINATION), remotes) + .start(task, remotesFuture); return remotesFuture; }