Skip to content

Commit

Permalink
Pull feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
smalyshev committed Sep 16, 2024
1 parent 630637e commit 212f83e
Showing 1 changed file with 12 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -388,15 +389,14 @@ private class RemoteStatsFanout extends CancellableFanOut<String, RemoteClusterS
private final Executor requestExecutor;
private final Task task;
private final TaskId taskId;
private final Collection<String> remotes;

RemoteStatsFanout(Task task, ClusterStatsRequest request, Executor requestExecutor) {
RemoteStatsFanout(Task task, ClusterStatsRequest request, Executor requestExecutor, Collection<String> remotes) {
this.task = task;
this.request = request;
this.requestExecutor = requestExecutor;
if (task instanceof CancellableTask cancellableTask) {
cancellableTask.addListener(responses::clear);
}
this.taskId = new TaskId(clusterService.getNodeName(), task.getId());
this.remotes = remotes;
}

@Override
Expand Down Expand Up @@ -427,6 +427,10 @@ private boolean isCancelled() {
return task instanceof CancellableTask cancellableTask && cancellableTask.isCancelled();
}

void start(Task task, SubscribableListener<Map<String, RemoteClusterStats>> future) {
super.run(task, remotes.iterator(), future);
}

@Override
protected Map<String, RemoteClusterStats> onCompletion() {
if (isCancelled()) {
Expand All @@ -435,7 +439,7 @@ protected Map<String, RemoteClusterStats> onCompletion() {

Map<String, RemoteClusterStats> remoteClustersStats = new HashMap<>();

for (String clusterAlias : remoteClusterService.getRegisteredRemoteClusterNames()) {
for (String clusterAlias : remotes) {
RemoteClusterConnection remoteConnection = remoteClusterService.getRemoteClusterConnection(clusterAlias);
RemoteConnectionInfo remoteConnectionInfo = remoteConnection.getConnectionInfo();
RemoteClusterStatsResponse response = responses.get(clusterAlias);
Expand Down Expand Up @@ -469,12 +473,9 @@ SubscribableListener<Map<String, RemoteClusterStats>> getStatsFromRemotes(Task t
if (remotes.isEmpty()) {
return SubscribableListener.newSucceeded(Map.of());
}
var remotesFuture = new ListenableFuture<Map<String, RemoteClusterStats>>();
new RemoteStatsFanout(task, request, transportService.getThreadPool().executor(ThreadPool.Names.SEARCH_COORDINATION)).run(
task,
remoteClusterService.getRegisteredRemoteClusterNames().iterator(),
remotesFuture
);
var remotesFuture = new SubscribableListener<Map<String, RemoteClusterStats>>();
new RemoteStatsFanout(task, request, transportService.getThreadPool().executor(ThreadPool.Names.SEARCH_COORDINATION), remotes)
.start(task, remotesFuture);
return remotesFuture;
}

Expand Down

0 comments on commit 212f83e

Please sign in to comment.