From 130c832e10738308fed8a1c35321ef722dd521e4 Mon Sep 17 00:00:00 2001 From: Vigya Sharma Date: Tue, 28 May 2019 20:12:46 +0530 Subject: [PATCH] Validate routing commands using updated routing state (#42066) When multiple commands are called in sequence, fetch shards from mutable, up-to-date routing nodes to ensure each command's changes are visible to subsequent commands. This addresses an issue uncovered during work on #41050. --- ...AllocateEmptyPrimaryAllocationCommand.java | 13 +++- .../AllocateReplicaAllocationCommand.java | 26 +++++-- ...AllocateStalePrimaryAllocationCommand.java | 13 +++- .../allocation/AllocationCommandsTests.java | 72 +++++++++++++++++++ 4 files changed, 112 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java index 4d037570dd266..2e3219e67c7ae 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java @@ -110,13 +110,20 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain) return explainOrThrowMissingRoutingNode(allocation, explain, discoNode); } - final ShardRouting shardRouting; try { - shardRouting = allocation.routingTable().shardRoutingTable(index, shardId).primaryShard(); + allocation.routingTable().shardRoutingTable(index, shardId).primaryShard(); } catch (IndexNotFoundException | ShardNotFoundException e) { return explainOrThrowRejectedCommand(explain, allocation, e); } - if (shardRouting.unassigned() == false) { + + ShardRouting shardRouting = null; + for (ShardRouting shard : allocation.routingNodes().unassigned()) { + if (shard.getIndexName().equals(index) && shard.getId() == shardId && shard.primary()) { + shardRouting = shard; + break; + } + } + if (shardRouting == null) { return explainOrThrowRejectedCommand(explain, allocation, "primary [" + index + "][" + shardId + "] is already assigned"); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateReplicaAllocationCommand.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateReplicaAllocationCommand.java index 709681f2b2008..5e1bcd81bb5fa 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateReplicaAllocationCommand.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateReplicaAllocationCommand.java @@ -23,7 +23,6 @@ import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.allocation.RerouteExplanation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.decider.Decision; @@ -35,6 +34,7 @@ import org.elasticsearch.index.shard.ShardNotFoundException; import java.io.IOException; +import java.util.ArrayList; import java.util.List; /** @@ -101,20 +101,34 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain) return explainOrThrowMissingRoutingNode(allocation, explain, discoNode); } - final ShardRouting primaryShardRouting; try { - primaryShardRouting = allocation.routingTable().shardRoutingTable(index, shardId).primaryShard(); + allocation.routingTable().shardRoutingTable(index, shardId).primaryShard(); } catch (IndexNotFoundException | ShardNotFoundException e) { return explainOrThrowRejectedCommand(explain, allocation, e); } - if (primaryShardRouting.unassigned()) { + + ShardRouting primaryShardRouting = null; + for (RoutingNode node : allocation.routingNodes()) { + for (ShardRouting shard : node) { + if (shard.getIndexName().equals(index) && shard.getId() == shardId && shard.primary()) { + primaryShardRouting = shard; + break; + } + } + } + if (primaryShardRouting == null) { return explainOrThrowRejectedCommand(explain, allocation, "trying to allocate a replica shard [" + index + "][" + shardId + "], while corresponding primary shard is still unassigned"); } - List replicaShardRoutings = - allocation.routingTable().shardRoutingTable(index, shardId).replicaShardsWithState(ShardRoutingState.UNASSIGNED); + List replicaShardRoutings = new ArrayList<>(); + for (ShardRouting shard : allocation.routingNodes().unassigned()) { + if (shard.getIndexName().equals(index) && shard.getId() == shardId && shard.primary() == false) { + replicaShardRoutings.add(shard); + } + } + ShardRouting shardRouting; if (replicaShardRoutings.isEmpty()) { return explainOrThrowRejectedCommand(explain, allocation, diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateStalePrimaryAllocationCommand.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateStalePrimaryAllocationCommand.java index f4c9aba17d71e..7e645c2cfcb6f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateStalePrimaryAllocationCommand.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateStalePrimaryAllocationCommand.java @@ -108,13 +108,20 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain) return explainOrThrowMissingRoutingNode(allocation, explain, discoNode); } - final ShardRouting shardRouting; try { - shardRouting = allocation.routingTable().shardRoutingTable(index, shardId).primaryShard(); + allocation.routingTable().shardRoutingTable(index, shardId).primaryShard(); } catch (IndexNotFoundException | ShardNotFoundException e) { return explainOrThrowRejectedCommand(explain, allocation, e); } - if (shardRouting.unassigned() == false) { + + ShardRouting shardRouting = null; + for (ShardRouting shard : allocation.routingNodes().unassigned()) { + if (shard.getIndexName().equals(index) && shard.getId() == shardId && shard.primary()) { + shardRouting = shard; + break; + } + } + if (shardRouting == null) { return explainOrThrowRejectedCommand(explain, allocation, "primary [" + index + "][" + shardId + "] is already assigned"); } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationCommandsTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationCommandsTests.java index c966e3cac27dc..1405be54fd51e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationCommandsTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationCommandsTests.java @@ -677,4 +677,76 @@ public void testMoveShardFromNonDataNode() { assertEquals("[move_allocation] can't move [test][0] from " + node2 + " to " + node1 + ": source [" + node2.getName() + "] is not a data node.", e.getMessage()); } + + public void testConflictingCommandsInSingleRequest() { + AllocationService allocation = createAllocationService(Settings.builder() + .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "none") + .put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none") + .build()); + + final String index1 = "test1"; + final String index2 = "test2"; + final String index3 = "test3"; + logger.info("--> building initial routing table"); + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder(index1).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1) + .putInSyncAllocationIds(0, Collections.singleton("randomAllocID")) + .putInSyncAllocationIds(1, Collections.singleton("randomAllocID2"))) + .put(IndexMetaData.builder(index2).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1) + .putInSyncAllocationIds(0, Collections.singleton("randomAllocID")) + .putInSyncAllocationIds(1, Collections.singleton("randomAllocID2"))) + .put(IndexMetaData.builder(index3).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1) + .putInSyncAllocationIds(0, Collections.singleton("randomAllocID")) + .putInSyncAllocationIds(1, Collections.singleton("randomAllocID2"))) + .build(); + RoutingTable routingTable = RoutingTable.builder() + .addAsRecovery(metaData.index(index1)) + .addAsRecovery(metaData.index(index2)) + .addAsRecovery(metaData.index(index3)) + .build(); + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metaData(metaData).routingTable(routingTable).build(); + + final String node1 = "node1"; + final String node2 = "node2"; + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() + .add(newNode(node1)) + .add(newNode(node2)) + ).build(); + final ClusterState finalClusterState = allocation.reroute(clusterState, "reroute"); + + logger.info("--> allocating same index primary in multiple commands should fail"); + assertThat(expectThrows(IllegalArgumentException.class, () -> { + allocation.reroute(finalClusterState, + new AllocationCommands( + new AllocateStalePrimaryAllocationCommand(index1, 0, node1, true), + new AllocateStalePrimaryAllocationCommand(index1, 0, node2, true) + ), false, false); + }).getMessage(), containsString("primary [" + index1 + "][0] is already assigned")); + + assertThat(expectThrows(IllegalArgumentException.class, () -> { + allocation.reroute(finalClusterState, + new AllocationCommands( + new AllocateEmptyPrimaryAllocationCommand(index2, 0, node1, true), + new AllocateEmptyPrimaryAllocationCommand(index2, 0, node2, true) + ), false, false); + }).getMessage(), containsString("primary [" + index2 + "][0] is already assigned")); + + + clusterState = allocation.reroute(clusterState, + new AllocationCommands(new AllocateEmptyPrimaryAllocationCommand(index3, 0, node1, true)), false, false).getClusterState(); + clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)); + + final ClusterState updatedClusterState = clusterState; + assertThat(updatedClusterState.getRoutingNodes().node(node1).shardsWithState(STARTED).size(), equalTo(1)); + + logger.info("--> subsequent replica allocation fails as all configured replicas have been allocated"); + assertThat(expectThrows(IllegalArgumentException.class, () -> { + allocation.reroute(updatedClusterState, + new AllocationCommands( + new AllocateReplicaAllocationCommand(index3, 0, node2), + new AllocateReplicaAllocationCommand(index3, 0, node2) + ), false, false); + }).getMessage(), containsString("all copies of [" + index3 + "][0] are already assigned. Use the move allocation command instead")); + } }