Skip to content

Commit

Permalink
Fail sniff process if no connections opened (#54934)
Browse files Browse the repository at this point in the history
Currently the remote cluster sniff connection process can succeed even
if no connections are opened. This commit fixes this by failing the
connection process if no connections are successfully opened.
  • Loading branch information
Tim-Brooks committed Apr 10, 2020
1 parent 96a903b commit 98fba92
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -373,14 +373,6 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes, Act
}
}

List<String> getSeedNodes() {
return configuredSeedNodes;
}

int getMaxConnections() {
return maxNumRemoteConnections;
}

/* This class handles the _state response from the remote cluster when sniffing nodes to connect to */
private class SniffClusterStateResponseHandler implements TransportResponseHandler<ClusterStateResponse> {

Expand Down Expand Up @@ -438,7 +430,12 @@ public void onFailure(Exception e) {
// since if we do it afterwards we might fail assertions that check if all high level connections are closed.
// from a code correctness perspective we could also close it afterwards.
IOUtils.closeWhileHandlingException(connection);
listener.onResponse(null);
int openConnections = connectionManager.size();
if (openConnections == 0) {
listener.onFailure(new IllegalStateException("Unable to open any connections to remote cluster [" + clusterAlias + "]"));
} else {
listener.onResponse(null);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,34 @@ public void testFilterNodesWithNodePredicate() {
}
}

public void testConnectFailsIfNoConnectionsOpened() {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
MockTransportService closedTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {
DiscoveryNode seedNode = seedTransport.getLocalNode();
DiscoveryNode discoverableNode = closedTransport.getLocalNode();
knownNodes.add(discoverableNode);
closedTransport.close();

try (MockTransportService localService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool)) {
localService.start();
localService.acceptIncomingRequests();

// Predicate excludes seed node as a possible connection
ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
SniffConnectionStrategy strategy = new SniffConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
null, 3, n -> n.equals(seedNode) == false, seedNodes(seedNode))) {
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
strategy.connect(connectFuture);
final IllegalStateException ise = expectThrows(IllegalStateException.class, connectFuture::actionGet);
assertEquals("Unable to open any connections to remote cluster [cluster-alias]", ise.getMessage());
assertTrue(strategy.assertNoRunningConnections());
}
}
}
}

public void testClusterNameValidationPreventConnectingToDifferentClusters() throws Exception {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
List<DiscoveryNode> otherKnownNodes = new CopyOnWriteArrayList<>();
Expand Down

0 comments on commit 98fba92

Please sign in to comment.