From 6e289674e17c4ee9ec9889c7672af4f1e8e91a33 Mon Sep 17 00:00:00 2001
From: Yannick Welsch
Date: Mon, 14 May 2018 20:12:52 +0200
Subject: [PATCH] Auto-expand replicas only after failing nodes (#30553)

#30423 combined auto-expansion into the same cluster state update in which
nodes are removed. As the auto-expansion step ran before the dead nodes were
deassociated from the routing table, auto-expansion could remove replicas
from live nodes instead of dead ones. This commit reverses the order to
ensure that, when nodes leave the cluster, the auto-expand-replica
functionality only triggers after the shards on the removed nodes have been
failed. This ensures that active shards on other live nodes are not failed
if the primary resided on a now-dead node. Instead, one of the replicas on
the live nodes first gets promoted to primary, and the auto-expansion
(removing replicas) only triggers in a follow-up step (but still within the
same cluster state update).

Relates to #30456 and is a follow-up of #30423
---
 .../routing/allocation/AllocationService.java |  42 +++---
 .../discovery/zen/NodeJoinController.java     |   4 +-
 .../metadata/AutoExpandReplicasTests.java     | 128 ++++++++++++++++++
 .../indices/cluster/ClusterStateChanges.java  |  10 ++
 4 files changed, 164 insertions(+), 20 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java
index deb10b83b5a5d..569ddd6cee772 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java
@@ -114,11 +114,24 @@ public ClusterState applyStartedShards(ClusterState clusterState, List<ShardRouting> startedShards) {
diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/AutoExpandReplicasTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/AutoExpandReplicasTests.java
+    protected DiscoveryNode createNode(DiscoveryNode.Role... mustHaveRoles) {
+        Set<DiscoveryNode.Role> roles = new HashSet<>(randomSubsetOf(Sets.newHashSet(DiscoveryNode.Role.values())));
+        for (DiscoveryNode.Role mustHaveRole : mustHaveRoles) {
+            roles.add(mustHaveRole);
+        }
+        final String id = String.format(Locale.ROOT, "node_%03d", nodeIdGenerator.incrementAndGet());
+        return new DiscoveryNode(id, id, buildNewFakeTransportAddress(), Collections.emptyMap(), roles,
+            Version.CURRENT);
+    }
+
+    /**
+     * Checks that, when nodes leave the cluster, the auto-expand-replica functionality only triggers after the shards on the
+     * removed nodes have been failed. This ensures that active shards on other live nodes are not failed if the primary resided
+     * on a now-dead node. Instead, one of the replicas on the live nodes first gets promoted to primary, and the auto-expansion
+     * (removing replicas) only triggers in a follow-up step.
+     */
+    public void testAutoExpandWhenNodeLeavesAndPossiblyRejoins() throws InterruptedException {
+        final ThreadPool threadPool = new TestThreadPool(getClass().getName());
+        final ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry(), threadPool);
+
+        try {
+            List<DiscoveryNode> allNodes = new ArrayList<>();
+            DiscoveryNode localNode = createNode(DiscoveryNode.Role.MASTER); // local node is the master
+            allNodes.add(localNode);
+            int numDataNodes = randomIntBetween(3, 5);
+            List<DiscoveryNode> dataNodes = new ArrayList<>(numDataNodes);
+            for (int i = 0; i < numDataNodes; i++) {
+                dataNodes.add(createNode(DiscoveryNode.Role.DATA));
+            }
+            allNodes.addAll(dataNodes);
+            ClusterState state = ClusterStateCreationUtils.state(localNode, localNode,
+                allNodes.toArray(new DiscoveryNode[allNodes.size()]));
+
+            CreateIndexRequest request = new CreateIndexRequest("index",
+                Settings.builder()
+                    .put(SETTING_NUMBER_OF_SHARDS, 1)
+                    .put(SETTING_AUTO_EXPAND_REPLICAS, "0-all").build())
+                .waitForActiveShards(ActiveShardCount.NONE);
+            state = cluster.createIndex(state, request);
+            assertTrue(state.metaData().hasIndex("index"));
+            // keep applying started shards and rerouting until the primary and all auto-expanded replicas are started
+            while (state.routingTable().index("index").shard(0).allShardsStarted() == false) {
+                logger.info(state);
+                state = cluster.applyStartedShards(state,
+                    state.routingTable().index("index").shard(0).shardsWithState(ShardRoutingState.INITIALIZING));
+                state = cluster.reroute(state, new ClusterRerouteRequest());
+            }
+
+            IndexShardRoutingTable preTable = state.routingTable().index("index").shard(0);
+            final Set<String> unchangedNodeIds;
+            final IndexShardRoutingTable postTable;
+
+            if (randomBoolean()) {
+                // simulate node removal
+                List<DiscoveryNode> nodesToRemove = randomSubsetOf(2, dataNodes);
+                unchangedNodeIds = dataNodes.stream().filter(n -> nodesToRemove.contains(n) == false)
+                    .map(DiscoveryNode::getId).collect(Collectors.toSet());
+
+                state = cluster.removeNodes(state, nodesToRemove);
+                postTable = state.routingTable().index("index").shard(0);
+
+                assertTrue("not all shards started in " + state.toString(), postTable.allShardsStarted());
+                assertThat(postTable.toString(), postTable.getAllAllocationIds(), everyItem(isIn(preTable.getAllAllocationIds())));
+            } else {
+                // fake an election where conflicting nodes are removed and re-added
+                state = ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).masterNodeId(null).build()).build();
+
+                List<DiscoveryNode> conflictingNodes = randomSubsetOf(2, dataNodes);
+                unchangedNodeIds = dataNodes.stream().filter(n -> conflictingNodes.contains(n) == false)
+                    .map(DiscoveryNode::getId).collect(Collectors.toSet());
+
+                List<DiscoveryNode> nodesToAdd = conflictingNodes.stream()
+                    .map(n -> new DiscoveryNode(n.getName(), n.getId(), buildNewFakeTransportAddress(), n.getAttributes(),
+                        n.getRoles(), n.getVersion()))
+                    .collect(Collectors.toList());
+
+                if (randomBoolean()) {
+                    nodesToAdd.add(createNode(DiscoveryNode.Role.DATA));
+                }
+
+                state = cluster.joinNodesAndBecomeMaster(state, nodesToAdd);
+                postTable = state.routingTable().index("index").shard(0);
+            }
+
+            Set<String> unchangedAllocationIds = preTable.getShards().stream()
+                .filter(shr -> unchangedNodeIds.contains(shr.currentNodeId()))
+                .map(shr -> shr.allocationId().getId()).collect(Collectors.toSet());
+
+            assertThat(postTable.toString(), unchangedAllocationIds, everyItem(isIn(postTable.getAllAllocationIds())));
+
+            postTable.getShards().forEach(
+                shardRouting -> {
+                    if (shardRouting.assignedToNode() && unchangedAllocationIds.contains(shardRouting.allocationId().getId())) {
+                        assertTrue("Shard should be active: " + shardRouting,
+                            shardRouting.active());
+                    }
+                }
+            );
+        } finally {
+            terminate(threadPool);
+        }
+    }
 }
diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java
index 1a5e0241020d6..4b4ad67f356a8 100644
--- a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java
+++ b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java
@@ -87,6 +87,7 @@ import org.elasticsearch.transport.TransportService;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
@@ -232,6 +233,15 @@ public ClusterState addNodes(ClusterState clusterState, List<DiscoveryNode> nodes) {
         return runTasks(joinTaskExecutor, clusterState, nodes);
     }
 
+    public ClusterState joinNodesAndBecomeMaster(ClusterState clusterState, List<DiscoveryNode> nodes) {
+        List<DiscoveryNode> joinNodes = new ArrayList<>();
+        joinNodes.add(NodeJoinController.BECOME_MASTER_TASK);
+        joinNodes.add(NodeJoinController.FINISH_ELECTION_TASK);
+        joinNodes.addAll(nodes);
+
+        return runTasks(joinTaskExecutor, clusterState, joinNodes);
+    }
+
     public ClusterState removeNodes(ClusterState clusterState, List<DiscoveryNode> nodes) {
         return runTasks(nodeRemovalExecutor, clusterState, nodes.stream()
             .map(n -> new ZenDiscovery.NodeRemovalClusterStateTaskExecutor.Task(n, "dummy reason")).collect(Collectors.toList()));
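
For readers who want to see why the ordering described in the commit message matters without running the test suite, here is a stand-alone toy sketch. It is not Elasticsearch code: the AutoExpandOrderingDemo class, the Copy record, and the autoExpand and failDeadNodeCopies methods are made-up stand-ins, and "drop surplus copies from the tail" is a deliberate simplification of how auto-expansion trims replicas. Under those assumptions it shows that trimming before the copies on dead nodes are failed can drop an active copy on a live node, while trimming afterwards only removes copies that are already gone.

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Toy model of one shard with auto-expand-replicas "0-all": the desired number
// of copies equals the number of live data nodes.
public class AutoExpandOrderingDemo {

    record Copy(String nodeId, boolean primary) {}

    // Mimics auto-expansion: shrink the copy list to the desired size. The
    // removal order is simplified to "drop from the tail", which is enough to
    // show that an active copy on a live node can be dropped.
    static List<Copy> autoExpand(List<Copy> copies, Set<String> liveNodes) {
        List<Copy> result = new ArrayList<>(copies);
        while (result.size() > liveNodes.size()) {
            result.remove(result.size() - 1);
        }
        return result;
    }

    // Mimics failing the shards on removed nodes: drop every copy whose node
    // has left the cluster and promote a surviving replica if the primary died.
    static List<Copy> failDeadNodeCopies(List<Copy> copies, Set<String> liveNodes) {
        boolean primaryAlive = copies.stream().anyMatch(c -> c.primary() && liveNodes.contains(c.nodeId()));
        List<Copy> result = new ArrayList<>();
        for (Copy copy : copies) {
            if (liveNodes.contains(copy.nodeId()) == false) {
                continue; // copy failed together with its node
            }
            if (primaryAlive == false && result.isEmpty()) {
                result.add(new Copy(copy.nodeId(), true)); // promote first surviving replica
            } else {
                result.add(copy);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // primary on n1, replicas auto-expanded onto n2 and n3; n1 has just died
        List<Copy> copies = List.of(new Copy("n1", true), new Copy("n2", false), new Copy("n3", false));
        Set<String> liveNodes = Set.of("n2", "n3");

        // old order (#30423): auto-expand before failing the copies on dead nodes
        // -> the trim removes the active copy on n3, leaving a single live copy
        System.out.println(failDeadNodeCopies(autoExpand(copies, liveNodes), liveNodes));

        // new order (#30553): fail the copies on dead nodes, promote a replica,
        // then auto-expand -> both live copies on n2 and n3 survive
        System.out.println(autoExpand(failDeadNodeCopies(copies, liveNodes), liveNodes));
    }
}

In the actual patch this corresponds to failing the shards on the removed nodes (including promoting a replica to primary) before the auto-expand adaptation runs, all within the same cluster state update.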