diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/NodeJoinLeftIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/NodeJoinLeftIT.java index 80ac5596cc8d4..51d71f95f33c8 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/NodeJoinLeftIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/NodeJoinLeftIT.java @@ -172,6 +172,87 @@ public void testTransientErrorsDuringRecovery1AreRetried() throws Exception { assertThat(response.isTimedOut(), is(false)); } + public void testRestartDataNode() throws Exception { + final String indexName = "test"; + final Settings nodeSettings = Settings.builder() + .put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.getKey(), "100ms") + .put(NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.getKey(), "10s") + .put(FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING.getKey(), "200ms") + .put(FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING.getKey(), "100ms") + .put(FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), 1) + .build(); + // start a 3 node cluster + internalCluster().startNode(nodeSettings); + internalCluster().startNode(Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build()); + final String redNodeName = internalCluster().startNode(Settings.builder().put("node.attr.color", "red").put(nodeSettings).build()); + + ClusterHealthResponse response = client().admin().cluster().prepareHealth().setWaitForNodes(">=3").get(); + assertThat(response.isTimedOut(), is(false)); + + client().admin() + .indices() + .prepareCreate(indexName) + .setSettings( + Settings.builder() + .put(IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "color", "blue") + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + .get(); + + Settings redNodeDataPathSettings = internalCluster().dataPathSettings(redNodeName); + logger.info("-> stopping data node"); + internalCluster().stopRandomNode(settings -> settings.get("node.name").equals(redNodeName)); + response = client().admin().cluster().prepareHealth().setWaitForNodes("2").get(); + assertThat(response.isTimedOut(), is(false)); + + logger.info("-> restarting stopped node"); + internalCluster().startNode(Settings.builder().put("node.name", redNodeName).put(redNodeDataPathSettings).build()); + response = client().admin().cluster().prepareHealth().setWaitForNodes("3").get(); + assertThat(response.isTimedOut(), is(false)); + } + + public void testRestartCmNode() throws Exception { + final String indexName = "test"; + final Settings nodeSettings = Settings.builder() + .put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.getKey(), "100ms") + .put(NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.getKey(), "10s") + .put(FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING.getKey(), "200ms") + .put(FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING.getKey(), "100ms") + .put(FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), 1) + .build(); + // start a 3 node cluster + final String cm = internalCluster().startNode(Settings.builder().put("node.attr.color", "yellow").put(nodeSettings).build()); + internalCluster().startNode(Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build()); + internalCluster().startNode(Settings.builder().put("node.attr.color", "red").put(nodeSettings).build()); + + ClusterHealthResponse response = client().admin().cluster().prepareHealth().setWaitForNodes(">=3").get(); + assertThat(response.isTimedOut(), is(false)); + + client().admin() + .indices() + .prepareCreate(indexName) + .setSettings( + Settings.builder() + .put(IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "color", "blue") + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + .get(); + + Settings cmNodeSettings = internalCluster().dataPathSettings(cm); + + logger.info("-> stopping cluster-manager node"); + internalCluster().stopRandomNode(settings -> settings.get("node.name").equals(cm)); + response = client().admin().cluster().prepareHealth().setWaitForNodes("2").get(); + assertThat(response.isTimedOut(), is(false)); + + logger.info("-> restarting stopped node"); + internalCluster().startNode(Settings.builder().put("node.name", cm).put(cmNodeSettings).build()); + response = client().admin().cluster().prepareHealth().setWaitForNodes("3").get(); + assertThat(response.isTimedOut(), is(false)); + } + private class ConnectionDelay implements StubbableTransport.RequestHandlingBehavior { private final Runnable connectionBreaker; diff --git a/server/src/main/java/org/opensearch/cluster/NodeConnectionsService.java b/server/src/main/java/org/opensearch/cluster/NodeConnectionsService.java index a19b3e24ba9c9..de6b154fd097a 100644 --- a/server/src/main/java/org/opensearch/cluster/NodeConnectionsService.java +++ b/server/src/main/java/org/opensearch/cluster/NodeConnectionsService.java @@ -177,15 +177,16 @@ public void disconnectFromNodesExcept(DiscoveryNodes discoveryNodes) { } // There might be some stale nodes that are in pendingDisconnect set from before but are not connected anymore - // This code block clears the pending disconnect for these nodes to avoid permanently blocking node joins - // This situation should ideally not happen + // So these nodes would not be there in targetsByNode and would not have disconnect() called for them + // This code block clears the pending disconnect for these nodes that don't have entries in targetsByNode + // to avoid permanently blocking node joins + // This situation should ideally not happen, this is just for extra safety transportService.removePendingDisconnections( - transportService.getPendingDisconnections() + targetsByNode.keySet() .stream() .filter(discoveryNode -> !discoveryNodes.nodeExists(discoveryNode)) .collect(Collectors.toSet()) ); - } runnables.forEach(Runnable::run); } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index b197bfbe2969f..c8c73465e51d2 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -1360,7 +1360,7 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId()) final DiscoveryNodes publishNodes = publishRequest.getAcceptedState().nodes(); // marking pending disconnects before publish // if a nodes tries to send a joinRequest while it is pending disconnect, it should fail - transportService.setPendingDisconnections(clusterChangedEvent.nodesDelta()); + transportService.setPendingDisconnections(new HashSet<>(clusterChangedEvent.nodesDelta().removedNodes())); leaderChecker.setCurrentNodes(publishNodes); followersChecker.setCurrentNodes(publishNodes); lagDetector.setTrackedNodes(publishNodes); diff --git a/server/src/main/java/org/opensearch/transport/ClusterConnectionManager.java b/server/src/main/java/org/opensearch/transport/ClusterConnectionManager.java index c022aa6c2a2dd..58be31135c46e 100644 --- a/server/src/main/java/org/opensearch/transport/ClusterConnectionManager.java +++ b/server/src/main/java/org/opensearch/transport/ClusterConnectionManager.java @@ -69,6 +69,8 @@ public class ClusterConnectionManager implements ConnectionManager { Nodes are marked as pending disconnect right before cluster state publish phase. They are cleared up as part of cluster state apply commit phase This is to avoid connections from being made to nodes that are in the process of leaving the cluster + Note: If a disconnect is initiated while a connect is in progress, this Set will not handle this case. + Callers need to ensure that connects and disconnects are sequenced. */ private final Set pendingDisconnections = ConcurrentCollections.newConcurrentSet(); private final AbstractRefCounted connectingRefCounter = new AbstractRefCounted("connection manager") { @@ -129,7 +131,7 @@ public void connectToNode( ConnectionValidator connectionValidator, ActionListener listener ) throws ConnectTransportException { - logger.trace("[{}]connecting to node [{}]", Thread.currentThread().getName(), node); + logger.trace("connecting to node [{}]", node); ConnectionProfile resolvedProfile = ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile); if (node == null) { listener.onFailure(new ConnectTransportException(null, "can't connect to a null node")); @@ -138,11 +140,7 @@ public void connectToNode( // if node-left is still in progress, we fail the connect request early if (pendingDisconnections.contains(node)) { - listener.onFailure( - new IllegalStateException( - "blocked connection to node [" + node + "] because node-left is currently in progress for this node" - ) - ); + listener.onFailure(new IllegalStateException("cannot make a new connection as disconnect to node [" + node + "] is pending")); return; } @@ -188,6 +186,7 @@ public void connectToNode( conn.addCloseListener(ActionListener.wrap(() -> { logger.trace("unregistering {} after connection close and marking as disconnected", node); connectedNodes.remove(node, finalConnection); + pendingDisconnections.remove(node); connectionListener.onNodeDisconnected(node, conn); })); } @@ -249,20 +248,15 @@ public void disconnectFromNode(DiscoveryNode node) { } @Override - public Set getPendingDisconnections() { - return pendingDisconnections; + public void setPendingDisconnection(DiscoveryNode node) { + logger.debug("marking disconnection as pending for node: [{}]", node); + pendingDisconnections.add(node); } @Override - public void setPendingDisconnections(Set nodes) { - logger.debug("set pending disconnection for nodes: [{}]", nodes); - pendingDisconnections.addAll(nodes); - } - - @Override - public void removePendingDisconnections(Set nodes) { - logger.debug("marking disconnection as completed for nodes: [{}]", nodes); - pendingDisconnections.removeAll(nodes); + public void removePendingDisconnection(DiscoveryNode node) { + logger.debug("marking disconnection as completed for node: [{}]", node); + pendingDisconnections.remove(node); } /** diff --git a/server/src/main/java/org/opensearch/transport/ConnectionManager.java b/server/src/main/java/org/opensearch/transport/ConnectionManager.java index 88d89d952a2f6..1a5726887f246 100644 --- a/server/src/main/java/org/opensearch/transport/ConnectionManager.java +++ b/server/src/main/java/org/opensearch/transport/ConnectionManager.java @@ -65,11 +65,9 @@ void connectToNode( void disconnectFromNode(DiscoveryNode node); - Set getPendingDisconnections(); + void setPendingDisconnection(DiscoveryNode node); - void setPendingDisconnections(Set nodes); - - void removePendingDisconnections(Set nodes); + void removePendingDisconnection(DiscoveryNode node); Set getAllConnectedNodes(); diff --git a/server/src/main/java/org/opensearch/transport/RemoteConnectionManager.java b/server/src/main/java/org/opensearch/transport/RemoteConnectionManager.java index aabc2a6b6a796..9eab2c4599e3b 100644 --- a/server/src/main/java/org/opensearch/transport/RemoteConnectionManager.java +++ b/server/src/main/java/org/opensearch/transport/RemoteConnectionManager.java @@ -115,18 +115,13 @@ public void disconnectFromNode(DiscoveryNode node) { } @Override - public Set getPendingDisconnections() { - return delegate.getPendingDisconnections(); + public void setPendingDisconnection(DiscoveryNode node) { + delegate.setPendingDisconnection(node); } @Override - public void setPendingDisconnections(Set nodes) { - delegate.setPendingDisconnections(nodes); - } - - @Override - public void removePendingDisconnections(Set nodes) { - delegate.removePendingDisconnections(nodes); + public void removePendingDisconnection(DiscoveryNode node) { + delegate.removePendingDisconnection(node); } @Override diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index 50d8fb7ab6dda..e65c7a081ab14 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -41,7 +41,6 @@ import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.common.Nullable; import org.opensearch.common.io.stream.Streamables; import org.opensearch.common.lease.Releasable; @@ -774,16 +773,12 @@ public void disconnectFromNode(DiscoveryNode node) { connectionManager.disconnectFromNode(node); } - public Set getPendingDisconnections() { - return connectionManager.getPendingDisconnections(); - } - - public void setPendingDisconnections(DiscoveryNodes.Delta nodesDelta) { - connectionManager.setPendingDisconnections(new HashSet<>(nodesDelta.removedNodes())); + public void setPendingDisconnections(Set nodes) { + nodes.forEach(connectionManager::setPendingDisconnection); } public void removePendingDisconnections(Set nodes) { - connectionManager.removePendingDisconnections(nodes); + nodes.forEach(connectionManager::removePendingDisconnection); } public void addMessageListener(TransportMessageListener listener) { diff --git a/test/framework/src/main/java/org/opensearch/test/transport/StubbableConnectionManager.java b/test/framework/src/main/java/org/opensearch/test/transport/StubbableConnectionManager.java index 78b894c6431df..f68648b31f3b4 100644 --- a/test/framework/src/main/java/org/opensearch/test/transport/StubbableConnectionManager.java +++ b/test/framework/src/main/java/org/opensearch/test/transport/StubbableConnectionManager.java @@ -124,18 +124,13 @@ public void disconnectFromNode(DiscoveryNode node) { } @Override - public Set getPendingDisconnections() { - return delegate.getPendingDisconnections(); + public void setPendingDisconnection(DiscoveryNode node) { + delegate.setPendingDisconnection(node); } @Override - public void setPendingDisconnections(Set nodes) { - delegate.setPendingDisconnections(nodes); - } - - @Override - public void removePendingDisconnections(Set nodes) { - delegate.removePendingDisconnections(nodes); + public void removePendingDisconnection(DiscoveryNode node) { + delegate.removePendingDisconnection(node); } @Override