Skip to content

Commit

Permalink
Address comments + minor changes
Browse files Browse the repository at this point in the history
Signed-off-by: Rahul Karajgikar <[email protected]>
  • Loading branch information
Rahul Karajgikar committed Sep 6, 2024
1 parent 09e131d commit b0a7ae3
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,87 @@ public void testTransientErrorsDuringRecovery1AreRetried() throws Exception {
assertThat(response.isTimedOut(), is(false));
}

public void testRestartDataNode() throws Exception {
final String indexName = "test";
final Settings nodeSettings = Settings.builder()
.put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.getKey(), "100ms")
.put(NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.getKey(), "10s")
.put(FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING.getKey(), "200ms")
.put(FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING.getKey(), "100ms")
.put(FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), 1)
.build();
// start a 3 node cluster
internalCluster().startNode(nodeSettings);
internalCluster().startNode(Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build());
final String redNodeName = internalCluster().startNode(Settings.builder().put("node.attr.color", "red").put(nodeSettings).build());

ClusterHealthResponse response = client().admin().cluster().prepareHealth().setWaitForNodes(">=3").get();
assertThat(response.isTimedOut(), is(false));

client().admin()
.indices()
.prepareCreate(indexName)
.setSettings(
Settings.builder()
.put(IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "color", "blue")
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
)
.get();

Settings redNodeDataPathSettings = internalCluster().dataPathSettings(redNodeName);
logger.info("-> stopping data node");
internalCluster().stopRandomNode(settings -> settings.get("node.name").equals(redNodeName));
response = client().admin().cluster().prepareHealth().setWaitForNodes("2").get();
assertThat(response.isTimedOut(), is(false));

logger.info("-> restarting stopped node");
internalCluster().startNode(Settings.builder().put("node.name", redNodeName).put(redNodeDataPathSettings).build());
response = client().admin().cluster().prepareHealth().setWaitForNodes("3").get();
assertThat(response.isTimedOut(), is(false));
}

public void testRestartCmNode() throws Exception {
final String indexName = "test";
final Settings nodeSettings = Settings.builder()
.put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.getKey(), "100ms")
.put(NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.getKey(), "10s")
.put(FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING.getKey(), "200ms")
.put(FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING.getKey(), "100ms")
.put(FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), 1)
.build();
// start a 3 node cluster
final String cm = internalCluster().startNode(Settings.builder().put("node.attr.color", "yellow").put(nodeSettings).build());
internalCluster().startNode(Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build());
internalCluster().startNode(Settings.builder().put("node.attr.color", "red").put(nodeSettings).build());

ClusterHealthResponse response = client().admin().cluster().prepareHealth().setWaitForNodes(">=3").get();
assertThat(response.isTimedOut(), is(false));

client().admin()
.indices()
.prepareCreate(indexName)
.setSettings(
Settings.builder()
.put(IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "color", "blue")
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
)
.get();

Settings cmNodeSettings = internalCluster().dataPathSettings(cm);

logger.info("-> stopping cluster-manager node");
internalCluster().stopRandomNode(settings -> settings.get("node.name").equals(cm));
response = client().admin().cluster().prepareHealth().setWaitForNodes("2").get();
assertThat(response.isTimedOut(), is(false));

logger.info("-> restarting stopped node");
internalCluster().startNode(Settings.builder().put("node.name", cm).put(cmNodeSettings).build());
response = client().admin().cluster().prepareHealth().setWaitForNodes("3").get();
assertThat(response.isTimedOut(), is(false));
}

