From fc281a831c8a690b1e83a44a74cddc995a03f49e Mon Sep 17 00:00:00 2001 From: Ankit Jain Date: Mon, 8 Nov 2021 20:14:14 -0800 Subject: [PATCH] Adding cluster level setting for iterating primary first shards Signed-off-by: Ankit Jain --- .../cluster/routing/RoutingNodes.java | 109 +++++++++++------- .../allocator/BalancedShardsAllocator.java | 22 +++- .../common/settings/ClusterSettings.java | 1 + .../cluster/routing/RoutingNodeTests.java | 4 + 4 files changed, 90 insertions(+), 46 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java index 35fc0f1e8f058..177745b05e0e9 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java @@ -1281,63 +1281,88 @@ private void ensureMutable() { * The iterator then resumes on the first node by returning the second shard and continues until all shards from * all the nodes have been returned. */ - public Iterator nodeInterleavedShardIterator() { + public Iterator nodeInterleavedShardIterator(boolean movePrimaryFirst) { final Queue> queue = new ArrayDeque<>(); for (Map.Entry entry : nodesToShards.entrySet()) { queue.add(entry.getValue().copyShards().iterator()); } - return new Iterator() { - private Queue replicaShards = new ArrayDeque<>(); - private Queue> replicaIterators = new ArrayDeque<>(); - - public boolean hasNext() { - while (!queue.isEmpty()) { - if (queue.peek().hasNext()) { + if (movePrimaryFirst) { + return new Iterator() { + private Queue replicaShards = new ArrayDeque<>(); + private Queue> replicaIterators = new ArrayDeque<>(); + + public boolean hasNext() { + while (!queue.isEmpty()) { + if (queue.peek().hasNext()) { + return true; + } + queue.poll(); + } + if (!replicaShards.isEmpty()) { return true; } - queue.poll(); - } - if (!replicaShards.isEmpty()) { - return true; + while (!replicaIterators.isEmpty()) { + if (replicaIterators.peek().hasNext()) { + return true; + } + replicaIterators.poll(); + } + return false; } - while (!replicaIterators.isEmpty()) { - if (replicaIterators.peek().hasNext()) { - return true; + + public ShardRouting next() { + if (hasNext() == false) { + throw new NoSuchElementException(); + } + while (!queue.isEmpty()) { + Iterator iter = queue.poll(); + if (iter.hasNext()) { + ShardRouting result = iter.next(); + if (result.primary()) { + queue.offer(iter); + return result; + } + replicaShards.offer(result); + replicaIterators.offer(iter); + } + } + if (!replicaShards.isEmpty()) { + return replicaShards.poll(); } - replicaIterators.poll(); + Iterator replicaIterator = replicaIterators.poll(); + ShardRouting replicaShard = replicaIterator.next(); + replicaIterators.offer(replicaIterator); + return replicaShard; } - return false; - } - public ShardRouting next() { - if (hasNext() == false) { - throw new NoSuchElementException(); + public void remove() { + throw new UnsupportedOperationException(); } - while (!queue.isEmpty()) { - Iterator iter = queue.poll(); - if (iter.hasNext()) { - ShardRouting result = iter.next(); - if (result.primary()) { - queue.offer(iter); - return result; + }; + } else { + return new Iterator() { + @Override + public boolean hasNext() { + while (!queue.isEmpty()) { + if (queue.peek().hasNext()) { + return true; } - replicaShards.offer(result); - replicaIterators.offer(iter); + queue.poll(); } + return false; } - if (!replicaShards.isEmpty()) { - return replicaShards.poll(); - } - Iterator replicaIterator = replicaIterators.poll(); - ShardRouting replicaShard = replicaIterator.next(); - replicaIterators.offer(replicaIterator); - return replicaShard; - } - public void remove() { - throw new UnsupportedOperationException(); - } - }; + @Override + public ShardRouting next() { + if (hasNext() == false) { + throw new NoSuchElementException(); + } + Iterator iter = queue.poll(); + queue.offer(iter); + return iter.next(); + } + }; + } } private static final class Recoveries { diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 93c9df71656f9..1cc6df4444d21 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -109,6 +109,12 @@ public class BalancedShardsAllocator implements ShardsAllocator { Property.Dynamic, Property.NodeScope ); + public static final Setting SHARD_MOVE_PRIMARY_FIRST_SETTING = Setting.boolSetting( + "cluster.routing.allocation.move.primary_first", + false, + Property.Dynamic, + Property.NodeScope + ); public static final Setting THRESHOLD_SETTING = Setting.floatSetting( "cluster.routing.allocation.balance.threshold", 1.0f, @@ -117,6 +123,7 @@ public class BalancedShardsAllocator implements ShardsAllocator { Property.NodeScope ); + private volatile boolean movePrimaryFirst; private volatile WeightFunction weightFunction; private volatile float threshold; @@ -128,10 +135,15 @@ public BalancedShardsAllocator(Settings settings) { public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSettings) { setWeightFunction(INDEX_BALANCE_FACTOR_SETTING.get(settings), SHARD_BALANCE_FACTOR_SETTING.get(settings)); setThreshold(THRESHOLD_SETTING.get(settings)); + clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst); clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, SHARD_BALANCE_FACTOR_SETTING, this::setWeightFunction); clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold); } + private void setMovePrimaryFirst(boolean movePrimaryFirst) { + this.movePrimaryFirst = movePrimaryFirst; + } + private void setWeightFunction(float indexBalance, float shardBalanceFactor) { weightFunction = new WeightFunction(indexBalance, shardBalanceFactor); } @@ -146,7 +158,7 @@ public void allocate(RoutingAllocation allocation) { failAllocationOfNewPrimaries(allocation); return; } - final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold); + final Balancer balancer = new Balancer(logger, allocation, movePrimaryFirst, weightFunction, threshold); balancer.allocateUnassigned(); balancer.moveShards(); balancer.balance(); @@ -154,7 +166,7 @@ public void allocate(RoutingAllocation allocation) { @Override public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, final RoutingAllocation allocation) { - Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold); + Balancer balancer = new Balancer(logger, allocation, movePrimaryFirst, weightFunction, threshold); AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN; MoveDecision moveDecision = MoveDecision.NOT_TAKEN; if (shard.unassigned()) { @@ -283,6 +295,7 @@ public static class Balancer { private final Map nodes; private final RoutingAllocation allocation; private final RoutingNodes routingNodes; + private final boolean movePrimaryFirst; private final WeightFunction weight; private final float threshold; @@ -291,9 +304,10 @@ public static class Balancer { private final NodeSorter sorter; private final Set inEligibleTargetNode; - public Balancer(Logger logger, RoutingAllocation allocation, WeightFunction weight, float threshold) { + public Balancer(Logger logger, RoutingAllocation allocation, boolean movePrimaryFirst, WeightFunction weight, float threshold) { this.logger = logger; this.allocation = allocation; + this.movePrimaryFirst = movePrimaryFirst; this.weight = weight; this.threshold = threshold; this.routingNodes = allocation.routingNodes(); @@ -725,7 +739,7 @@ public void moveShards() { for (ModelNode currentNode : sorter.modelNodes) { checkAndAddInEligibleTargetNode(currentNode.getRoutingNode()); } - for (Iterator it = allocation.routingNodes().nodeInterleavedShardIterator(); it.hasNext();) { + for (Iterator it = allocation.routingNodes().nodeInterleavedShardIterator(movePrimaryFirst); it.hasNext();) { // Verify if the cluster concurrent recoveries have been reached. if (allocation.deciders().canMoveAnyShard(allocation).type() != Decision.Type.YES) { logger.info( diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 7a189ebc261a6..436ff102ecd54 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -228,6 +228,7 @@ public void apply(Settings value, Settings current, Settings previous) { AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING, BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING, BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING, + BalancedShardsAllocator.SHARD_MOVE_PRIMARY_FIRST_SETTING, BalancedShardsAllocator.THRESHOLD_SETTING, BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING, BreakerSettings.CIRCUIT_BREAKER_OVERHEAD_SETTING, diff --git a/server/src/test/java/org/opensearch/cluster/routing/RoutingNodeTests.java b/server/src/test/java/org/opensearch/cluster/routing/RoutingNodeTests.java index cc5d1f0980d20..4b3b02036b724 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/RoutingNodeTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/RoutingNodeTests.java @@ -97,12 +97,16 @@ public void testPrimaryFirstIterator() { true, ShardRoutingState.RELOCATING ); + ShardRouting initializingShard5 = TestShardRouting.newShardRouting("test", 5, "node-1", true, ShardRoutingState.INITIALIZING); routingNode.add(initializingShard3); routingNode.add(relocatingShard4); + routingNode.add(initializingShard5); final Iterator shardRoutingIterator = routingNode.iterator(); assertTrue(shardRoutingIterator.hasNext()); assertThat(shardRoutingIterator.next(), equalTo(relocatingShard4)); assertTrue(shardRoutingIterator.hasNext()); + assertThat(shardRoutingIterator.next(), equalTo(initializingShard5)); + assertTrue(shardRoutingIterator.hasNext()); assertThat(shardRoutingIterator.next(), equalTo(unassignedShard0)); assertTrue(shardRoutingIterator.hasNext()); assertThat(shardRoutingIterator.next(), equalTo(initializingShard0));