Skip to content

Commit

Permalink
Establish seed node connection setup in async
Browse files Browse the repository at this point in the history
Signed-off-by: Ankit Kala <[email protected]>
  • Loading branch information
ankitkala committed Jun 13, 2023
1 parent e6348c5 commit 91fe701
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 32 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Added
- Add TokenManager Interface ([#7452](https://github.com/opensearch-project/OpenSearch/pull/7452))
- Add Remote store as a segment replication source ([#7653](https://github.com/opensearch-project/OpenSearch/pull/7653))
- Make remote cluster connection setup in async ([#8038](https://github.com/opensearch-project/OpenSearch/pull/8038))

### Dependencies
- Bump `jackson` from 2.15.1 to 2.15.2 ([#7897](https://github.com/opensearch-project/OpenSearch/pull/7897))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.opensearch.action.OriginalIndices;
import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.client.Client;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
Expand All @@ -63,7 +62,6 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
Expand Down Expand Up @@ -331,32 +329,32 @@ synchronized void updateRemoteCluster(String clusterAlias, Settings newSettings,
* Connects to all remote clusters in a blocking fashion. This should be called on node startup to establish an initial connection
* to all configured seed nodes.
*/
void initializeRemoteClusters() {
final TimeValue timeValue = REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings);
final PlainActionFuture<Collection<Void>> future = new PlainActionFuture<>();
void initializeRemoteClusters(ActionListener<Void> listener) {
Set<String> enabledClusters = RemoteClusterAware.getEnabledRemoteClusters(settings);

if (enabledClusters.isEmpty()) {
listener.onResponse(null);
return;
}

GroupedActionListener<Void> listener = new GroupedActionListener<>(future, enabledClusters.size());
for (String clusterAlias : enabledClusters) {
updateRemoteCluster(clusterAlias, settings, listener);
}
GroupedActionListener<Void> groupListener = new GroupedActionListener<>(
ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure),
enabledClusters.size()
);

if (enabledClusters.isEmpty()) {
future.onResponse(null);
}
for (String clusterAlias : enabledClusters) {
updateRemoteCluster(clusterAlias, settings, new ActionListener<>() {
@Override
public void onResponse(Void unused) {
logger.debug("Successfully established connection with {}", clusterAlias);
groupListener.onResponse(unused);
}

try {
future.get(timeValue.millis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (TimeoutException ex) {
logger.warn("failed to connect to remote clusters within {}", timeValue.toString());
} catch (Exception e) {
throw new IllegalStateException("failed to connect to remote clusters", e);
@Override
public void onFailure(Exception e) {
logger.error("Failed to established connection with {}: {}", clusterAlias, e);
groupListener.onFailure(e);
}
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,12 @@ protected void doStart() {

if (remoteClusterClient) {
// here we start to connect to the remote clusters
remoteClusterService.initializeRemoteClusters();
remoteClusterService.initializeRemoteClusters(
ActionListener.wrap(
r -> logger.info("Remote clusters initialized successfully."),
e -> logger.error("Remote clusters initialization failed partially", e)
)
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;

Expand Down Expand Up @@ -92,6 +93,20 @@ private MockTransportService startTransport(
return RemoteClusterConnectionTests.startTransport(id, knownNodes, version, threadPool, settings);
}

void initializeRemoteClusters(RemoteClusterService service) {
final PlainActionFuture<Void> future = new PlainActionFuture<>();
service.initializeRemoteClusters(future);
try {
future.get(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (TimeoutException ex) {
logger.warn("Timed out connecting to remote clusters");
} catch (Exception e) {
throw new IllegalStateException("failed to connect to remote clusters", e);
}
}

public void testSettingsAreRegistered() {
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING));
Expand Down Expand Up @@ -157,7 +172,7 @@ public void testGroupClusterIndices() throws IOException {
builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString());
try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
initializeRemoteClusters(service);
assertTrue(service.isCrossClusterSearchEnabled());
assertTrue(service.isRemoteClusterRegistered("cluster_1"));
assertTrue(service.isRemoteClusterRegistered("cluster_2"));
Expand Down Expand Up @@ -228,7 +243,7 @@ public void testGroupIndices() throws IOException {
builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString());
try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
initializeRemoteClusters(service);
assertTrue(service.isCrossClusterSearchEnabled());
assertTrue(service.isRemoteClusterRegistered("cluster_1"));
assertTrue(service.isRemoteClusterRegistered("cluster_2"));
Expand Down Expand Up @@ -321,7 +336,7 @@ public void testIncrementallyAddClusters() throws IOException {
builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString());
try (RemoteClusterService service = new RemoteClusterService(Settings.EMPTY, transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
initializeRemoteClusters(service);
assertFalse(service.isCrossClusterSearchEnabled());
Settings cluster1Settings = createSettings(
"cluster_1",
Expand Down Expand Up @@ -384,7 +399,7 @@ public void testDefaultPingSchedule() throws IOException {
transportService.acceptIncomingRequests();
try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
initializeRemoteClusters(service);
assertTrue(service.isCrossClusterSearchEnabled());
service.validateAndUpdateRemoteCluster(
"cluster_1",
Expand Down Expand Up @@ -436,7 +451,7 @@ public void testCustomPingSchedule() throws IOException {
TimeValue.timeValueSeconds(randomIntBetween(1, 10));
builder.put("cluster.remote.cluster_2.transport.ping_schedule", pingSchedule2);
try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
service.initializeRemoteClusters();
initializeRemoteClusters(service);
assertTrue(service.isRemoteClusterRegistered("cluster_1"));
RemoteClusterConnection remoteClusterConnection1 = service.getRemoteClusterConnection("cluster_1");
assertEquals(pingSchedule1, remoteClusterConnection1.getConnectionManager().getConnectionProfile().getPingInterval());
Expand Down Expand Up @@ -467,7 +482,7 @@ public void testChangeSettings() throws Exception {
Settings.Builder builder = Settings.builder();
builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString());
try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
service.initializeRemoteClusters();
initializeRemoteClusters(service);
RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1");
Settings.Builder settingsChange = Settings.builder();
TimeValue pingSchedule = TimeValue.timeValueSeconds(randomIntBetween(6, 8));
Expand Down Expand Up @@ -517,7 +532,7 @@ public void testRemoteNodeAttribute() throws IOException, InterruptedException {
builder.putList("cluster.remote.cluster_2.seed", c2N1Node.getAddress().toString());
try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
initializeRemoteClusters(service);
assertFalse(service.isCrossClusterSearchEnabled());

final CountDownLatch firstLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -580,7 +595,7 @@ public void testRemoteNodeRoles() throws IOException, InterruptedException {
builder.putList("cluster.remote.cluster_2.seed", c2N1Node.getAddress().toString());
try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
initializeRemoteClusters(service);
assertFalse(service.isCrossClusterSearchEnabled());

final CountDownLatch firstLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -648,7 +663,7 @@ public void testCollectNodes() throws InterruptedException, IOException {
builder.putList("cluster.remote.cluster_2.seed", c2N1Node.getAddress().toString());
try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
initializeRemoteClusters(service);
assertFalse(service.isCrossClusterSearchEnabled());

final CountDownLatch firstLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -896,7 +911,7 @@ public void testReconnectWhenStrategySettingsUpdated() throws Exception {
builder.putList("cluster.remote.cluster_test.seeds", Collections.singletonList(node0.getAddress().toString()));
try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
initializeRemoteClusters(service);
assertTrue(service.isCrossClusterSearchEnabled());

final RemoteClusterConnection firstRemoteClusterConnection = service.getRemoteClusterConnection("cluster_test");
Expand Down

0 comments on commit 91fe701

Please sign in to comment.