Skip to content

Commit

Permalink
Auto-expand replicas only after failing nodes (#30553)
Browse files Browse the repository at this point in the history
#30423 combined auto-expansion in the same cluster state update where nodes are removed. As
the auto-expansion step would run before deassociating the dead nodes from the routing table, the
auto-expansion would possibly remove replicas from live nodes instead of dead ones. This commit
reverses the order to ensure that when nodes leave the cluster that the auto-expand-replica
functionality only triggers after failing the shards on the removed nodes. This ensures that active
shards on other live nodes are not failed if the primary resided on a now dead node.
Instead, one of the replicas on the live nodes first gets promoted to primary, and the auto-
expansion (removing replicas) only triggers in a follow-up step (but still same cluster state update).

Relates to #30456 and follow-up of #30423
  • Loading branch information
ywelsch committed May 14, 2018
1 parent 0359b55 commit 6e28967
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,24 @@ public ClusterState applyStartedShards(ClusterState clusterState, List<ShardRout
}

protected ClusterState buildResultAndLogHealthChange(ClusterState oldState, RoutingAllocation allocation, String reason) {
RoutingTable oldRoutingTable = oldState.routingTable();
RoutingNodes newRoutingNodes = allocation.routingNodes();
ClusterState newState = buildResult(oldState, allocation);

logClusterHealthStateChange(
new ClusterStateHealth(oldState),
new ClusterStateHealth(newState),
reason
);

return newState;
}

private ClusterState buildResult(ClusterState oldState, RoutingAllocation allocation) {
final RoutingTable oldRoutingTable = oldState.routingTable();
final RoutingNodes newRoutingNodes = allocation.routingNodes();
final RoutingTable newRoutingTable = new RoutingTable.Builder().updateNodes(oldRoutingTable.version(), newRoutingNodes).build();
MetaData newMetaData = allocation.updateMetaDataWithRoutingChanges(newRoutingTable);
final MetaData newMetaData = allocation.updateMetaDataWithRoutingChanges(newRoutingTable);
assert newRoutingTable.validate(newMetaData); // validates the routing table is coherent with the cluster state metadata

final ClusterState.Builder newStateBuilder = ClusterState.builder(oldState)
.routingTable(newRoutingTable)
.metaData(newMetaData);
Expand All @@ -131,13 +144,7 @@ protected ClusterState buildResultAndLogHealthChange(ClusterState oldState, Rout
newStateBuilder.customs(customsBuilder.build());
}
}
final ClusterState newState = newStateBuilder.build();
logClusterHealthStateChange(
new ClusterStateHealth(oldState),
new ClusterStateHealth(newState),
reason
);
return newState;
return newStateBuilder.build();
}

