Fix size of rolling-upgrade bootstrap config #38031

Merged
@@ -47,6 +47,8 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
import java.util.function.BooleanSupplier;
@@ -130,7 +132,11 @@ public void activate(Optional<DiscoveryNode> lastKnownLeader, ClusterState lastA
: lastAcceptedClusterState.getMinimumMasterNodesOnPublishingMaster();

assert joiningRound == null : joiningRound;
joiningRound = new JoiningRound(enableUnsafeBootstrappingOnUpgrade && lastKnownLeader.isPresent(), minimumMasterNodes);
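// remember the master-eligible node IDs from the last-accepted cluster state so that the
// bootstrap configuration can be padded to a safe size if too few nodes are discovered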
final Set<String> knownMasterNodeIds = new HashSet<>();
lastAcceptedClusterState.nodes().getMasterNodes().forEach(c -> knownMasterNodeIds.add(c.key));

joiningRound
= new JoiningRound(enableUnsafeBootstrappingOnUpgrade && lastKnownLeader.isPresent(), minimumMasterNodes, knownMasterNodeIds);
joiningRound.scheduleNextAttempt();
}

@@ -168,10 +174,12 @@ void countDown() {
private class JoiningRound {
private final boolean upgrading;
private final int minimumMasterNodes;
private final Set<String> knownMasterNodeIds;

JoiningRound(boolean upgrading, int minimumMasterNodes) {
JoiningRound(boolean upgrading, int minimumMasterNodes, Set<String> knownMasterNodeIds) {
this.upgrading = upgrading;
this.minimumMasterNodes = minimumMasterNodes;
this.knownMasterNodeIds = knownMasterNodeIds;
}

private boolean isRunning() {
@@ -210,8 +218,20 @@ public void run() {
// no Zen1 nodes found, but the last-known master was a Zen1 node, so this is a rolling upgrade
transportService.getThreadPool().generic().execute(() -> {
try {
initialConfigurationConsumer.accept(new VotingConfiguration(discoveryNodes.stream()
.map(DiscoveryNode::getId).collect(Collectors.toSet())));
Set<String> nodeIds = new HashSet<>();
discoveryNodes.forEach(n -> nodeIds.add(n.getId()));

final Iterator<String> knownNodeIdIterator = knownMasterNodeIds.iterator();
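// pad the discovered node IDs with previously-known master node IDs until the configuration
// reaches 2 * minimumMasterNodes - 1 entries (or the known IDs run out), so that, where
// possible, its quorum requires at least minimumMasterNodes votes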
while (nodeIds.size() < 2 * minimumMasterNodes - 1 && knownNodeIdIterator.hasNext()) {
nodeIds.add(knownNodeIdIterator.next());
}

final VotingConfiguration votingConfiguration = new VotingConfiguration(nodeIds);
assert votingConfiguration.hasQuorum(
discoveryNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toList()));
assert 2 * minimumMasterNodes - 2 <= nodeIds.size() : nodeIds + " too small for " + minimumMasterNodes;

initialConfigurationConsumer.accept(votingConfiguration);
} catch (Exception e) {
logger.debug("exception during bootstrapping upgrade, retrying", e);
} finally {
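The hunk above is the heart of the fix: instead of bootstrapping with only the currently-discovered node IDs, the configuration is padded with master-eligible node IDs remembered from the last-accepted cluster state until it reaches 2 * minimum_master_nodes - 1 entries, so that its quorum (a strict majority) needs at least minimum_master_nodes votes. The following is a minimal standalone sketch of that sizing rule, using plain strings instead of DiscoveryNode and VotingConfiguration; the class and method names here are illustrative, not from the PR.

import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Standalone illustration (not part of this PR) of the bootstrap-configuration sizing rule above.
public class BootstrapConfigSizingSketch {

    // Pad the currently-discovered node IDs with previously-known master node IDs until the
    // configuration holds 2 * minimumMasterNodes - 1 entries, or the known IDs run out. A strict
    // majority of a configuration of that size is exactly minimumMasterNodes votes.
    static Set<String> bootstrapConfiguration(List<String> discoveredNodeIds,
                                              Set<String> knownMasterNodeIds,
                                              int minimumMasterNodes) {
        final Set<String> nodeIds = new HashSet<>(discoveredNodeIds);
        final Iterator<String> knownNodeIdIterator = knownMasterNodeIds.iterator();
        while (nodeIds.size() < 2 * minimumMasterNodes - 1 && knownNodeIdIterator.hasNext()) {
            nodeIds.add(knownNodeIdIterator.next());
        }
        return nodeIds;
    }

    public static void main(String[] args) {
        // Two Zen2 nodes discovered, two Zen1 masters remembered, minimum_master_nodes = 3:
        // the configuration grows from 2 entries to 4, so its quorum is 3 rather than 2.
        final Set<String> config = bootstrapConfiguration(
            List.of("zen2-a", "zen2-b"), Set.of("zen1-a", "zen1-b"), 3);
        System.out.println(config + " -> quorum of " + (config.size() / 2 + 1));
    }
}

In this example the known IDs run out one entry short of the 2 * minimumMasterNodes - 1 target, a situation the production assertion (2 * minimumMasterNodes - 2 <= nodeIds.size()) still accepts.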
@@ -22,9 +22,11 @@
import org.elasticsearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest;
import org.elasticsearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction;
import org.elasticsearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -34,24 +36,34 @@
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.zen.ElectMasterService;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.MetaStateService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster.RestartCallback;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING;
import static org.elasticsearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM;
import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_ACTION_NAME;
import static org.elasticsearch.cluster.coordination.JoinHelper.START_JOIN_ACTION_NAME;
import static org.elasticsearch.cluster.coordination.PublicationTransportHandler.PUBLISH_STATE_ACTION_NAME;
import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING;
import static org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider.CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.elasticsearch.test.InternalTestCluster.REMOVED_MINIMUM_MASTER_NODES;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

@@ -67,6 +79,10 @@ public class Zen1IT extends ESIntegTestCase {
.put(TestZenDiscovery.USE_ZEN2.getKey(), true)
.build();

protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(MockTransportService.TestPlugin.class);
}

public void testZen2NodesJoiningZen1Cluster() {
internalCluster().startNodes(randomIntBetween(1, 3), ZEN1_SETTINGS);
internalCluster().startNodes(randomIntBetween(1, 3), ZEN2_SETTINGS);
@@ -79,6 +95,56 @@ public void testZen1NodesJoiningZen2Cluster() {
createIndex("test");
}

public void testMixedClusterDisruption() throws Exception {
final List<String> nodes = internalCluster().startNodes(IntStream.range(0, 5)
.mapToObj(i -> i < 2 ? ZEN1_SETTINGS : ZEN2_SETTINGS).toArray(Settings[]::new));

final List<MockTransportService> transportServices = nodes.stream()
.map(n -> (MockTransportService) internalCluster().getInstance(TransportService.class, n)).collect(Collectors.toList());

logger.info("--> disrupting communications");

// The idea here is to make some of the Zen2 nodes believe the Zen1 nodes have gone away by introducing a network partition, so that
// they bootstrap themselves, but keep the Zen1 side of the cluster alive.

// Set up a bridged network partition with the Zen1 nodes {0,1} on one side, Zen2 nodes {3,4} on the other, and node {2} in both
transportServices.get(0).addFailToSendNoConnectRule(transportServices.get(3));
transportServices.get(0).addFailToSendNoConnectRule(transportServices.get(4));
transportServices.get(1).addFailToSendNoConnectRule(transportServices.get(3));
transportServices.get(1).addFailToSendNoConnectRule(transportServices.get(4));
transportServices.get(3).addFailToSendNoConnectRule(transportServices.get(0));
transportServices.get(3).addFailToSendNoConnectRule(transportServices.get(1));
transportServices.get(4).addFailToSendNoConnectRule(transportServices.get(0));
transportServices.get(4).addFailToSendNoConnectRule(transportServices.get(1));

// Nodes 3 and 4 will bootstrap, but we want to keep node 2 as part of the Zen1 cluster, so prevent any messages that might switch
// its allegiance
transportServices.get(3).addFailToSendNoConnectRule(transportServices.get(2),
PUBLISH_STATE_ACTION_NAME, FOLLOWER_CHECK_ACTION_NAME, START_JOIN_ACTION_NAME);
transportServices.get(4).addFailToSendNoConnectRule(transportServices.get(2),
PUBLISH_STATE_ACTION_NAME, FOLLOWER_CHECK_ACTION_NAME, START_JOIN_ACTION_NAME);

logger.info("--> waiting for disconnected nodes to be removed");
ensureStableCluster(3, nodes.get(0));

logger.info("--> creating index on Zen1 side");
assertAcked(client(nodes.get(0)).admin().indices().create(new CreateIndexRequest("test")).get());
assertFalse(client(nodes.get(0)).admin().cluster().health(new ClusterHealthRequest("test")
.waitForGreenStatus()).get().isTimedOut());

logger.info("--> waiting for disconnected nodes to bootstrap themselves");
assertBusy(() -> assertTrue(IntStream.range(3, 5)
.mapToObj(n -> (Coordinator) internalCluster().getInstance(Discovery.class, nodes.get(n)))
.anyMatch(Coordinator::isInitialConfigurationSet)));

logger.info("--> clearing disruption and waiting for cluster to reform");
transportServices.forEach(MockTransportService::clearAllRules);

ensureStableCluster(5, nodes.get(0));
assertFalse(client(nodes.get(0)).admin().cluster().health(new ClusterHealthRequest("test")
.waitForGreenStatus()).get().isTimedOut());
}

public void testMixedClusterFormation() throws Exception {
final int zen1NodeCount = randomIntBetween(1, 3);
final int zen2NodeCount = randomIntBetween(zen1NodeCount == 1 ? 2 : 1, 3);
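The eight addFailToSendNoConnectRule calls in testMixedClusterDisruption spell out the symmetric, bridged partition by hand. If that pattern were reused elsewhere, it could be factored into a small helper along the following lines. This is a hypothetical sketch, not part of this PR; it relies only on the single-argument addFailToSendNoConnectRule overload the test already uses, and the partial-action rules that keep node 2 on the Zen1 side would remain hand-written.

import org.elasticsearch.test.transport.MockTransportService;

// Hypothetical helper: sever the link between two transport services in both directions,
// mirroring the symmetric rule pairs set up in testMixedClusterDisruption.
final class DisruptionHelper {

    private DisruptionHelper() {}

    static void disconnect(MockTransportService a, MockTransportService b) {
        a.addFailToSendNoConnectRule(b);
        b.addFailToSendNoConnectRule(a);
    }
}

// Usage inside the test, reproducing the bridged partition (Zen1 nodes {0,1} on one side,
// Zen2 nodes {3,4} on the other, node {2} still connected to both):
//
//     for (int zen1 : new int[] {0, 1}) {
//         for (int zen2 : new int[] {3, 4}) {
//             DisruptionHelper.disconnect(transportServices.get(zen1), transportServices.get(zen2));
//         }
//     }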