private class ConnectionDelay implements StubbableTransport.RequestHandlingBehavior<TransportRequest> {
private final Runnable connectionBreaker;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,15 +177,16 @@ public void disconnectFromNodesExcept(DiscoveryNodes discoveryNodes) {
}

// There might be some stale nodes that are in pendingDisconnect set from before but are not connected anymore
// This code block clears the pending disconnect for these nodes to avoid permanently blocking node joins
// This situation should ideally not happen
// So these nodes would not be there in targetsByNode and would not have disconnect() called for them
// This code block clears the pending disconnect for these nodes that don't have entries in targetsByNode
// to avoid permanently blocking node joins
// This situation should ideally not happen, this is just for extra safety
transportService.removePendingDisconnections(
transportService.getPendingDisconnections()
targetsByNode.keySet()
.stream()
.filter(discoveryNode -> !discoveryNodes.nodeExists(discoveryNode))
.collect(Collectors.toSet())
);

}
runnables.forEach(Runnable::run);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1360,7 +1360,7 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
final DiscoveryNodes publishNodes = publishRequest.getAcceptedState().nodes();
// marking pending disconnects before publish
// if a nodes tries to send a joinRequest while it is pending disconnect, it should fail
transportService.setPendingDisconnections(clusterChangedEvent.nodesDelta());
transportService.setPendingDisconnections(new HashSet<>(clusterChangedEvent.nodesDelta().removedNodes()));
leaderChecker.setCurrentNodes(publishNodes);
followersChecker.setCurrentNodes(publishNodes);
lagDetector.setTrackedNodes(publishNodes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public class ClusterConnectionManager implements ConnectionManager {
Nodes are marked as pending disconnect right before cluster state publish phase.
They are cleared up as part of cluster state apply commit phase
This is to avoid connections from being made to nodes that are in the process of leaving the cluster
Note: If a disconnect is initiated while a connect is in progress, this Set will not handle this case.
Callers need to ensure that connects and disconnects are sequenced.
*/
private final Set<DiscoveryNode> pendingDisconnections = ConcurrentCollections.newConcurrentSet();
private final AbstractRefCounted connectingRefCounter = new AbstractRefCounted("connection manager") {
Expand Down Expand Up @@ -129,7 +131,7 @@ public void connectToNode(
ConnectionValidator connectionValidator,
ActionListener<Void> listener
) throws ConnectTransportException {
logger.trace("[{}]connecting to node [{}]", Thread.currentThread().getName(), node);
logger.trace("connecting to node [{}]", node);
ConnectionProfile resolvedProfile = ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile);
if (node == null) {
listener.onFailure(new ConnectTransportException(null, "can't connect to a null node"));
Expand All @@ -138,11 +140,7 @@ public void connectToNode(

// if node-left is still in progress, we fail the connect request early
if (pendingDisconnections.contains(node)) {
listener.onFailure(
new IllegalStateException(
"blocked connection to node [" + node + "] because node-left is currently in progress for this node"
)
);
listener.onFailure(new IllegalStateException("cannot make a new connection as disconnect to node [" + node + "] is pending"));
return;
}

Expand Down Expand Up @@ -188,6 +186,7 @@ public void connectToNode(
conn.addCloseListener(ActionListener.wrap(() -> {
logger.trace("unregistering {} after connection close and marking as disconnected", node);
connectedNodes.remove(node, finalConnection);
pendingDisconnections.remove(node);
connectionListener.onNodeDisconnected(node, conn);
}));
}
Expand Down Expand Up @@ -249,20 +248,15 @@ public void disconnectFromNode(DiscoveryNode node) {
}

@Override
public Set<DiscoveryNode> getPendingDisconnections() {
return pendingDisconnections;
public void setPendingDisconnection(DiscoveryNode node) {
logger.debug("marking disconnection as pending for node: [{}]", node);
pendingDisconnections.add(node);
}

@Override
public void setPendingDisconnections(Set<DiscoveryNode> nodes) {
logger.debug("set pending disconnection for nodes: [{}]", nodes);
pendingDisconnections.addAll(nodes);
}

@Override
public void removePendingDisconnections(Set<DiscoveryNode> nodes) {
logger.debug("marking disconnection as completed for nodes: [{}]", nodes);
pendingDisconnections.removeAll(nodes);
public void removePendingDisconnection(DiscoveryNode node) {
logger.debug("marking disconnection as completed for node: [{}]", node);
pendingDisconnections.remove(node);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,9 @@ void connectToNode(

void disconnectFromNode(DiscoveryNode node);

Set<DiscoveryNode> getPendingDisconnections();
void setPendingDisconnection(DiscoveryNode node);

void setPendingDisconnections(Set<DiscoveryNode> nodes);

void removePendingDisconnections(Set<DiscoveryNode> nodes);
void removePendingDisconnection(DiscoveryNode node);

Set<DiscoveryNode> getAllConnectedNodes();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,18 +115,13 @@ public void disconnectFromNode(DiscoveryNode node) {
}

@Override
public Set<DiscoveryNode> getPendingDisconnections() {
return delegate.getPendingDisconnections();
public void setPendingDisconnection(DiscoveryNode node) {
delegate.setPendingDisconnection(node);
}

@Override
public void setPendingDisconnections(Set<DiscoveryNode> nodes) {
delegate.setPendingDisconnections(nodes);
}

@Override
public void removePendingDisconnections(Set<DiscoveryNode> nodes) {
delegate.removePendingDisconnections(nodes);
public void removePendingDisconnection(DiscoveryNode node) {
delegate.removePendingDisconnection(node);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.Nullable;
import org.opensearch.common.io.stream.Streamables;
import org.opensearch.common.lease.Releasable;
Expand Down Expand Up @@ -774,16 +773,12 @@ public void disconnectFromNode(DiscoveryNode node) {
connectionManager.disconnectFromNode(node);
}

public Set<DiscoveryNode> getPendingDisconnections() {
return connectionManager.getPendingDisconnections();
}

public void setPendingDisconnections(DiscoveryNodes.Delta nodesDelta) {
connectionManager.setPendingDisconnections(new HashSet<>(nodesDelta.removedNodes()));
public void setPendingDisconnections(Set<DiscoveryNode> nodes) {
nodes.forEach(connectionManager::setPendingDisconnection);
}

public void removePendingDisconnections(Set<DiscoveryNode> nodes) {
connectionManager.removePendingDisconnections(nodes);
nodes.forEach(connectionManager::removePendingDisconnection);
}

public void addMessageListener(TransportMessageListener listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,18 +124,13 @@ public void disconnectFromNode(DiscoveryNode node) {
}

@Override
public Set<DiscoveryNode> getPendingDisconnections() {
return delegate.getPendingDisconnections();
public void setPendingDisconnection(DiscoveryNode node) {
delegate.setPendingDisconnection(node);
}

@Override
public void setPendingDisconnections(Set<DiscoveryNode> nodes) {
delegate.setPendingDisconnections(nodes);
}

@Override
public void removePendingDisconnections(Set<DiscoveryNode> nodes) {
delegate.removePendingDisconnections(nodes);
public void removePendingDisconnection(DiscoveryNode node) {
delegate.removePendingDisconnection(node);
}

@Override
Expand Down

0 comments on commit b0a7ae3

Please sign in to comment.