diff --git a/CHANGELOG.md b/CHANGELOG.md index bfadf4f34d6ba..60f16de7c3623 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Implement concurrent aggregations support without profile option ([#7514](https://github.com/opensearch-project/OpenSearch/pull/7514)) - Add dynamic index and cluster setting for concurrent segment search ([#7956](https://github.com/opensearch-project/OpenSearch/pull/7956)) - Add descending order search optimization through reverse segment read. ([#7967](https://github.com/opensearch-project/OpenSearch/pull/7967)) +- Make remote cluster connection setup in async ([#8038](https://github.com/opensearch-project/OpenSearch/pull/8038)) ### Dependencies - Bump `com.azure:azure-storage-common` from 12.21.0 to 12.21.1 (#7566, #7814) diff --git a/server/src/main/java/org/opensearch/transport/RemoteClusterService.java b/server/src/main/java/org/opensearch/transport/RemoteClusterService.java index 838367baf8a2e..60e166a4e300c 100644 --- a/server/src/main/java/org/opensearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/opensearch/transport/RemoteClusterService.java @@ -38,7 +38,6 @@ import org.opensearch.action.OriginalIndices; import org.opensearch.action.support.GroupedActionListener; import org.opensearch.action.support.IndicesOptions; -import org.opensearch.action.support.PlainActionFuture; import org.opensearch.client.Client; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; @@ -63,7 +62,6 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Predicate; @@ -328,35 +326,23 @@ synchronized void updateRemoteCluster(String clusterAlias, Settings newSettings, } /** - * Connects to all remote clusters in a blocking fashion. This should be called on node startup to establish an initial connection + * Connects to all remote clusters in a non-blocking fashion. This should be called on node startup to establish an initial connection * to all configured seed nodes. */ - void initializeRemoteClusters() { - final TimeValue timeValue = REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings); - final PlainActionFuture> future = new PlainActionFuture<>(); + void initializeRemoteClusters(ActionListener listener) { Set enabledClusters = RemoteClusterAware.getEnabledRemoteClusters(settings); - if (enabledClusters.isEmpty()) { + listener.onResponse(null); return; } - GroupedActionListener listener = new GroupedActionListener<>(future, enabledClusters.size()); - for (String clusterAlias : enabledClusters) { - updateRemoteCluster(clusterAlias, settings, listener); - } + GroupedActionListener groupListener = new GroupedActionListener<>( + ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure), + enabledClusters.size() + ); - if (enabledClusters.isEmpty()) { - future.onResponse(null); - } - - try { - future.get(timeValue.millis(), TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (TimeoutException ex) { - logger.warn("failed to connect to remote clusters within {}", timeValue.toString()); - } catch (Exception e) { - throw new IllegalStateException("failed to connect to remote clusters", e); + for (String clusterAlias : enabledClusters) { + updateRemoteCluster(clusterAlias, settings, groupListener); } } diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index 526728aea725c..13491609945a0 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -291,7 +291,12 @@ protected void doStart() { if (remoteClusterClient) { // here we start to connect to the remote clusters - remoteClusterService.initializeRemoteClusters(); + remoteClusterService.initializeRemoteClusters( + ActionListener.wrap( + r -> logger.info("Remote clusters initialized successfully."), + e -> logger.error("Remote clusters initialization failed partially", e) + ) + ); } } diff --git a/server/src/test/java/org/opensearch/transport/RemoteClusterClientTests.java b/server/src/test/java/org/opensearch/transport/RemoteClusterClientTests.java index 94c535a5e20f5..b89d652510850 100644 --- a/server/src/test/java/org/opensearch/transport/RemoteClusterClientTests.java +++ b/server/src/test/java/org/opensearch/transport/RemoteClusterClientTests.java @@ -86,7 +86,7 @@ public void testConnectAndExecuteRequest() throws Exception { service.acceptIncomingRequests(); logger.info("now accepting incoming requests on local transport"); RemoteClusterService remoteClusterService = service.getRemoteClusterService(); - assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode)); + assertBusy(() -> { assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode)); }, 10, TimeUnit.SECONDS); Client client = remoteClusterService.getRemoteClusterClient(threadPool, "test"); ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().execute().get(); assertNotNull(clusterStateResponse); diff --git a/server/src/test/java/org/opensearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/opensearch/transport/RemoteClusterServiceTests.java index dea42684d2e21..8b26fa42e7b76 100644 --- a/server/src/test/java/org/opensearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/opensearch/transport/RemoteClusterServiceTests.java @@ -59,6 +59,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; @@ -92,6 +93,20 @@ private MockTransportService startTransport( return RemoteClusterConnectionTests.startTransport(id, knownNodes, version, threadPool, settings); } + void initializeRemoteClusters(RemoteClusterService service) { + final PlainActionFuture future = new PlainActionFuture<>(); + service.initializeRemoteClusters(future); + try { + future.get(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (TimeoutException ex) { + logger.warn("Timed out connecting to remote clusters"); + } catch (Exception e) { + throw new IllegalStateException("failed to connect to remote clusters", e); + } + } + public void testSettingsAreRegistered() { assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE)); assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING)); @@ -157,7 +172,7 @@ public void testGroupClusterIndices() throws IOException { builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertTrue(service.isCrossClusterSearchEnabled()); assertTrue(service.isRemoteClusterRegistered("cluster_1")); assertTrue(service.isRemoteClusterRegistered("cluster_2")); @@ -228,7 +243,7 @@ public void testGroupIndices() throws IOException { builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertTrue(service.isCrossClusterSearchEnabled()); assertTrue(service.isRemoteClusterRegistered("cluster_1")); assertTrue(service.isRemoteClusterRegistered("cluster_2")); @@ -321,7 +336,7 @@ public void testIncrementallyAddClusters() throws IOException { builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(Settings.EMPTY, transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertFalse(service.isCrossClusterSearchEnabled()); Settings cluster1Settings = createSettings( "cluster_1", @@ -384,7 +399,7 @@ public void testDefaultPingSchedule() throws IOException { transportService.acceptIncomingRequests(); try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertTrue(service.isCrossClusterSearchEnabled()); service.validateAndUpdateRemoteCluster( "cluster_1", @@ -436,7 +451,7 @@ public void testCustomPingSchedule() throws IOException { TimeValue.timeValueSeconds(randomIntBetween(1, 10)); builder.put("cluster.remote.cluster_2.transport.ping_schedule", pingSchedule2); try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertTrue(service.isRemoteClusterRegistered("cluster_1")); RemoteClusterConnection remoteClusterConnection1 = service.getRemoteClusterConnection("cluster_1"); assertEquals(pingSchedule1, remoteClusterConnection1.getConnectionManager().getConnectionProfile().getPingInterval()); @@ -467,7 +482,7 @@ public void testChangeSettings() throws Exception { Settings.Builder builder = Settings.builder(); builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { - service.initializeRemoteClusters(); + initializeRemoteClusters(service); RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); Settings.Builder settingsChange = Settings.builder(); TimeValue pingSchedule = TimeValue.timeValueSeconds(randomIntBetween(6, 8)); @@ -517,7 +532,7 @@ public void testRemoteNodeAttribute() throws IOException, InterruptedException { builder.putList("cluster.remote.cluster_2.seed", c2N1Node.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertFalse(service.isCrossClusterSearchEnabled()); final CountDownLatch firstLatch = new CountDownLatch(1); @@ -580,7 +595,7 @@ public void testRemoteNodeRoles() throws IOException, InterruptedException { builder.putList("cluster.remote.cluster_2.seed", c2N1Node.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertFalse(service.isCrossClusterSearchEnabled()); final CountDownLatch firstLatch = new CountDownLatch(1); @@ -648,7 +663,7 @@ public void testCollectNodes() throws InterruptedException, IOException { builder.putList("cluster.remote.cluster_2.seed", c2N1Node.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertFalse(service.isCrossClusterSearchEnabled()); final CountDownLatch firstLatch = new CountDownLatch(1); @@ -896,7 +911,7 @@ public void testReconnectWhenStrategySettingsUpdated() throws Exception { builder.putList("cluster.remote.cluster_test.seeds", Collections.singletonList(node0.getAddress().toString())); try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertTrue(service.isCrossClusterSearchEnabled()); final RemoteClusterConnection firstRemoteClusterConnection = service.getRemoteClusterConnection("cluster_test");