Skip to content

Commit

Permalink
Fix size of rolling-upgrade bootstrap config
Browse files Browse the repository at this point in the history
Today a Zen2 node will bootstrap itself once it believes there to be no
remaining Zen1 master-eligible nodes in the cluster as long as
minimum_master_nodes is satisfied.

However the bootstrap configuration comprises just the ids of the known
master-eligible nodes, and this might be too small to be safe. For instance, if
there are 5 master-eligible nodes (so that minimum_master_nodes is 3) then the
bootstrap configuration could comprise just 3 nodes, of which 2 form a quorum,
and this does not intersect other quorums that might arise, leading to a
split-brain.

This commit fixes this by expanding the bootstrap configuration so that its
quorums satisfy minimum_master_nodes, using known node IDs if possible and
placeholders if not.
  • Loading branch information
DaveCTurner committed Jan 30, 2019
1 parent 4dee3f7 commit d93012e
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ private String describeQuorum(VotingConfiguration votingConfiguration) {

final Set<String> realNodeIds = new HashSet<>(nodeIds);
realNodeIds.removeIf(ClusterBootstrapService::isBootstrapPlaceholder);
realNodeIds.removeIf(DiscoveryUpgradeService::isDiscoveryUpgradePlaceholder);
assert requiredNodes <= realNodeIds.size() : nodeIds;

if (nodeIds.size() == 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,15 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

Expand All @@ -72,6 +75,8 @@ public class DiscoveryUpgradeService {

private static Logger logger = LogManager.getLogger(DiscoveryUpgradeService.class);

public static final String DISCOVERY_UPGRADE_PLACEHOLDER_PREFIX = "{discovery-upgrade-placeholder}";

// how long to wait after activation before attempting to join a master or perform a bootstrap upgrade
public static final Setting<TimeValue> BWC_PING_TIMEOUT_SETTING =
Setting.timeSetting("discovery.zen.bwc_ping_timeout",
Expand Down Expand Up @@ -130,7 +135,11 @@ public void activate(Optional<DiscoveryNode> lastKnownLeader, ClusterState lastA
: lastAcceptedClusterState.getMinimumMasterNodesOnPublishingMaster();

assert joiningRound == null : joiningRound;
joiningRound = new JoiningRound(enableUnsafeBootstrappingOnUpgrade && lastKnownLeader.isPresent(), minimumMasterNodes);
final Set<String> knownMasterNodeIds = new HashSet<>();
lastAcceptedClusterState.nodes().getMasterNodes().forEach(c -> knownMasterNodeIds.add(c.key));

joiningRound
= new JoiningRound(enableUnsafeBootstrappingOnUpgrade && lastKnownLeader.isPresent(), minimumMasterNodes, knownMasterNodeIds);
joiningRound.scheduleNextAttempt();
}

Expand Down Expand Up @@ -168,10 +177,12 @@ void countDown() {
private class JoiningRound {
private final boolean upgrading;
private final int minimumMasterNodes;
private final Set<String> knownMasterNodeIds;

JoiningRound(boolean upgrading, int minimumMasterNodes) {
JoiningRound(boolean upgrading, int minimumMasterNodes, Set<String> knownMasterNodeIds) {
this.upgrading = upgrading;
this.minimumMasterNodes = minimumMasterNodes;
this.knownMasterNodeIds = knownMasterNodeIds;
}

private boolean isRunning() {
Expand Down Expand Up @@ -210,8 +221,25 @@ public void run() {
// no Zen1 nodes found, but the last-known master was a Zen1 node, so this is a rolling upgrade
transportService.getThreadPool().generic().execute(() -> {
try {
initialConfigurationConsumer.accept(new VotingConfiguration(discoveryNodes.stream()
.map(DiscoveryNode::getId).collect(Collectors.toSet())));
Set<String> nodeIds = new HashSet<>();
discoveryNodes.forEach(n -> nodeIds.add(n.getId()));

final Iterator<String> knownNodeIdIterator = knownMasterNodeIds.iterator();
while (nodeIds.size() < 2 * minimumMasterNodes - 1 && knownNodeIdIterator.hasNext()) {
nodeIds.add(knownNodeIdIterator.next());
}

while (nodeIds.size() < 2 * minimumMasterNodes - 2) {
final boolean added = nodeIds.add(DISCOVERY_UPGRADE_PLACEHOLDER_PREFIX + nodeIds.size());
assert added;
}

final VotingConfiguration votingConfiguration = new VotingConfiguration(nodeIds);
assert votingConfiguration.hasQuorum(
discoveryNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toList()));
assert 2 * minimumMasterNodes - 2 <= nodeIds.size() : nodeIds + " too small for " + minimumMasterNodes;

initialConfigurationConsumer.accept(votingConfiguration);
} catch (Exception e) {
logger.debug("exception during bootstrapping upgrade, retrying", e);
} finally {
Expand Down Expand Up @@ -324,4 +352,8 @@ public static DiscoveryNode createDiscoveryNodeWithImpossiblyHighId(DiscoveryNod
return new DiscoveryNode(node.getName(), "{zen2}" + node.getId(), node.getEphemeralId(), node.getHostName(),
node.getHostAddress(), node.getAddress(), node.getAttributes(), node.getRoles(), node.getVersion());
}

public static boolean isDiscoveryUpgradePlaceholder(String nodeId) {
return nodeId.startsWith(DISCOVERY_UPGRADE_PLACEHOLDER_PREFIX);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import static java.util.Collections.singletonList;
import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.BOOTSTRAP_PLACEHOLDER_PREFIX;
import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING;
import static org.elasticsearch.cluster.coordination.DiscoveryUpgradeService.DISCOVERY_UPGRADE_PLACEHOLDER_PREFIX;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -274,8 +275,8 @@ public void testDescriptionAfterBootstrapping() {
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));

assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2", "n3",
BOOTSTRAP_PLACEHOLDER_PREFIX + "n4", BOOTSTRAP_PLACEHOLDER_PREFIX + "n5"), emptyList(), emptyList(), 0L).getDescription(),
assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2", "n3", BOOTSTRAP_PLACEHOLDER_PREFIX + "n4",
DISCOVERY_UPGRADE_PLACEHOLDER_PREFIX + "n5"), emptyList(), emptyList(), 0L).getDescription(),
is("master not discovered or elected yet, an election requires 3 nodes with ids [n1, n2, n3], " +
"have discovered [] which is not a quorum; " +
"discovery will continue using [] from hosts providers and [" + localNode +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import org.elasticsearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest;
import org.elasticsearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction;
import org.elasticsearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand All @@ -34,22 +36,32 @@
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.MetaStateService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster.RestartCallback;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING;
import static org.elasticsearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM;
import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_ACTION_NAME;
import static org.elasticsearch.cluster.coordination.JoinHelper.START_JOIN_ACTION_NAME;
import static org.elasticsearch.cluster.coordination.PublicationTransportHandler.PUBLISH_STATE_ACTION_NAME;
import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING;
import static org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider.CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

Expand All @@ -65,6 +77,10 @@ public class Zen1IT extends ESIntegTestCase {
.put(TestZenDiscovery.USE_ZEN2.getKey(), true)
.build();

protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(MockTransportService.TestPlugin.class);
}

public void testZen2NodesJoiningZen1Cluster() {
internalCluster().startNodes(randomIntBetween(1, 3), ZEN1_SETTINGS);
internalCluster().startNodes(randomIntBetween(1, 3), ZEN2_SETTINGS);
Expand All @@ -77,6 +93,56 @@ public void testZen1NodesJoiningZen2Cluster() {
createIndex("test");
}

public void testMixedClusterDisruption() throws Exception {
final List<String> nodes = internalCluster().startNodes(IntStream.range(0, 5)
.mapToObj(i -> i < 2 ? ZEN1_SETTINGS : ZEN2_SETTINGS).toArray(Settings[]::new));

final List<MockTransportService> transportServices = nodes.stream()
.map(n -> (MockTransportService) internalCluster().getInstance(TransportService.class, n)).collect(Collectors.toList());

logger.info("--> disrupting communications");

// The idea here is to make some of the Zen2 nodes believe the Zen1 nodes have gone away by introducing a network partition, so that
// they bootstrap themselves, but keep the Zen1 side of the cluster alive.

// Set up a bridged network partition with the Zen1 nodes {0,1} on one side, Zen2 nodes {3,4} on the other, and node {2} in both
transportServices.get(0).addFailToSendNoConnectRule(transportServices.get(3));
transportServices.get(0).addFailToSendNoConnectRule(transportServices.get(4));
transportServices.get(1).addFailToSendNoConnectRule(transportServices.get(3));
transportServices.get(1).addFailToSendNoConnectRule(transportServices.get(4));
transportServices.get(3).addFailToSendNoConnectRule(transportServices.get(0));
transportServices.get(3).addFailToSendNoConnectRule(transportServices.get(1));
transportServices.get(4).addFailToSendNoConnectRule(transportServices.get(0));
transportServices.get(4).addFailToSendNoConnectRule(transportServices.get(1));

// Nodes 3 and 4 will bootstrap, but we want to keep node 2 as part of the Zen1 cluster, so prevent any messages that might switch
// its allegiance
transportServices.get(3).addFailToSendNoConnectRule(transportServices.get(2),
PUBLISH_STATE_ACTION_NAME, FOLLOWER_CHECK_ACTION_NAME, START_JOIN_ACTION_NAME);
transportServices.get(4).addFailToSendNoConnectRule(transportServices.get(2),
PUBLISH_STATE_ACTION_NAME, FOLLOWER_CHECK_ACTION_NAME, START_JOIN_ACTION_NAME);

logger.info("--> waiting for disconnected nodes to be removed");
ensureStableCluster(3, nodes.get(0));

logger.info("--> creating index on Zen1 side");
assertAcked(client(nodes.get(0)).admin().indices().create(new CreateIndexRequest("test")).get());
assertFalse(client(nodes.get(0)).admin().cluster().health(new ClusterHealthRequest("test")
.waitForGreenStatus()).get().isTimedOut());

logger.info("--> waiting for disconnected nodes to bootstrap themselves");
assertBusy(() -> assertTrue(IntStream.range(3, 5)
.mapToObj(n -> (Coordinator) internalCluster().getInstance(Discovery.class, nodes.get(n)))
.anyMatch(Coordinator::isInitialConfigurationSet)));

logger.info("--> clearing disruption and waiting for cluster to reform");
transportServices.forEach(MockTransportService::clearAllRules);

ensureStableCluster(5, nodes.get(0));
assertFalse(client(nodes.get(0)).admin().cluster().health(new ClusterHealthRequest("test")
.waitForGreenStatus()).get().isTimedOut());
}

public void testMixedClusterFormation() throws Exception {
final int zen1NodeCount = randomIntBetween(1, 3);
final int zen2NodeCount = randomIntBetween(zen1NodeCount == 1 ? 2 : 1, 3);
Expand Down

0 comments on commit d93012e

Please sign in to comment.