diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java index 123d09246bb7b..1b1a9394ff306 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java @@ -34,6 +34,7 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.Nullable; +import org.opensearch.common.collect.Tuple; import org.opensearch.index.Index; import org.opensearch.index.shard.ShardId; @@ -48,6 +49,7 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; /** * A {@link RoutingNode} represents a cluster node associated with a single {@link DiscoveryNode} including all shards @@ -55,11 +57,87 @@ */ public class RoutingNode implements Iterable { + static class BucketedShards implements Iterable { + private final Tuple, LinkedHashMap> shardTuple; // LinkedHashMap to + // preserve order + + BucketedShards(LinkedHashMap primaryShards, LinkedHashMap replicaShards) { + this.shardTuple = new Tuple(primaryShards, replicaShards); + } + + public boolean isEmpty() { + return this.shardTuple.v1().isEmpty() && this.shardTuple.v2().isEmpty(); + } + + public int size() { + return this.shardTuple.v1().size() + this.shardTuple.v2().size(); + } + + public boolean containsKey(ShardId shardId) { + return this.shardTuple.v1().containsKey(shardId) || this.shardTuple.v2().containsKey(shardId); + } + + public ShardRouting get(ShardId shardId) { + if (this.shardTuple.v1().containsKey(shardId)) { + return this.shardTuple.v1().get(shardId); + } + return this.shardTuple.v2().get(shardId); + } + + public ShardRouting add(ShardRouting shardRouting) { + return put(shardRouting.shardId(), shardRouting); + } + + public ShardRouting put(ShardId shardId, ShardRouting shardRouting) { + ShardRouting ret; + if (shardRouting.primary()) { + ret = this.shardTuple.v1().put(shardId, shardRouting); + if (this.shardTuple.v2().containsKey(shardId)) { + ret = this.shardTuple.v2().remove(shardId); + } + } else { + ret = this.shardTuple.v2().put(shardId, shardRouting); + if (this.shardTuple.v1().containsKey(shardId)) { + ret = this.shardTuple.v1().remove(shardId); + } + } + + return ret; + } + + public ShardRouting remove(ShardId shardId) { + if (this.shardTuple.v1().containsKey(shardId)) { + return this.shardTuple.v1().remove(shardId); + } + return this.shardTuple.v2().remove(shardId); + } + + @Override + public Iterator iterator() { + final Iterator primaryIterator = Collections.unmodifiableCollection(this.shardTuple.v1().values()).iterator(); + final Iterator replicaIterator = Collections.unmodifiableCollection(this.shardTuple.v2().values()).iterator(); + return new Iterator() { + @Override + public boolean hasNext() { + return primaryIterator.hasNext() || replicaIterator.hasNext(); + } + + @Override + public ShardRouting next() { + if (primaryIterator.hasNext()) { + return primaryIterator.next(); + } + return replicaIterator.next(); + } + }; + } + } + private final String nodeId; private final DiscoveryNode node; - private final LinkedHashMap shards; // LinkedHashMap to preserve order + private final BucketedShards shards; private final LinkedHashSet initializingShards; @@ -67,44 +145,44 @@ public class RoutingNode implements Iterable { private final HashMap> shardsByIndex; - public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shards) { - this(nodeId, node, buildShardRoutingMap(shards)); - } - - RoutingNode(String nodeId, DiscoveryNode node, LinkedHashMap shards) { + public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shardRoutings) { this.nodeId = nodeId; this.node = node; - this.shards = shards; + final LinkedHashMap primaryShards = new LinkedHashMap<>(); + final LinkedHashMap replicaShards = new LinkedHashMap<>(); + this.shards = new BucketedShards(primaryShards, replicaShards); this.relocatingShards = new LinkedHashSet<>(); this.initializingShards = new LinkedHashSet<>(); this.shardsByIndex = new LinkedHashMap<>(); - for (ShardRouting shardRouting : shards.values()) { + + for (ShardRouting shardRouting : shardRoutings) { if (shardRouting.initializing()) { initializingShards.add(shardRouting); } else if (shardRouting.relocating()) { relocatingShards.add(shardRouting); } shardsByIndex.computeIfAbsent(shardRouting.index(), k -> new LinkedHashSet<>()).add(shardRouting); - } - assert invariant(); - } - private static LinkedHashMap buildShardRoutingMap(ShardRouting... shardRoutings) { - final LinkedHashMap shards = new LinkedHashMap<>(); - for (ShardRouting shardRouting : shardRoutings) { - ShardRouting previousValue = shards.put(shardRouting.shardId(), shardRouting); + ShardRouting previousValue; + if (shardRouting.primary()) { + previousValue = primaryShards.put(shardRouting.shardId(), shardRouting); + } else { + previousValue = replicaShards.put(shardRouting.shardId(), shardRouting); + } + if (previousValue != null) { throw new IllegalArgumentException( "Cannot have two different shards with same shard id " + shardRouting.shardId() + " on same node " ); } } - return shards; + + assert invariant(); } @Override public Iterator iterator() { - return Collections.unmodifiableCollection(shards.values()).iterator(); + return shards.iterator(); } /** @@ -139,7 +217,7 @@ public int size() { */ void add(ShardRouting shard) { assert invariant(); - if (shards.containsKey(shard.shardId())) { + if (shards.add(shard) != null) { throw new IllegalStateException( "Trying to add a shard " + shard.shardId() @@ -152,7 +230,6 @@ void add(ShardRouting shard) { + "]" ); } - shards.put(shard.shardId(), shard); if (shard.initializing()) { initializingShards.add(shard); @@ -322,7 +399,7 @@ public int numberOfOwningShardsForIndex(final Index index) { public String prettyPrint() { StringBuilder sb = new StringBuilder(); sb.append("-----node_id[").append(nodeId).append("][").append(node == null ? "X" : "V").append("]\n"); - for (ShardRouting entry : shards.values()) { + for (ShardRouting entry : shards) { sb.append("--------").append(entry.shortSummary()).append('\n'); } return sb.toString(); @@ -345,7 +422,9 @@ public String toString() { } public List copyShards() { - return new ArrayList<>(shards.values()); + List result = new ArrayList<>(); + shards.forEach(result::add); + return result; } public boolean isEmpty() { @@ -355,23 +434,20 @@ public boolean isEmpty() { private boolean invariant() { // initializingShards must consistent with that in shards - Collection shardRoutingsInitializing = shards.values() - .stream() + Collection shardRoutingsInitializing = StreamSupport.stream(shards.spliterator(), false) .filter(ShardRouting::initializing) .collect(Collectors.toList()); assert initializingShards.size() == shardRoutingsInitializing.size(); assert initializingShards.containsAll(shardRoutingsInitializing); // relocatingShards must consistent with that in shards - Collection shardRoutingsRelocating = shards.values() - .stream() + Collection shardRoutingsRelocating = StreamSupport.stream(shards.spliterator(), false) .filter(ShardRouting::relocating) .collect(Collectors.toList()); assert relocatingShards.size() == shardRoutingsRelocating.size(); assert relocatingShards.containsAll(shardRoutingsRelocating); - final Map> shardRoutingsByIndex = shards.values() - .stream() + final Map> shardRoutingsByIndex = StreamSupport.stream(shards.spliterator(), false) .collect(Collectors.groupingBy(ShardRouting::index, Collectors.toSet())); assert shardRoutingsByIndex.equals(shardsByIndex); diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java index d81e7fa6e22d9..b5353382f06b8 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java @@ -56,7 +56,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.List; import java.util.ListIterator; import java.util.Map; @@ -108,10 +107,10 @@ public RoutingNodes(ClusterState clusterState, boolean readOnly) { this.readOnly = readOnly; final RoutingTable routingTable = clusterState.routingTable(); - Map> nodesToShards = new HashMap<>(); // fill in the nodeToShards with the "live" nodes for (ObjectCursor cursor : clusterState.nodes().getDataNodes().values()) { - nodesToShards.put(cursor.value.getId(), new LinkedHashMap<>()); // LinkedHashMap to preserve order + String nodeId = cursor.value.getId(); + this.nodesToShards.put(cursor.value.getId(), new RoutingNode(nodeId, clusterState.nodes().get(nodeId))); } // fill in the inverse of node -> shards allocated @@ -125,27 +124,23 @@ public RoutingNodes(ClusterState clusterState, boolean readOnly) { // by the ShardId, as this is common for primary and replicas. // A replica Set might have one (and not more) replicas with the state of RELOCATING. if (shard.assignedToNode()) { - Map entries = nodesToShards.computeIfAbsent( + RoutingNode routingNode = this.nodesToShards.computeIfAbsent( shard.currentNodeId(), - k -> new LinkedHashMap<>() - ); // LinkedHashMap to preserve order - ShardRouting previousValue = entries.put(shard.shardId(), shard); - if (previousValue != null) { - throw new IllegalArgumentException("Cannot have two different shards with same shard id on same node"); - } + k -> new RoutingNode(shard.currentNodeId(), clusterState.nodes().get(shard.currentNodeId())) + ); + routingNode.add(shard); assignedShardsAdd(shard); if (shard.relocating()) { relocatingShards++; - // LinkedHashMap to preserve order. // Add the counterpart shard with relocatingNodeId reflecting the source from which // it's relocating from. - entries = nodesToShards.computeIfAbsent(shard.relocatingNodeId(), k -> new LinkedHashMap<>()); + routingNode = nodesToShards.computeIfAbsent( + shard.relocatingNodeId(), + k -> new RoutingNode(shard.relocatingNodeId(), clusterState.nodes().get(shard.relocatingNodeId())) + ); ShardRouting targetShardRouting = shard.getTargetRelocatingShard(); addInitialRecovery(targetShardRouting, indexShard.primary); - previousValue = entries.put(targetShardRouting.shardId(), targetShardRouting); - if (previousValue != null) { - throw new IllegalArgumentException("Cannot have two different shards with same shard id on same node"); - } + routingNode.add(targetShardRouting); assignedShardsAdd(targetShardRouting); } else if (shard.initializing()) { if (shard.primary()) { @@ -160,10 +155,6 @@ public RoutingNodes(ClusterState clusterState, boolean readOnly) { } } } - for (Map.Entry> entry : nodesToShards.entrySet()) { - String nodeId = entry.getKey(); - this.nodesToShards.put(nodeId, new RoutingNode(nodeId, clusterState.nodes().get(nodeId), entry.getValue())); - } } private void addRecovery(ShardRouting routing) { @@ -1289,37 +1280,97 @@ private void ensureMutable() { * the first node, then the first shard of the second node, etc. until one shard from each node has been returned. * The iterator then resumes on the first node by returning the second shard and continues until all shards from * all the nodes have been returned. + * @param movePrimaryFirst if true, all primary shards are iterated over before iterating replica for any node + * @return iterator of shard routings */ - public Iterator nodeInterleavedShardIterator() { + public Iterator nodeInterleavedShardIterator(boolean movePrimaryFirst) { final Queue> queue = new ArrayDeque<>(); for (Map.Entry entry : nodesToShards.entrySet()) { queue.add(entry.getValue().copyShards().iterator()); } - return new Iterator() { - public boolean hasNext() { - while (!queue.isEmpty()) { - if (queue.peek().hasNext()) { + if (movePrimaryFirst) { + return new Iterator() { + private Queue replicaShards = new ArrayDeque<>(); + private Queue> replicaIterators = new ArrayDeque<>(); + + public boolean hasNext() { + while (!queue.isEmpty()) { + if (queue.peek().hasNext()) { + return true; + } + queue.poll(); + } + if (!replicaShards.isEmpty()) { return true; } - queue.poll(); + while (!replicaIterators.isEmpty()) { + if (replicaIterators.peek().hasNext()) { + return true; + } + replicaIterators.poll(); + } + return false; } - return false; - } - public ShardRouting next() { - if (hasNext() == false) { - throw new NoSuchElementException(); + public ShardRouting next() { + if (hasNext() == false) { + throw new NoSuchElementException(); + } + while (!queue.isEmpty()) { + Iterator iter = queue.poll(); + if (iter.hasNext()) { + ShardRouting result = iter.next(); + if (result.primary()) { + queue.offer(iter); + return result; + } + replicaShards.offer(result); + replicaIterators.offer(iter); + } + } + if (!replicaShards.isEmpty()) { + return replicaShards.poll(); + } + Iterator replicaIterator = replicaIterators.poll(); + ShardRouting replicaShard = replicaIterator.next(); + replicaIterators.offer(replicaIterator); + + assert !replicaShard.primary(); + return replicaShard; } - Iterator iter = queue.poll(); - ShardRouting result = iter.next(); - queue.offer(iter); - return result; - } - public void remove() { - throw new UnsupportedOperationException(); - } - }; + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } else { + return new Iterator() { + @Override + public boolean hasNext() { + while (!queue.isEmpty()) { + if (queue.peek().hasNext()) { + return true; + } + queue.poll(); + } + return false; + } + + @Override + public ShardRouting next() { + if (hasNext() == false) { + throw new NoSuchElementException(); + } + Iterator iter = queue.poll(); + queue.offer(iter); + return iter.next(); + } + + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } } private static final class Recoveries { diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 93c9df71656f9..b3a045af91952 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -109,6 +109,12 @@ public class BalancedShardsAllocator implements ShardsAllocator { Property.Dynamic, Property.NodeScope ); + public static final Setting SHARD_MOVE_PRIMARY_FIRST_SETTING = Setting.boolSetting( + "cluster.routing.allocation.move.primary_first", + false, + Property.Dynamic, + Property.NodeScope + ); public static final Setting THRESHOLD_SETTING = Setting.floatSetting( "cluster.routing.allocation.balance.threshold", 1.0f, @@ -117,6 +123,7 @@ public class BalancedShardsAllocator implements ShardsAllocator { Property.NodeScope ); + private volatile boolean movePrimaryFirst; private volatile WeightFunction weightFunction; private volatile float threshold; @@ -128,10 +135,15 @@ public BalancedShardsAllocator(Settings settings) { public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSettings) { setWeightFunction(INDEX_BALANCE_FACTOR_SETTING.get(settings), SHARD_BALANCE_FACTOR_SETTING.get(settings)); setThreshold(THRESHOLD_SETTING.get(settings)); + clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst); clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, SHARD_BALANCE_FACTOR_SETTING, this::setWeightFunction); clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold); } + private void setMovePrimaryFirst(boolean movePrimaryFirst) { + this.movePrimaryFirst = movePrimaryFirst; + } + private void setWeightFunction(float indexBalance, float shardBalanceFactor) { weightFunction = new WeightFunction(indexBalance, shardBalanceFactor); } @@ -146,7 +158,7 @@ public void allocate(RoutingAllocation allocation) { failAllocationOfNewPrimaries(allocation); return; } - final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold); + final Balancer balancer = new Balancer(logger, allocation, movePrimaryFirst, weightFunction, threshold); balancer.allocateUnassigned(); balancer.moveShards(); balancer.balance(); @@ -154,7 +166,7 @@ public void allocate(RoutingAllocation allocation) { @Override public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, final RoutingAllocation allocation) { - Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold); + Balancer balancer = new Balancer(logger, allocation, movePrimaryFirst, weightFunction, threshold); AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN; MoveDecision moveDecision = MoveDecision.NOT_TAKEN; if (shard.unassigned()) { @@ -283,6 +295,7 @@ public static class Balancer { private final Map nodes; private final RoutingAllocation allocation; private final RoutingNodes routingNodes; + private final boolean movePrimaryFirst; private final WeightFunction weight; private final float threshold; @@ -291,9 +304,10 @@ public static class Balancer { private final NodeSorter sorter; private final Set inEligibleTargetNode; - public Balancer(Logger logger, RoutingAllocation allocation, WeightFunction weight, float threshold) { + public Balancer(Logger logger, RoutingAllocation allocation, boolean movePrimaryFirst, WeightFunction weight, float threshold) { this.logger = logger; this.allocation = allocation; + this.movePrimaryFirst = movePrimaryFirst; this.weight = weight; this.threshold = threshold; this.routingNodes = allocation.routingNodes(); @@ -725,7 +739,8 @@ public void moveShards() { for (ModelNode currentNode : sorter.modelNodes) { checkAndAddInEligibleTargetNode(currentNode.getRoutingNode()); } - for (Iterator it = allocation.routingNodes().nodeInterleavedShardIterator(); it.hasNext();) { + boolean primariesThrottled = false; + for (Iterator it = allocation.routingNodes().nodeInterleavedShardIterator(movePrimaryFirst); it.hasNext();) { // Verify if the cluster concurrent recoveries have been reached. if (allocation.deciders().canMoveAnyShard(allocation).type() != Decision.Type.YES) { logger.info( @@ -745,11 +760,23 @@ public void moveShards() { ShardRouting shardRouting = it.next(); + // Ensure that replicas don't relocate if primaries are being throttled and primary first is enabled + if (movePrimaryFirst && primariesThrottled && !shardRouting.primary()) { + logger.info( + "Cannot move any replica shard in the cluster as movePrimaryFirst is enabled and primary shards" + + "are being throttled. Skipping shard iteration" + ); + return; + } + // Verify if the shard is allowed to move if outgoing recovery on the node hosting the primary shard // is not being throttled. Decision canMoveAwayDecision = allocation.deciders().canMoveAway(shardRouting, allocation); if (canMoveAwayDecision.type() != Decision.Type.YES) { if (logger.isDebugEnabled()) logger.debug("Cannot move away shard [{}] Skipping this shard", shardRouting); + if (shardRouting.primary() && canMoveAwayDecision.type() == Type.THROTTLE) { + primariesThrottled = true; + } continue; } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 7a189ebc261a6..436ff102ecd54 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -228,6 +228,7 @@ public void apply(Settings value, Settings current, Settings previous) { AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING, BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING, BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING, + BalancedShardsAllocator.SHARD_MOVE_PRIMARY_FIRST_SETTING, BalancedShardsAllocator.THRESHOLD_SETTING, BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING, BreakerSettings.CIRCUIT_BREAKER_OVERHEAD_SETTING, diff --git a/server/src/test/java/org/opensearch/cluster/routing/MovePrimaryFirstTests.java b/server/src/test/java/org/opensearch/cluster/routing/MovePrimaryFirstTests.java new file mode 100644 index 0000000000000..64e622888018f --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/MovePrimaryFirstTests.java @@ -0,0 +1,117 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.cluster.ClusterStateListener; +import org.opensearch.common.settings.Settings; +import org.opensearch.test.InternalTestCluster; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.util.Iterator; +import java.util.concurrent.CountDownLatch; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0) +@ThreadLeakScope(ThreadLeakScope.Scope.NONE) +public class MovePrimaryFirstTests extends OpenSearchIntegTestCase { + + protected String startDataOnlyNode(final String zone) { + final Settings settings = Settings.builder().put("node.attr.zone", zone).build(); + return internalCluster().startDataOnlyNode(settings); + } + + protected void createAndIndex(String index, int replicaCount, int shardCount) { + assertAcked( + prepareCreate( + index, + -1, + Settings.builder() + .put("number_of_shards", shardCount) + .put("number_of_replicas", replicaCount) + .put("max_result_window", 20000) + ) + ); + int startDocCountId = 0; + for (int i = 0; i < 10; i++) { + index(index, "_doc", Integer.toString(startDocCountId), "foo", "bar" + startDocCountId); + ++startDocCountId; + } + flushAndRefresh(index); + } + + /** + * Creates two nodes each in two zones and shuts down nodes in one zone + * after relocating half the number of shards. Since, primaries are relocated + * first, cluster should stay green as primary should have relocated + */ + public void testClusterGreenAfterPartialRelocation() throws InterruptedException { + internalCluster().startMasterOnlyNodes(1); + final String z1 = "zone-1", z2 = "zone-2"; + final int primaryShardCount = 100; + final String z1n1 = startDataOnlyNode(z1); + ensureGreen(); + createAndIndex("foo", 1, primaryShardCount); + ensureYellow(); + // Start second node in same zone only after yellow cluster to ensure + // that one gets all primaries and other all secondaries + final String z1n2 = startDataOnlyNode(z1); + ensureGreen(); + + // Enable cluster level setting for moving primaries first and keep new + // zone nodes excluded to prevent any shard relocation + ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest(); + settingsRequest.persistentSettings( + Settings.builder().put("cluster.routing.allocation.move.primary_first", true).put("cluster.routing.allocation.exclude.zone", z2) + ); + client().admin().cluster().updateSettings(settingsRequest).actionGet(); + + final String z2n1 = startDataOnlyNode(z2); + final String z2n2 = startDataOnlyNode(z2); + + // Create cluster state listener to compute number of shards on new zone + // nodes before counting down the latch + final CountDownLatch primaryMoveLatch = new CountDownLatch(1); + final ClusterStateListener listener = event -> { + if (event.routingTableChanged()) { + final RoutingNodes routingNodes = event.state().getRoutingNodes(); + int startedz2n1 = 0; + int startedz2n2 = 0; + for (Iterator it = routingNodes.iterator(); it.hasNext();) { + RoutingNode routingNode = it.next(); + final String nodeName = routingNode.node().getName(); + if (nodeName.equals(z2n1)) { + startedz2n1 = routingNode.numberOfShardsWithState(ShardRoutingState.STARTED); + } else if (nodeName.equals(z2n2)) { + startedz2n2 = routingNode.numberOfShardsWithState(ShardRoutingState.STARTED); + } + } + if (startedz2n1 >= primaryShardCount / 2 && startedz2n2 >= primaryShardCount / 2) { + primaryMoveLatch.countDown(); + } + } + }; + internalCluster().clusterService().addListener(listener); + + // Exclude zone1 nodes for allocation and await latch count down + settingsRequest = new ClusterUpdateSettingsRequest(); + settingsRequest.persistentSettings(Settings.builder().put("cluster.routing.allocation.exclude.zone", z1)); + client().admin().cluster().updateSettings(settingsRequest); + primaryMoveLatch.await(); + + // Shutdown both nodes in zone and ensure cluster stays green + try { + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(z1n1)); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(z1n2)); + } catch (Exception e) {} + ensureGreen(); + } +} diff --git a/server/src/test/java/org/opensearch/cluster/routing/RoutingNodeTests.java b/server/src/test/java/org/opensearch/cluster/routing/RoutingNodeTests.java index 8451633710ce5..5bd5b7d9f6a67 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/RoutingNodeTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/RoutingNodeTests.java @@ -41,6 +41,7 @@ import org.opensearch.test.OpenSearchTestCase; import java.net.InetAddress; +import java.util.Iterator; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -86,6 +87,29 @@ public void testAdd() { assertThat(routingNode.getByShardId(new ShardId("test", IndexMetadata.INDEX_UUID_NA_VALUE, 4)), equalTo(relocatingShard0)); } + public void testPrimaryFirstIterator() { + ShardRouting initializingShard3 = TestShardRouting.newShardRouting("test", 3, "node-1", false, ShardRoutingState.INITIALIZING); + ShardRouting relocatingShard4 = TestShardRouting.newShardRouting("test", 4, "node-1", "node-2", true, ShardRoutingState.RELOCATING); + ShardRouting initializingShard5 = TestShardRouting.newShardRouting("test", 5, "node-1", true, ShardRoutingState.INITIALIZING); + routingNode.add(initializingShard3); + routingNode.add(relocatingShard4); + routingNode.add(initializingShard5); + final Iterator shardRoutingIterator = routingNode.iterator(); + assertTrue(shardRoutingIterator.hasNext()); + assertThat(shardRoutingIterator.next(), equalTo(relocatingShard4)); + assertTrue(shardRoutingIterator.hasNext()); + assertThat(shardRoutingIterator.next(), equalTo(initializingShard5)); + assertTrue(shardRoutingIterator.hasNext()); + assertThat(shardRoutingIterator.next(), equalTo(unassignedShard0)); + assertTrue(shardRoutingIterator.hasNext()); + assertThat(shardRoutingIterator.next(), equalTo(initializingShard0)); + assertTrue(shardRoutingIterator.hasNext()); + assertThat(shardRoutingIterator.next(), equalTo(relocatingShard0)); + assertTrue(shardRoutingIterator.hasNext()); + assertThat(shardRoutingIterator.next(), equalTo(initializingShard3)); + assertFalse(shardRoutingIterator.hasNext()); + } + public void testUpdate() { ShardRouting startedShard0 = TestShardRouting.newShardRouting("test", 0, "node-1", false, ShardRoutingState.STARTED); ShardRouting startedShard1 = TestShardRouting.newShardRouting("test", 1, "node-1", "node-2", false, ShardRoutingState.RELOCATING); diff --git a/server/src/test/java/org/opensearch/cluster/routing/RoutingNodesTests.java b/server/src/test/java/org/opensearch/cluster/routing/RoutingNodesTests.java new file mode 100644 index 0000000000000..3e9088d63cfb4 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/RoutingNodesTests.java @@ -0,0 +1,163 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.cluster.routing; + +import org.junit.Before; +import org.opensearch.Version; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.OpenSearchAllocationTestCase; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; +import org.opensearch.common.settings.Settings; + +import java.util.Iterator; +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; + +public class RoutingNodesTests extends OpenSearchAllocationTestCase { + private static final String TEST_INDEX_1 = "test1"; + private static final String TEST_INDEX_2 = "test2"; + private RoutingTable emptyRoutingTable; + private int numberOfShards; + private int numberOfReplicas; + private int shardsPerIndex; + private int totalNumberOfShards; + private static final Settings DEFAULT_SETTINGS = Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT).build(); + private final AllocationService ALLOCATION_SERVICE = createAllocationService( + Settings.builder() + .put("cluster.routing.allocation.node_concurrent_recoveries", Integer.MAX_VALUE) // don't limit recoveries + .put("cluster.routing.allocation.node_initial_primaries_recoveries", Integer.MAX_VALUE) + .put( + ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), + Integer.MAX_VALUE + ) + .build() + ); + private ClusterState clusterState; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + this.numberOfShards = 5; + this.numberOfReplicas = 2; + this.shardsPerIndex = this.numberOfShards * (this.numberOfReplicas + 1); + this.totalNumberOfShards = this.shardsPerIndex * 2; + logger.info("Setup test with {} shards and {} replicas.", this.numberOfShards, this.numberOfReplicas); + this.emptyRoutingTable = new RoutingTable.Builder().build(); + Metadata metadata = Metadata.builder().put(createIndexMetadata(TEST_INDEX_1)).put(createIndexMetadata(TEST_INDEX_2)).build(); + + RoutingTable testRoutingTable = new RoutingTable.Builder().add( + new IndexRoutingTable.Builder(metadata.index(TEST_INDEX_1).getIndex()).initializeAsNew(metadata.index(TEST_INDEX_1)).build() + ) + .add( + new IndexRoutingTable.Builder(metadata.index(TEST_INDEX_2).getIndex()).initializeAsNew(metadata.index(TEST_INDEX_2)).build() + ) + .build(); + this.clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(testRoutingTable) + .build(); + } + + /** + * Puts primary shard index routings into initializing state + */ + private void initPrimaries() { + logger.info("adding {} nodes and performing rerouting", this.numberOfReplicas + 1); + DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder(); + for (int i = 0; i < this.numberOfReplicas + 1; i++) { + discoBuilder = discoBuilder.add(newNode("node" + i)); + } + this.clusterState = ClusterState.builder(clusterState).nodes(discoBuilder).build(); + ClusterState rerouteResult = ALLOCATION_SERVICE.reroute(clusterState, "reroute"); + assertThat(rerouteResult, not(equalTo(this.clusterState))); + this.clusterState = rerouteResult; + } + + /** + * Moves initializing shards into started state + */ + private void startInitializingShards(String index) { + clusterState = startInitializingShardsAndReroute(ALLOCATION_SERVICE, clusterState, index); + } + + private IndexMetadata.Builder createIndexMetadata(String indexName) { + return new IndexMetadata.Builder(indexName).settings(DEFAULT_SETTINGS) + .numberOfReplicas(this.numberOfReplicas) + .numberOfShards(this.numberOfShards); + } + + public void testInterleavedShardIterator() { + // Initialize all the shards for test index 1 and 2 + initPrimaries(); + startInitializingShards(TEST_INDEX_1); + startInitializingShards(TEST_INDEX_1); + startInitializingShards(TEST_INDEX_2); + startInitializingShards(TEST_INDEX_2); + + // Create primary shard count imbalance between two nodes + final RoutingNode node0 = this.clusterState.getRoutingNodes().node("node0"); + final RoutingNode node1 = this.clusterState.getRoutingNodes().node("node1"); + final List shardRoutingList = node0.shardsWithState(TEST_INDEX_1, ShardRoutingState.STARTED); + for (ShardRouting routing : shardRoutingList) { + if (routing.primary()) { + node0.remove(routing); + ShardRouting swap = node1.getByShardId(routing.shardId()); + node0.add(swap); + node1.remove(swap); + node1.add(routing); + } + } + + // Get primary first shard iterator and assert primary shards are iterated over first + final Iterator iterator = this.clusterState.getRoutingNodes().nodeInterleavedShardIterator(true); + boolean iteratingPrimary = true; + int shardCount = 0; + while (iterator.hasNext()) { + final ShardRouting shard = iterator.next(); + if (iteratingPrimary) { + iteratingPrimary = shard.primary(); + } else { + assert shard.primary() == false; + } + shardCount++; + } + assert shardCount == this.totalNumberOfShards; + } +}