Skip to content

Commit

Permalink
getRemoteClusterClient does not need a ThreadPool (elastic#104557)
Browse files Browse the repository at this point in the history
Following elastic#104536 this argument is now unused.
  • Loading branch information
DaveCTurner authored Jan 22, 2024
1 parent 2aa4b01 commit 2d073f4
Show file tree
Hide file tree
Showing 9 changed files with 12 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
// forward to remote cluster
String clusterAlias = request.getContextSetup().getClusterAlias();
transportService.getRemoteClusterService()
.getRemoteClusterClient(threadPool, clusterAlias, EsExecutors.DIRECT_EXECUTOR_SERVICE)
.getRemoteClusterClient(clusterAlias, EsExecutors.DIRECT_EXECUTOR_SERVICE)
.execute(PainlessExecuteAction.INSTANCE, request, listener);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,6 @@ protected void doExecute(Task task, Request request, final ActionListener<Respon
String clusterAlias = remoteIndices.getKey();
OriginalIndices originalIndices = remoteIndices.getValue();
var remoteClusterClient = remoteClusterService.getRemoteClusterClient(
threadPool,
clusterAlias,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ private void doExecuteForked(Task task, FieldCapabilitiesRequest request, final
String clusterAlias = remoteIndices.getKey();
OriginalIndices originalIndices = remoteIndices.getValue();
var remoteClusterClient = transportService.getRemoteClusterService()
.getRemoteClusterClient(threadPool, clusterAlias, searchCoordinationExecutor);
.getRemoteClusterClient(clusterAlias, searchCoordinationExecutor);
FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest(request, originalIndices, nowInMillis);
ActionListener<FieldCapabilitiesResponse> remoteListener = ActionListener.wrap(response -> {
for (FieldCapabilitiesIndexResponse resp : response.getIndexResponses()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ static void ccsRemoteReduce(
timeProvider.absoluteStartMillis(),
true
);
var remoteClusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias, remoteClientResponseExecutor);
var remoteClusterClient = remoteClusterService.getRemoteClusterClient(clusterAlias, remoteClientResponseExecutor);
remoteClusterClient.execute(TransportSearchAction.TYPE, ccsSearchRequest, new ActionListener<>() {
@Override
public void onResponse(SearchResponse searchResponse) {
Expand Down Expand Up @@ -604,11 +604,7 @@ public void onFailure(Exception e) {
task.getProgressListener(),
listener
);
final var remoteClusterClient = remoteClusterService.getRemoteClusterClient(
threadPool,
clusterAlias,
remoteClientResponseExecutor
);
final var remoteClusterClient = remoteClusterService.getRemoteClusterClient(clusterAlias, remoteClientResponseExecutor);
remoteClusterClient.execute(TransportSearchAction.TYPE, ccsSearchRequest, ccsListener);
}
if (localIndices != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,6 @@ private <Request extends ActionRequest, Response extends ActionResponse> Transpo

@Override
public RemoteClusterClient getRemoteClusterClient(String clusterAlias, Executor responseExecutor) {
return remoteClusterService.getRemoteClusterClient(threadPool(), clusterAlias, responseExecutor, true);
return remoteClusterService.getRemoteClusterClient(clusterAlias, responseExecutor, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.node.ReportingService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterCredentialsManager.UpdateRemoteClusterCredentialsResult;

import java.io.Closeable;
Expand Down Expand Up @@ -544,18 +543,12 @@ public void onFailure(Exception e) {
/**
* Returns a client to the remote cluster if the given cluster alias exists.
*
* @param threadPool the {@link ThreadPool} for the client
* @param clusterAlias the cluster alias the remote cluster is registered under
* @param responseExecutor the executor to use to process the response
* @param ensureConnected whether requests should wait for a connection attempt when there isn't a connection available
* @throws IllegalArgumentException if the given clusterAlias doesn't exist
*/
public RemoteClusterClient getRemoteClusterClient(
ThreadPool threadPool,
String clusterAlias,
Executor responseExecutor,
boolean ensureConnected
) {
public RemoteClusterClient getRemoteClusterClient(String clusterAlias, Executor responseExecutor, boolean ensureConnected) {
if (transportService.getRemoteClusterService().isEnabled() == false) {
throw new IllegalArgumentException(
"this node does not have the " + DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role"
Expand All @@ -570,14 +563,12 @@ public RemoteClusterClient getRemoteClusterClient(
/**
* Returns a client to the remote cluster if the given cluster alias exists.
*
* @param threadPool the {@link ThreadPool} for the client
* @param clusterAlias the cluster alias the remote cluster is registered under
* @param responseExecutor the executor to use to process the response
* @throws IllegalArgumentException if the given clusterAlias doesn't exist
*/
public RemoteClusterClient getRemoteClusterClient(ThreadPool threadPool, String clusterAlias, Executor responseExecutor) {
public RemoteClusterClient getRemoteClusterClient(String clusterAlias, Executor responseExecutor) {
return getRemoteClusterClient(
threadPool,
clusterAlias,
responseExecutor,
transportService.getRemoteClusterService().isSkipUnavailable(clusterAlias) == false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ public void testConnectAndExecuteRequest() throws Exception {
RemoteClusterService remoteClusterService = service.getRemoteClusterService();
assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode));
var client = remoteClusterService.getRemoteClusterClient(
threadPool,
"test",
threadPool.executor(TEST_THREAD_POOL_NAME),
randomBoolean()
Expand Down Expand Up @@ -172,7 +171,7 @@ public void testEnsureWeReconnect() throws Exception {
connectionManager.disconnectFromNode(remoteNode);
closeFuture.get();

var client = remoteClusterService.getRemoteClusterClient(threadPool, "test", EsExecutors.DIRECT_EXECUTOR_SERVICE, true);
var client = remoteClusterService.getRemoteClusterClient("test", EsExecutors.DIRECT_EXECUTOR_SERVICE, true);
ClusterStateResponse clusterStateResponse = PlainActionFuture.get(
f -> client.execute(ClusterStateAction.INSTANCE, new ClusterStateRequest(), f)
);
Expand Down Expand Up @@ -200,7 +199,7 @@ public void testRemoteClusterServiceNotEnabled() {
final RemoteClusterService remoteClusterService = service.getRemoteClusterService();
final IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> remoteClusterService.getRemoteClusterClient(threadPool, "test", EsExecutors.DIRECT_EXECUTOR_SERVICE, randomBoolean())
() -> remoteClusterService.getRemoteClusterClient("test", EsExecutors.DIRECT_EXECUTOR_SERVICE, randomBoolean())
);
assertThat(e.getMessage(), equalTo("this node does not have the remote_cluster_client role"));
}
Expand Down Expand Up @@ -243,7 +242,7 @@ public void testQuicklySkipUnavailableClusters() throws Exception {
service.start();
service.acceptIncomingRequests();
RemoteClusterService remoteClusterService = service.getRemoteClusterService();
var client = remoteClusterService.getRemoteClusterClient(threadPool, "test", EsExecutors.DIRECT_EXECUTOR_SERVICE);
var client = remoteClusterService.getRemoteClusterClient("test", EsExecutors.DIRECT_EXECUTOR_SERVICE);

try {
assertFalse(remoteClusterService.isRemoteNodeConnected("test", remoteNode));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -624,11 +624,7 @@ void performRemoteClusterOperation(final String clusterAlias, final OriginalIndi
try {
TermsEnumRequest req = new TermsEnumRequest(request).indices(remoteIndices.indices());

var remoteClient = remoteClusterService.getRemoteClusterClient(
transportService.getThreadPool(),
clusterAlias,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
var remoteClient = remoteClusterService.getRemoteClusterClient(clusterAlias, EsExecutors.DIRECT_EXECUTOR_SERVICE);
remoteClient.execute(TermsEnumAction.INSTANCE, req, new ActionListener<>() {
@Override
public void onResponse(TermsEnumResponse termsEnumResponse) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,7 @@ public void testIndicesPrivilegesAreEnforcedForCcrRestoreSessionActions() throws
assertThat(remoteConnectionInfos, hasSize(1));
assertThat(remoteConnectionInfos.get(0).isConnected(), is(true));

final var remoteClusterClient = remoteClusterService.getRemoteClusterClient(
threadPool,
"my_remote_cluster",
threadPool.generic()
);
final var remoteClusterClient = remoteClusterService.getRemoteClusterClient("my_remote_cluster", threadPool.generic());

// Creating a restore session fails if index is not accessible
final ShardId privateShardId = new ShardId("private-index", privateIndexUUID, 0);
Expand Down Expand Up @@ -315,7 +311,6 @@ public void testRestApiKeyIsNotAllowedOnRemoteClusterPort() throws IOException {
try (MockTransportService service = startTransport("node", threadPool, (String) apiKeyMap.get("encoded"))) {
final RemoteClusterService remoteClusterService = service.getRemoteClusterService();
final var remoteClusterClient = remoteClusterService.getRemoteClusterClient(
threadPool,
"my_remote_cluster",
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
Expand Down Expand Up @@ -389,7 +384,6 @@ public void testUpdateCrossClusterApiKey() throws Exception {
assertThat(remoteConnectionInfos, hasSize(1));
assertThat(remoteConnectionInfos.get(0).isConnected(), is(true));
final var remoteClusterClient = remoteClusterService.getRemoteClusterClient(
threadPool,
"my_remote_cluster",
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
Expand Down

0 comments on commit 2d073f4

Please sign in to comment.