Skip to content

Commit

Permalink
Validate routing commands using updated routing state (#42066)
Browse files Browse the repository at this point in the history
When multiple commands are called in sequence, fetch shards
from mutable, up-to-date routing nodes to ensure each command's
changes are visible to subsequent commands.

This addresses an issue uncovered during work on #41050.
  • Loading branch information
vigyasharma authored and DaveCTurner committed May 28, 2019
1 parent aea600f commit 130c832
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,20 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain)
return explainOrThrowMissingRoutingNode(allocation, explain, discoNode);
}

final ShardRouting shardRouting;
try {
shardRouting = allocation.routingTable().shardRoutingTable(index, shardId).primaryShard();
allocation.routingTable().shardRoutingTable(index, shardId).primaryShard();
} catch (IndexNotFoundException | ShardNotFoundException e) {
return explainOrThrowRejectedCommand(explain, allocation, e);
}
if (shardRouting.unassigned() == false) {

ShardRouting shardRouting = null;
for (ShardRouting shard : allocation.routingNodes().unassigned()) {
if (shard.getIndexName().equals(index) && shard.getId() == shardId && shard.primary()) {
shardRouting = shard;
break;
}
}
if (shardRouting == null) {
return explainOrThrowRejectedCommand(explain, allocation, "primary [" + index + "][" + shardId + "] is already assigned");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
Expand All @@ -35,6 +34,7 @@
import org.elasticsearch.index.shard.ShardNotFoundException;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
Expand Down Expand Up @@ -101,20 +101,34 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain)
return explainOrThrowMissingRoutingNode(allocation, explain, discoNode);
}

final ShardRouting primaryShardRouting;
try {
primaryShardRouting = allocation.routingTable().shardRoutingTable(index, shardId).primaryShard();
allocation.routingTable().shardRoutingTable(index, shardId).primaryShard();
} catch (IndexNotFoundException | ShardNotFoundException e) {
return explainOrThrowRejectedCommand(explain, allocation, e);
}
if (primaryShardRouting.unassigned()) {

ShardRouting primaryShardRouting = null;
for (RoutingNode node : allocation.routingNodes()) {
for (ShardRouting shard : node) {
if (shard.getIndexName().equals(index) && shard.getId() == shardId && shard.primary()) {
primaryShardRouting = shard;
break;
}
}
}
if (primaryShardRouting == null) {
return explainOrThrowRejectedCommand(explain, allocation,
"trying to allocate a replica shard [" + index + "][" + shardId +
"], while corresponding primary shard is still unassigned");
}

List<ShardRouting> replicaShardRoutings =
allocation.routingTable().shardRoutingTable(index, shardId).replicaShardsWithState(ShardRoutingState.UNASSIGNED);
List<ShardRouting> replicaShardRoutings = new ArrayList<>();
for (ShardRouting shard : allocation.routingNodes().unassigned()) {
if (shard.getIndexName().equals(index) && shard.getId() == shardId && shard.primary() == false) {
replicaShardRoutings.add(shard);
}
}

ShardRouting shardRouting;
if (replicaShardRoutings.isEmpty()) {
return explainOrThrowRejectedCommand(explain, allocation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,20 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain)
return explainOrThrowMissingRoutingNode(allocation, explain, discoNode);
}

final ShardRouting shardRouting;
try {
shardRouting = allocation.routingTable().shardRoutingTable(index, shardId).primaryShard();
allocation.routingTable().shardRoutingTable(index, shardId).primaryShard();
} catch (IndexNotFoundException | ShardNotFoundException e) {
return explainOrThrowRejectedCommand(explain, allocation, e);
}
if (shardRouting.unassigned() == false) {

ShardRouting shardRouting = null;
for (ShardRouting shard : allocation.routingNodes().unassigned()) {
if (shard.getIndexName().equals(index) && shard.getId() == shardId && shard.primary()) {
shardRouting = shard;
break;
}
}
if (shardRouting == null) {
return explainOrThrowRejectedCommand(explain, allocation, "primary [" + index + "][" + shardId + "] is already assigned");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -677,4 +677,76 @@ public void testMoveShardFromNonDataNode() {
assertEquals("[move_allocation] can't move [test][0] from " + node2 + " to " + node1 +
": source [" + node2.getName() + "] is not a data node.", e.getMessage());
}

public void testConflictingCommandsInSingleRequest() {
AllocationService allocation = createAllocationService(Settings.builder()
.put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "none")
.put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none")
.build());

final String index1 = "test1";
final String index2 = "test2";
final String index3 = "test3";
logger.info("--> building initial routing table");
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder(index1).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)
.putInSyncAllocationIds(0, Collections.singleton("randomAllocID"))
.putInSyncAllocationIds(1, Collections.singleton("randomAllocID2")))
.put(IndexMetaData.builder(index2).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)
.putInSyncAllocationIds(0, Collections.singleton("randomAllocID"))
.putInSyncAllocationIds(1, Collections.singleton("randomAllocID2")))
.put(IndexMetaData.builder(index3).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)
.putInSyncAllocationIds(0, Collections.singleton("randomAllocID"))
.putInSyncAllocationIds(1, Collections.singleton("randomAllocID2")))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsRecovery(metaData.index(index1))
.addAsRecovery(metaData.index(index2))
.addAsRecovery(metaData.index(index3))
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metaData(metaData).routingTable(routingTable).build();

final String node1 = "node1";
final String node2 = "node2";
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
.add(newNode(node1))
.add(newNode(node2))
).build();
final ClusterState finalClusterState = allocation.reroute(clusterState, "reroute");

logger.info("--> allocating same index primary in multiple commands should fail");
assertThat(expectThrows(IllegalArgumentException.class, () -> {
allocation.reroute(finalClusterState,
new AllocationCommands(
new AllocateStalePrimaryAllocationCommand(index1, 0, node1, true),
new AllocateStalePrimaryAllocationCommand(index1, 0, node2, true)
), false, false);
}).getMessage(), containsString("primary [" + index1 + "][0] is already assigned"));

assertThat(expectThrows(IllegalArgumentException.class, () -> {
allocation.reroute(finalClusterState,
new AllocationCommands(
new AllocateEmptyPrimaryAllocationCommand(index2, 0, node1, true),
new AllocateEmptyPrimaryAllocationCommand(index2, 0, node2, true)
), false, false);
}).getMessage(), containsString("primary [" + index2 + "][0] is already assigned"));


clusterState = allocation.reroute(clusterState,
new AllocationCommands(new AllocateEmptyPrimaryAllocationCommand(index3, 0, node1, true)), false, false).getClusterState();
clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));

final ClusterState updatedClusterState = clusterState;
assertThat(updatedClusterState.getRoutingNodes().node(node1).shardsWithState(STARTED).size(), equalTo(1));

logger.info("--> subsequent replica allocation fails as all configured replicas have been allocated");
assertThat(expectThrows(IllegalArgumentException.class, () -> {
allocation.reroute(updatedClusterState,
new AllocationCommands(
new AllocateReplicaAllocationCommand(index3, 0, node2),
new AllocateReplicaAllocationCommand(index3, 0, node2)
), false, false);
}).getMessage(), containsString("all copies of [" + index3 + "][0] are already assigned. Use the move allocation command instead"));
}
}

0 comments on commit 130c832

Please sign in to comment.