Skip to content

Commit

Permalink
Pull feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
smalyshev committed Sep 16, 2024
1 parent 630637e commit b261085
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,6 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(TransportGetDesiredBalanceAction.TYPE, TransportGetDesiredBalanceAction.class);
actions.register(TransportDeleteDesiredBalanceAction.TYPE, TransportDeleteDesiredBalanceAction.class);
actions.register(TransportClusterStatsAction.TYPE, TransportClusterStatsAction.class);
actions.register(TransportRemoteClusterStatsAction.TYPE, TransportRemoteClusterStatsAction.class);
actions.register(ClusterStateAction.INSTANCE, TransportClusterStateAction.class);
actions.register(TransportClusterHealthAction.TYPE, TransportClusterHealthAction.class);
actions.register(ClusterUpdateSettingsAction.INSTANCE, TransportClusterUpdateSettingsAction.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -105,6 +106,7 @@ public class TransportClusterStatsAction extends TransportNodesAction<
private final MetadataStatsCache<MappingStats> mappingStatsCache;
private final MetadataStatsCache<AnalysisStats> analysisStatsCache;
private final RemoteClusterService remoteClusterService;
private final TransportRemoteClusterStatsAction remoteClusterStatsAction;

@Inject
public TransportClusterStatsAction(
Expand All @@ -116,7 +118,8 @@ public TransportClusterStatsAction(
RepositoriesService repositoriesService,
UsageService usageService,
ActionFilters actionFilters,
Settings settings
Settings settings,
TransportRemoteClusterStatsAction remoteClusterStatsAction
) {
super(
TYPE.name(),
Expand All @@ -135,6 +138,7 @@ public TransportClusterStatsAction(
this.analysisStatsCache = new MetadataStatsCache<>(threadPool.getThreadContext(), AnalysisStats::of);
this.remoteClusterService = transportService.getRemoteClusterService();
this.settings = settings;
this.remoteClusterStatsAction = remoteClusterStatsAction;
}

@Override
Expand Down Expand Up @@ -388,15 +392,14 @@ private class RemoteStatsFanout extends CancellableFanOut<String, RemoteClusterS
private final Executor requestExecutor;
private final Task task;
private final TaskId taskId;
private final Collection<String> remotes;

RemoteStatsFanout(Task task, ClusterStatsRequest request, Executor requestExecutor) {
RemoteStatsFanout(Task task, ClusterStatsRequest request, Executor requestExecutor, Collection<String> remotes) {
this.task = task;
this.request = request;
this.requestExecutor = requestExecutor;
if (task instanceof CancellableTask cancellableTask) {
cancellableTask.addListener(responses::clear);
}
this.taskId = new TaskId(clusterService.getNodeName(), task.getId());
this.remotes = remotes;
}

@Override
Expand Down Expand Up @@ -427,6 +430,10 @@ private boolean isCancelled() {
return task instanceof CancellableTask cancellableTask && cancellableTask.isCancelled();
}

void start(SubscribableListener<Map<String, RemoteClusterStats>> future) {
super.run(task, remotes.iterator(), future);
}

@Override
protected Map<String, RemoteClusterStats> onCompletion() {
if (isCancelled()) {
Expand All @@ -435,7 +442,7 @@ protected Map<String, RemoteClusterStats> onCompletion() {

Map<String, RemoteClusterStats> remoteClustersStats = new HashMap<>();

for (String clusterAlias : remoteClusterService.getRegisteredRemoteClusterNames()) {
for (String clusterAlias : remotes) {
RemoteClusterConnection remoteConnection = remoteClusterService.getRemoteClusterConnection(clusterAlias);
RemoteConnectionInfo remoteConnectionInfo = remoteConnection.getConnectionInfo();
RemoteClusterStatsResponse response = responses.get(clusterAlias);
Expand Down Expand Up @@ -469,13 +476,10 @@ SubscribableListener<Map<String, RemoteClusterStats>> getStatsFromRemotes(Task t
if (remotes.isEmpty()) {
return SubscribableListener.newSucceeded(Map.of());
}
var remotesFuture = new ListenableFuture<Map<String, RemoteClusterStats>>();
new RemoteStatsFanout(task, request, transportService.getThreadPool().executor(ThreadPool.Names.SEARCH_COORDINATION)).run(
task,
remoteClusterService.getRegisteredRemoteClusterNames().iterator(),
remotesFuture
);
return remotesFuture;
var remotesListener = new SubscribableListener<Map<String, RemoteClusterStats>>();
new RemoteStatsFanout(task, request, transportService.getThreadPool().executor(ThreadPool.Names.SEARCH_COORDINATION), remotes)
.start(remotesListener);
return remotesListener;
}

SubscribableListener<Map<String, RemoteClusterStats>> getRemoteClusterStats() {
Expand Down

0 comments on commit b261085

Please sign in to comment.