// Used for testing
Expand Down Expand Up @@ -209,24 +216,23 @@ public ClusterState applyFailedShards(final ClusterState clusterState, final Lis
* if needed.
*/
public ClusterState deassociateDeadNodes(ClusterState clusterState, boolean reroute, String reason) {
ClusterState fixedClusterState = adaptAutoExpandReplicas(clusterState);
RoutingNodes routingNodes = getMutableRoutingNodes(fixedClusterState);
RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
routingNodes.unassigned().shuffle();
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, fixedClusterState,
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState,
clusterInfoService.getClusterInfo(), currentNanoTime());

// first, clear from the shards any node id they used to belong to that is now dead
deassociateDeadNodes(allocation);

if (reroute) {
reroute(allocation);
if (allocation.routingNodesChanged()) {
clusterState = buildResult(clusterState, allocation);
}

if (fixedClusterState == clusterState && allocation.routingNodesChanged() == false) {
if (reroute) {
return reroute(clusterState, reason);
} else {
return clusterState;
}
return buildResultAndLogHealthChange(clusterState, allocation, reason);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
/**
* a task indicated that the current node should become master, if no current master is known
*/
private static final DiscoveryNode BECOME_MASTER_TASK = new DiscoveryNode("_BECOME_MASTER_TASK_",
public static final DiscoveryNode BECOME_MASTER_TASK = new DiscoveryNode("_BECOME_MASTER_TASK_",
new TransportAddress(TransportAddress.META_ADDRESS, 0),
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT) {
@Override
Expand All @@ -393,7 +393,7 @@ public String toString() {
* a task that is used to signal the election is stopped and we should process pending joins.
* it may be use in combination with {@link #BECOME_MASTER_TASK}
*/
private static final DiscoveryNode FINISH_ELECTION_TASK = new DiscoveryNode("_FINISH_ELECTION_",
public static final DiscoveryNode FINISH_ELECTION_TASK = new DiscoveryNode("_FINISH_ELECTION_",
new TransportAddress(TransportAddress.META_ADDRESS, 0), Collections.emptyMap(), Collections.emptySet(), Version.CURRENT) {
@Override
public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,36 @@
*/
package org.elasticsearch.cluster.metadata;

import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.indices.cluster.ClusterStateChanges;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.isIn;

public class AutoExpandReplicasTests extends ESTestCase {

Expand Down Expand Up @@ -72,4 +100,104 @@ public void testInvalidValues() {
}

}

private static final AtomicInteger nodeIdGenerator = new AtomicInteger();

protected DiscoveryNode createNode(DiscoveryNode.Role... mustHaveRoles) {
Set<DiscoveryNode.Role> roles = new HashSet<>(randomSubsetOf(Sets.newHashSet(DiscoveryNode.Role.values())));
for (DiscoveryNode.Role mustHaveRole : mustHaveRoles) {
roles.add(mustHaveRole);
}
final String id = String.format(Locale.ROOT, "node_%03d", nodeIdGenerator.incrementAndGet());
return new DiscoveryNode(id, id, buildNewFakeTransportAddress(), Collections.emptyMap(), roles,
Version.CURRENT);
}

/**
* Checks that when nodes leave the cluster that the auto-expand-replica functionality only triggers after failing the shards on
* the removed nodes. This ensures that active shards on other live nodes are not failed if the primary resided on a now dead node.
* Instead, one of the replicas on the live nodes first gets promoted to primary, and the auto-expansion (removing replicas) only
* triggers in a follow-up step.
*/
public void testAutoExpandWhenNodeLeavesAndPossiblyRejoins() throws InterruptedException {
final ThreadPool threadPool = new TestThreadPool(getClass().getName());
final ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry(), threadPool);

try {
List<DiscoveryNode> allNodes = new ArrayList<>();
DiscoveryNode localNode = createNode(DiscoveryNode.Role.MASTER); // local node is the master
allNodes.add(localNode);
int numDataNodes = randomIntBetween(3, 5);
List<DiscoveryNode> dataNodes = new ArrayList<>(numDataNodes);
for (int i = 0; i < numDataNodes; i++) {
dataNodes.add(createNode(DiscoveryNode.Role.DATA));
}
allNodes.addAll(dataNodes);
ClusterState state = ClusterStateCreationUtils.state(localNode, localNode, allNodes.toArray(new DiscoveryNode[allNodes.size()]));

CreateIndexRequest request = new CreateIndexRequest("index",
Settings.builder()
.put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_AUTO_EXPAND_REPLICAS, "0-all").build())
.waitForActiveShards(ActiveShardCount.NONE);
state = cluster.createIndex(state, request);
assertTrue(state.metaData().hasIndex("index"));
while (state.routingTable().index("index").shard(0).allShardsStarted() == false) {
logger.info(state);
state = cluster.applyStartedShards(state,
state.routingTable().index("index").shard(0).shardsWithState(ShardRoutingState.INITIALIZING));
state = cluster.reroute(state, new ClusterRerouteRequest());
}

IndexShardRoutingTable preTable = state.routingTable().index("index").shard(0);
final Set<String> unchangedNodeIds;
final IndexShardRoutingTable postTable;

if (randomBoolean()) {
// simulate node removal
List<DiscoveryNode> nodesToRemove = randomSubsetOf(2, dataNodes);
unchangedNodeIds = dataNodes.stream().filter(n -> nodesToRemove.contains(n) == false)
.map(DiscoveryNode::getId).collect(Collectors.toSet());

state = cluster.removeNodes(state, nodesToRemove);
postTable = state.routingTable().index("index").shard(0);

assertTrue("not all shards started in " + state.toString(), postTable.allShardsStarted());
assertThat(postTable.toString(), postTable.getAllAllocationIds(), everyItem(isIn(preTable.getAllAllocationIds())));
} else {
// fake an election where conflicting nodes are removed and readded
state = ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).masterNodeId(null).build()).build();

List<DiscoveryNode> conflictingNodes = randomSubsetOf(2, dataNodes);
unchangedNodeIds = dataNodes.stream().filter(n -> conflictingNodes.contains(n) == false)
.map(DiscoveryNode::getId).collect(Collectors.toSet());

List<DiscoveryNode> nodesToAdd = conflictingNodes.stream()
.map(n -> new DiscoveryNode(n.getName(), n.getId(), buildNewFakeTransportAddress(), n.getAttributes(), n.getRoles(), n.getVersion()))
.collect(Collectors.toList());

if (randomBoolean()) {
nodesToAdd.add(createNode(DiscoveryNode.Role.DATA));
}

state = cluster.joinNodesAndBecomeMaster(state, nodesToAdd);
postTable = state.routingTable().index("index").shard(0);
}

Set<String> unchangedAllocationIds = preTable.getShards().stream().filter(shr -> unchangedNodeIds.contains(shr.currentNodeId()))
.map(shr -> shr.allocationId().getId()).collect(Collectors.toSet());

assertThat(postTable.toString(), unchangedAllocationIds, everyItem(isIn(postTable.getAllAllocationIds())));

postTable.getShards().forEach(
shardRouting -> {
if (shardRouting.assignedToNode() && unchangedAllocationIds.contains(shardRouting.allocationId().getId())) {
assertTrue("Shard should be active: " + shardRouting, shardRouting.active());
}
}
);
} finally {
terminate(threadPool);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
Expand Down Expand Up @@ -232,6 +233,15 @@ public ClusterState addNodes(ClusterState clusterState, List<DiscoveryNode> node
return runTasks(joinTaskExecutor, clusterState, nodes);
}

public ClusterState joinNodesAndBecomeMaster(ClusterState clusterState, List<DiscoveryNode> nodes) {
List<DiscoveryNode> joinNodes = new ArrayList<>();
joinNodes.add(NodeJoinController.BECOME_MASTER_TASK);
joinNodes.add(NodeJoinController.FINISH_ELECTION_TASK);
joinNodes.addAll(nodes);

return runTasks(joinTaskExecutor, clusterState, joinNodes);
}

public ClusterState removeNodes(ClusterState clusterState, List<DiscoveryNode> nodes) {
return runTasks(nodeRemovalExecutor, clusterState, nodes.stream()
.map(n -> new ZenDiscovery.NodeRemovalClusterStateTaskExecutor.Task(n, "dummy reason")).collect(Collectors.toList()));
Expand Down

0 comments on commit 6e28967

Please sign in to comment.