From e2e96a8b704aa2261864e26f2710e1639391d1dc Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Tue, 29 Nov 2022 11:49:38 +0100 Subject: [PATCH 1/6] Simplify shardsWithState --- .../cluster/routing/RoutingNode.java | 93 +++++-------------- 1 file changed, 24 insertions(+), 69 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java index af6b27fef98f..5c72b816cbcb 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java @@ -25,6 +25,9 @@ import java.util.Objects; import java.util.Set; import java.util.function.Predicate; +import java.util.stream.Stream; + +import static java.util.stream.Collectors.toCollection; /** * A {@link RoutingNode} represents a cluster node associated with a single {@link DiscoveryNode} including all shards @@ -215,25 +218,11 @@ void remove(ShardRouting shard) { * @return number of shards */ public int numberOfShardsWithState(ShardRoutingState... states) { - if (states.length == 1) { - if (states[0] == ShardRoutingState.INITIALIZING) { - return initializingShards.size(); - } else if (states[0] == ShardRoutingState.RELOCATING) { - return relocatingShards.size(); - } else if (states[0] == ShardRoutingState.STARTED) { - return startedShards.size(); - } - } + return Stream.of(states).mapToInt(this::numberOfShardsWithState).sum(); + } - int count = 0; - for (ShardRouting shardEntry : this) { - for (ShardRoutingState state : states) { - if (shardEntry.state() == state) { - count++; - } - } - } - return count; + public int numberOfShardsWithState(ShardRoutingState state) { + return getShardsWithState(state).size(); } /** @@ -242,20 +231,7 @@ public int numberOfShardsWithState(ShardRoutingState... states) { * @return List of shards */ public List shardsWithState(ShardRoutingState state) { - if (state == ShardRoutingState.INITIALIZING) { - return new ArrayList<>(initializingShards); - } else if (state == ShardRoutingState.RELOCATING) { - return new ArrayList<>(relocatingShards); - } else if (state == ShardRoutingState.STARTED) { - return new ArrayList<>(startedShards); - } - List shards = new ArrayList<>(); - for (ShardRouting shardEntry : this) { - if (shardEntry.state() == state) { - shards.add(shardEntry); - } - } - return shards; + return new ArrayList<>(getShardsWithState(state)); } private static final ShardRouting[] EMPTY_SHARD_ROUTING_ARRAY = new ShardRouting[0]; @@ -279,49 +255,28 @@ public ShardRouting[] started() { * @return a list of shards */ public List shardsWithState(String index, ShardRoutingState... states) { - List shards = new ArrayList<>(); - - if (states.length == 1) { - if (states[0] == ShardRoutingState.INITIALIZING) { - for (ShardRouting shardEntry : initializingShards) { - if (shardEntry.getIndexName().equals(index) == false) { - continue; - } - shards.add(shardEntry); - } - return shards; - } else if (states[0] == ShardRoutingState.RELOCATING) { - for (ShardRouting shardEntry : relocatingShards) { - if (shardEntry.getIndexName().equals(index) == false) { - continue; - } - shards.add(shardEntry); - } - return shards; - } else if (states[0] == ShardRoutingState.STARTED) { - for (ShardRouting shardEntry : startedShards) { - if (shardEntry.getIndexName().equals(index) == false) { - continue; - } - shards.add(shardEntry); - } - return shards; - } - } + return Stream.of(states).flatMap(state -> shardsWithState(index, state).stream()).collect(toCollection(ArrayList::new)); + } - for (ShardRouting shardEntry : this) { - if (shardEntry.getIndexName().equals(index) == false) { - continue; - } - for (ShardRoutingState state : states) { - if (shardEntry.state() == state) { - shards.add(shardEntry); - } + public List shardsWithState(String index, ShardRoutingState state) { + var shards = new ArrayList(); + for (ShardRouting shardEntry : getShardsWithState(state)) { + if (shardEntry.getIndexName().equals(index)) { + shards.add(shardEntry); } } return shards; } + private LinkedHashSet getShardsWithState(ShardRoutingState state) { + return switch (state) { + case UNASSIGNED -> throw new IllegalArgumentException("Unassigned shards are not linked to a routing node"); + case INITIALIZING -> initializingShards; + case STARTED -> startedShards; + case RELOCATING -> relocatingShards; + }; + } + /** * The number of shards on this node that will not be eventually relocated. */ From a49170c06af1abb80df1c6a2e8ba904c251c8dec Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Tue, 29 Nov 2022 12:26:51 +0100 Subject: [PATCH 2/6] fix compilation --- .../xpack/watcher/WatcherIndexingListenerTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherIndexingListenerTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherIndexingListenerTests.java index c682ca93488d..985a0948516e 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherIndexingListenerTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherIndexingListenerTests.java @@ -330,7 +330,7 @@ public void testClusterChangedWatchAliasChanged() throws Exception { boolean emptyShards = randomBoolean(); if (emptyShards) { - when(routingNode.shardsWithState(eq(newActiveWatchIndex), any())).thenReturn(Collections.emptyList()); + when(routingNode.shardsWithState(eq(newActiveWatchIndex), any(ShardRoutingState[].class))).thenReturn(Collections.emptyList()); } else { Index index = new Index(newActiveWatchIndex, "uuid"); ShardId shardId = new ShardId(index, 0); From 76b526954426562c0025fec42f9246801eaefd13 Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Tue, 29 Nov 2022 12:26:58 +0100 Subject: [PATCH 3/6] fix tests --- .../elasticsearch/cluster/routing/RoutingNodesHelper.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/routing/RoutingNodesHelper.java b/test/framework/src/main/java/org/elasticsearch/cluster/routing/RoutingNodesHelper.java index 4387f36effa0..8e93625c3c3e 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/routing/RoutingNodesHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/routing/RoutingNodesHelper.java @@ -21,11 +21,12 @@ private RoutingNodesHelper() {} public static List shardsWithState(RoutingNodes routingNodes, ShardRoutingState state) { List shards = new ArrayList<>(); - for (RoutingNode routingNode : routingNodes) { - shards.addAll(routingNode.shardsWithState(state)); - } if (state == ShardRoutingState.UNASSIGNED) { routingNodes.unassigned().forEach(shards::add); + } else { + for (RoutingNode routingNode : routingNodes) { + shards.addAll(routingNode.shardsWithState(state)); + } } return shards; } @@ -64,7 +65,6 @@ public static RoutingNode routingNode(String nodeId, DiscoveryNode node, ShardRo for (ShardRouting shardRouting : shards) { routingNode.add(shardRouting); } - return routingNode; } } From a4017d242359dead671de218674798ba2f887b83 Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Tue, 29 Nov 2022 12:35:24 +0100 Subject: [PATCH 4/6] upd --- .../cluster/routing/RoutingNodesHelper.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/routing/RoutingNodesHelper.java b/test/framework/src/main/java/org/elasticsearch/cluster/routing/RoutingNodesHelper.java index 8e93625c3c3e..95420293e80c 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/routing/RoutingNodesHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/routing/RoutingNodesHelper.java @@ -31,19 +31,19 @@ public static List shardsWithState(RoutingNodes routingNodes, Shar return shards; } - public static List shardsWithState(RoutingNodes routingNodes, String index, ShardRoutingState... state) { + public static List shardsWithState(RoutingNodes routingNodes, String index, ShardRoutingState... states) { List shards = new ArrayList<>(); - for (RoutingNode routingNode : routingNodes) { - shards.addAll(routingNode.shardsWithState(index, state)); - } - for (ShardRoutingState s : state) { - if (s == ShardRoutingState.UNASSIGNED) { + for (ShardRoutingState state : states) { + if (state == ShardRoutingState.UNASSIGNED) { for (ShardRouting unassignedShard : routingNodes.unassigned()) { if (unassignedShard.index().getName().equals(index)) { shards.add(unassignedShard); } } - break; + } else { + for (RoutingNode routingNode : routingNodes) { + shards.addAll(routingNode.shardsWithState(index, state)); + } } } return shards; From f0ac7c746084228c360b94060c9c2ca5b3ea80a3 Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Tue, 29 Nov 2022 13:55:41 +0100 Subject: [PATCH 5/6] rename --- .../org/elasticsearch/cluster/routing/RoutingNode.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java index 5c72b816cbcb..e4b83dc11067 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java @@ -222,7 +222,7 @@ public int numberOfShardsWithState(ShardRoutingState... states) { } public int numberOfShardsWithState(ShardRoutingState state) { - return getShardsWithState(state).size(); + return internalGetShardsWithState(state).size(); } /** @@ -231,7 +231,7 @@ public int numberOfShardsWithState(ShardRoutingState state) { * @return List of shards */ public List shardsWithState(ShardRoutingState state) { - return new ArrayList<>(getShardsWithState(state)); + return new ArrayList<>(internalGetShardsWithState(state)); } private static final ShardRouting[] EMPTY_SHARD_ROUTING_ARRAY = new ShardRouting[0]; @@ -260,7 +260,7 @@ public List shardsWithState(String index, ShardRoutingState... sta public List shardsWithState(String index, ShardRoutingState state) { var shards = new ArrayList(); - for (ShardRouting shardEntry : getShardsWithState(state)) { + for (ShardRouting shardEntry : internalGetShardsWithState(state)) { if (shardEntry.getIndexName().equals(index)) { shards.add(shardEntry); } @@ -268,7 +268,7 @@ public List shardsWithState(String index, ShardRoutingState state) return shards; } - private LinkedHashSet getShardsWithState(ShardRoutingState state) { + private LinkedHashSet internalGetShardsWithState(ShardRoutingState state) { return switch (state) { case UNASSIGNED -> throw new IllegalArgumentException("Unassigned shards are not linked to a routing node"); case INITIALIZING -> initializingShards; From e2742c509b40dbbe66f799b08fcf2835e1d1eccc Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Tue, 29 Nov 2022 14:10:49 +0100 Subject: [PATCH 6/6] inline --- .../cluster/routing/RoutingNode.java | 6 +---- .../cluster/routing/RoutingNodeTests.java | 1 - .../allocation/FailedShardsRoutingTests.java | 25 ++++++++++++------- .../TenShardsOneReplicaRoutingTests.java | 10 ++++++-- 4 files changed, 25 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java index e4b83dc11067..399618de90e2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java @@ -214,13 +214,9 @@ void remove(ShardRouting shard) { /** * Determine the number of shards with a specific state - * @param states set of states which should be counted + * @param state which should be counted * @return number of shards */ - public int numberOfShardsWithState(ShardRoutingState... states) { - return Stream.of(states).mapToInt(this::numberOfShardsWithState).sum(); - } - public int numberOfShardsWithState(ShardRoutingState state) { return internalGetShardsWithState(state).size(); } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/RoutingNodeTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/RoutingNodeTests.java index f78c0ff3fe6b..c40f95f384b2 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/RoutingNodeTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/RoutingNodeTests.java @@ -96,7 +96,6 @@ public void testRemove() { } public void testNumberOfShardsWithState() { - assertThat(routingNode.numberOfShardsWithState(ShardRoutingState.INITIALIZING, ShardRoutingState.STARTED), equalTo(2)); assertThat(routingNode.numberOfShardsWithState(ShardRoutingState.STARTED), equalTo(1)); assertThat(routingNode.numberOfShardsWithState(ShardRoutingState.RELOCATING), equalTo(1)); assertThat(routingNode.numberOfShardsWithState(ShardRoutingState.INITIALIZING), equalTo(1)); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java index 86ed2badab0e..5db2ed8de77f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java @@ -41,7 +41,6 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; @@ -507,10 +506,14 @@ public void testRebalanceFailure() { RoutingNodes routingNodes = clusterState.getRoutingNodes(); assertThat(clusterState.routingTable().index("test").size(), equalTo(2)); - assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED, RELOCATING), equalTo(2)); - assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED), lessThan(3)); - assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED, RELOCATING), equalTo(2)); - assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), lessThan(3)); + assertThat( + routingNodes.node("node1").numberOfShardsWithState(STARTED) + routingNodes.node("node1").numberOfShardsWithState(RELOCATING), + equalTo(2) + ); + assertThat( + routingNodes.node("node2").numberOfShardsWithState(STARTED) + routingNodes.node("node2").numberOfShardsWithState(RELOCATING), + equalTo(2) + ); assertThat(routingNodes.node("node3").numberOfShardsWithState(INITIALIZING), equalTo(1)); logger.info("Fail the shards on node 3"); @@ -521,10 +524,14 @@ public void testRebalanceFailure() { routingNodes = clusterState.getRoutingNodes(); assertThat(clusterState.routingTable().index("test").size(), equalTo(2)); - assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED, RELOCATING), equalTo(2)); - assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED), lessThan(3)); - assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED, RELOCATING), equalTo(2)); - assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), lessThan(3)); + assertThat( + routingNodes.node("node1").numberOfShardsWithState(STARTED) + routingNodes.node("node1").numberOfShardsWithState(RELOCATING), + equalTo(2) + ); + assertThat( + routingNodes.node("node2").numberOfShardsWithState(STARTED) + routingNodes.node("node2").numberOfShardsWithState(RELOCATING), + equalTo(2) + ); if (strategy.isBalancedShardsAllocator()) { assertThat(routingNodes.node("node3").numberOfShardsWithState(INITIALIZING), equalTo(1)); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java index f03683dd3675..e1134699db62 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java @@ -143,10 +143,16 @@ public void testSingleIndexFirstStartPrimaryThenBackups() { routingNodes = clusterState.getRoutingNodes(); assertThat(clusterState.routingTable().index("test").size(), equalTo(10)); - assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED, RELOCATING), equalTo(10)); assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED), lessThan(10)); - assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED, RELOCATING), equalTo(10)); + assertThat( + routingNodes.node("node1").numberOfShardsWithState(STARTED) + routingNodes.node("node1").numberOfShardsWithState(RELOCATING), + equalTo(10) + ); assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), lessThan(10)); + assertThat( + routingNodes.node("node2").numberOfShardsWithState(STARTED) + routingNodes.node("node2").numberOfShardsWithState(RELOCATING), + equalTo(10) + ); assertThat(routingNodes.node("node3").numberOfShardsWithState(INITIALIZING), equalTo(6)); logger.info("Start the shards on node 3");