Skip to content

Commit

Permalink
Exclude node from voting configuration when restarting it
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrey Ershov committed Dec 7, 2018
1 parent 830def1 commit 6e5bbd4
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -332,12 +332,10 @@ public boolean clearData(String nodeName) {
*/
public void testIndexDeletionWhenNodeRejoins() throws Exception {
final String indexName = "test-index-del-on-node-rejoin-idx";
// We need at least 3 nodes to make sure, that once one node is stopped, remaining nodes can elect a new master
final int numNodes = 3;
final int numNodes = 2;

final List<String> nodes;
logger.info("--> starting a cluster with " + numNodes + " nodes");

nodes = internalCluster().startNodes(numNodes,
Settings.builder().put(IndexGraveyard.SETTING_MAX_TOMBSTONES.getKey(), randomIntBetween(10, 100)).build());
logger.info("--> create an index");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1636,35 +1636,7 @@ private synchronized void stopNodesAndClient(NodeAndClient nodeAndClient) throws
}

private synchronized void stopNodesAndClients(Collection<NodeAndClient> nodeAndClients) throws IOException {
final Set<String> excludedNodeIds = new HashSet<>();

if (autoManageMinMasterNodes && nodeAndClients.size() > 0) {

final long currentMasters = nodes.values().stream().filter(NodeAndClient::isMasterEligible).count();
final long stoppingMasters = nodeAndClients.stream().filter(NodeAndClient::isMasterEligible).count();

assert stoppingMasters <= currentMasters : currentMasters + " < " + stoppingMasters;
if (stoppingMasters != currentMasters && stoppingMasters > 0) {
// If stopping few enough master-nodes that there's still a majority left, there is no need to withdraw their votes first.
// However, we do not yet have a way to be sure there's a majority left, because the voting configuration may not yet have
// been updated when the previous nodes shut down, so we must always explicitly withdraw votes.
// TODO add cluster health API to check that voting configuration is optimal so this isn't always needed
nodeAndClients.stream().filter(NodeAndClient::isMasterEligible).map(NodeAndClient::getName).forEach(excludedNodeIds::add);
assert excludedNodeIds.size() == stoppingMasters;

logger.info("adding voting config exclusions {} prior to shutdown", excludedNodeIds);
try {
client().execute(AddVotingConfigExclusionsAction.INSTANCE,
new AddVotingConfigExclusionsRequest(excludedNodeIds.toArray(new String[0]))).get();
} catch (InterruptedException | ExecutionException e) {
throw new AssertionError("unexpected", e);
}
}

if (stoppingMasters > 0) {
updateMinMasterNodes(getMasterNodesCount() - Math.toIntExact(stoppingMasters));
}
}
final Set<String> excludedNodeIds = excludeMasters(nodeAndClients);

for (NodeAndClient nodeAndClient: nodeAndClients) {
removeDisruptionSchemeFromNode(nodeAndClient);
Expand All @@ -1673,14 +1645,7 @@ private synchronized void stopNodesAndClients(Collection<NodeAndClient> nodeAndC
nodeAndClient.close();
}

if (excludedNodeIds.isEmpty() == false) {
logger.info("removing voting config exclusions for {} after shutdown", excludedNodeIds);
try {
client().execute(ClearVotingConfigExclusionsAction.INSTANCE, new ClearVotingConfigExclusionsRequest()).get();
} catch (InterruptedException | ExecutionException e) {
throw new AssertionError("unexpected", e);
}
}
removeExclusions(excludedNodeIds);
}

/**
Expand Down Expand Up @@ -1746,31 +1711,78 @@ public synchronized void rollingRestart(RestartCallback callback) throws Excepti

private void restartNode(NodeAndClient nodeAndClient, RestartCallback callback) throws Exception {
logger.info("Restarting node [{}] ", nodeAndClient.name);

if (activeDisruptionScheme != null) {
activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
}
final int masterNodesCount = getMasterNodesCount();
// special case to allow stopping one node in a two node cluster and keep it functional
final boolean updateMinMaster = nodeAndClient.isMasterEligible() && masterNodesCount == 2 && autoManageMinMasterNodes;
if (updateMinMaster) {
updateMinMasterNodes(masterNodesCount - 1);
}

Set<String> excludedNodeIds = excludeMasters(Collections.singleton(nodeAndClient));

final Settings newSettings = nodeAndClient.closeForRestart(callback,
autoManageMinMasterNodes ? getMinMasterNodes(masterNodesCount) : -1);
autoManageMinMasterNodes ? getMinMasterNodes(getMasterNodesCount()) : -1);

removeExclusions(excludedNodeIds);

nodeAndClient.recreateNode(newSettings, () -> rebuildUnicastHostFiles(emptyList()));
nodeAndClient.startNode();
if (activeDisruptionScheme != null) {
activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
}
if (callback.validateClusterForming() || updateMinMaster) {

if (callback.validateClusterForming() && excludedNodeIds.isEmpty() == false) {
// we have to validate cluster formation when voting config exclusions were added for this restart
// (i.e. the node is master-eligible), so that the restarted node rejoins before min_master_nodes is
// restored below. we also have to do this via the node that was just restarted as it may be that the
// master didn't yet process the fact it left
validateClusterFormed(nodeAndClient.name);
}
if (updateMinMaster) {
updateMinMasterNodes(masterNodesCount);

if (excludedNodeIds.isEmpty() == false) {
updateMinMasterNodes(getMasterNodesCount());
}
}

/**
 * Withdraws the votes of the master-eligible nodes in {@code nodeAndClients} before they are
 * stopped or restarted, by registering voting config exclusions for them, and lowers
 * min_master_nodes accordingly. Does nothing unless {@code autoManageMinMasterNodes} is set.
 *
 * @param nodeAndClients the nodes that are about to be stopped or restarted
 * @return the names of the nodes for which exclusions were added (empty if none were needed);
 *         the caller is expected to pass this to {@code removeExclusions} after the stop/restart
 */
private Set<String> excludeMasters(Collection<NodeAndClient> nodeAndClients) {
final Set<String> excludedNodeIds = new HashSet<>();
if (autoManageMinMasterNodes && nodeAndClients.size() > 0) {

// count master-eligible nodes in the whole cluster vs. among those being stopped
final long currentMasters = nodes.values().stream().filter(NodeAndClient::isMasterEligible).count();
final long stoppingMasters = nodeAndClients.stream().filter(NodeAndClient::isMasterEligible).count();

assert stoppingMasters <= currentMasters : currentMasters + " < " + stoppingMasters;
// stoppingMasters == currentMasters means the whole master set goes down at once, in which
// case withdrawing votes is pointless (there is no remaining quorum to apply them)
if (stoppingMasters != currentMasters && stoppingMasters > 0) {
// If stopping few enough master-nodes that there's still a majority left, there is no need to withdraw their votes first.
// However, we do not yet have a way to be sure there's a majority left, because the voting configuration may not yet have
// been updated when the previous nodes shut down, so we must always explicitly withdraw votes.
// TODO add cluster health API to check that voting configuration is optimal so this isn't always needed
nodeAndClients.stream().filter(NodeAndClient::isMasterEligible).map(NodeAndClient::getName).forEach(excludedNodeIds::add);
assert excludedNodeIds.size() == stoppingMasters;

logger.info("adding voting config exclusions {} prior to restart/shutdown", excludedNodeIds);
try {
client().execute(AddVotingConfigExclusionsAction.INSTANCE,
new AddVotingConfigExclusionsRequest(excludedNodeIds.toArray(new String[0]))).get();
} catch (InterruptedException | ExecutionException e) {
// test-infrastructure code: surface any failure as an assertion error, preserving the cause
throw new AssertionError("unexpected", e);
}
}

// lower min_master_nodes to reflect the masters that are about to leave
if (stoppingMasters > 0) {
updateMinMasterNodes(getMasterNodesCount() - Math.toIntExact(stoppingMasters));
}
}
return excludedNodeIds;
}

/**
 * Clears the voting config exclusions that were added by {@code excludeMasters} once the
 * affected nodes have been stopped or restarted. No-op when no exclusions were added.
 *
 * @param excludedNodeIds the node names previously excluded from the voting configuration
 */
private void removeExclusions(Set<String> excludedNodeIds) {
    if (excludedNodeIds.isEmpty()) {
        return;
    }
    logger.info("removing voting config exclusions for {} after restart/shutdown", excludedNodeIds);
    try {
        // issue the request through a node that was NOT excluded, since an excluded node may still be down
        final Client clearingClient =
            getRandomNodeAndClient(node -> excludedNodeIds.contains(node.name) == false).client(random);
        clearingClient.execute(ClearVotingConfigExclusionsAction.INSTANCE, new ClearVotingConfigExclusionsRequest()).get();
    } catch (InterruptedException | ExecutionException e) {
        throw new AssertionError("unexpected", e);
    }
}

Expand Down Expand Up @@ -1828,7 +1840,6 @@ public synchronized void fullRestart(RestartCallback callback) throws Exception
}
}


/**
* Returns the name of the current master node in the cluster.
*/
Expand Down

0 comments on commit 6e5bbd4

Please sign in to comment.