diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationAllocationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationAllocationIT.java index 859705b1bd8af..c95c8e30342af 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationAllocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationAllocationIT.java @@ -54,19 +54,50 @@ public void enablePreferPrimaryBalance() { client().admin() .cluster() .prepareUpdateSettings() - .setPersistentSettings( - Settings.builder().put(BalancedShardsAllocator.PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE.getKey(), "true") - ) + .setPersistentSettings(Settings.builder().put(BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE.getKey(), "true")) ); } + /** + * Verifies that, with primary shard balance preferred, primary shards are balanced both within each index and across + * all indices after several indices have been created. 
+ * @throws Exception propagated from cluster setup or from the balance verification helpers + */ + public void testGlobalPrimaryAllocation() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + final int maxReplicaCount = 1; + final int maxShardCount = 1; + final int nodeCount = randomIntBetween(maxReplicaCount + 1, 10); + final int numberOfIndices = randomIntBetween(5, 10); + + final List nodeNames = new ArrayList<>(); + logger.info("--> Creating {} nodes", nodeCount); + for (int i = 0; i < nodeCount; i++) { + nodeNames.add(internalCluster().startNode()); + } + enablePreferPrimaryBalance(); + int shardCount, replicaCount; + ClusterState state; + for (int i = 0; i < numberOfIndices; i++) { + shardCount = randomIntBetween(1, maxShardCount); + replicaCount = randomIntBetween(0, maxReplicaCount); + createIndex("test" + i, shardCount, replicaCount, i % 2 == 0); + logger.info("--> Creating index {} with shard count {} and replica count {}", "test" + i, shardCount, replicaCount); + ensureGreen(TimeValue.timeValueSeconds(60)); + } + state = client().admin().cluster().prepareState().execute().actionGet().getState(); + logger.info(ShardAllocations.printShardDistribution(state)); + verifyPerIndexPrimaryBalance(); + verifyPrimaryBalance(); + } + /** * This test verifies the happy path where primary shard allocation is balanced when multiple indices are created. * * This test in general passes without primary shard balance as well due to nature of allocation algorithm which * assigns all primary shards first followed by replica copies. 
*/ - public void testBalancedPrimaryAllocation() throws Exception { + public void testPerIndexPrimaryAllocation() throws Exception { internalCluster().startClusterManagerOnlyNode(); final int maxReplicaCount = 2; final int maxShardCount = 5; @@ -213,4 +244,24 @@ private void verifyPerIndexPrimaryBalance() throws Exception { } }, 60, TimeUnit.SECONDS); } + + private void verifyPrimaryBalance() throws Exception { + assertBusy(() -> { + final ClusterState currentState = client().admin().cluster().prepareState().execute().actionGet().getState(); + RoutingNodes nodes = currentState.getRoutingNodes(); + int totalPrimaryShards = 0; + for (ObjectObjectCursor index : currentState.getRoutingTable().indicesRouting()) { + totalPrimaryShards += index.value.primaryShardsActive(); + } + final int avgPrimaryShardsPerNode = (int) Math.ceil(totalPrimaryShards * 1f / currentState.getRoutingNodes().size()); + for (RoutingNode node : nodes) { + final int primaryCount = node.shardsWithState(STARTED) + .stream() + .filter(ShardRouting::primary) + .collect(Collectors.toList()) + .size(); + assertTrue(primaryCount <= avgPrimaryShardsPerNode); + } + }, 60, TimeUnit.SECONDS); + } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java index 228ac5b504abc..e8ab0738c18da 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java @@ -10,37 +10,29 @@ import java.util.HashMap; import java.util.Map; -import java.util.function.Predicate; -import static org.opensearch.cluster.routing.allocation.RebalanceConstraints.PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID; -import static org.opensearch.cluster.routing.allocation.RebalanceConstraints.isPrimaryShardsPerIndexPerNodeBreached; +import static 
org.opensearch.cluster.routing.allocation.ConstraintTypes.CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID; +import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID; +import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID; +import static org.opensearch.cluster.routing.allocation.ConstraintTypes.isIndexShardsPerNodeBreached; +import static org.opensearch.cluster.routing.allocation.ConstraintTypes.isPerIndexPrimaryShardsPerNodeBreached; +import static org.opensearch.cluster.routing.allocation.ConstraintTypes.isPrimaryShardsPerNodeBreached; /** - * Allocation constraints specify conditions which, if breached, reduce the - * priority of a node for receiving unassigned shard allocations. + * Allocation constraints specify conditions which, if breached, reduce the priority of a node for receiving unassigned + * shard allocations. Weight calculation in other scenarios like shard movement and re-balancing remains unaffected by + * these constraints. * * @opensearch.internal */ public class AllocationConstraints { - - /** - * - * This constraint is only applied for unassigned shards to avoid overloading a newly added node. - * Weight calculation in other scenarios like shard movement and re-balancing remain unaffected by this constraint. 
- */ - public final static String INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID = "index.shard.breach.constraint"; private Map constraints; public AllocationConstraints() { this.constraints = new HashMap<>(); - this.constraints.putIfAbsent( - INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, - new Constraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, isIndexShardsPerNodeBreached()) - ); - this.constraints.putIfAbsent( - PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID, - new Constraint(PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID, isPrimaryShardsPerIndexPerNodeBreached()) - ); + this.constraints.putIfAbsent(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, new Constraint(isIndexShardsPerNodeBreached())); + this.constraints.putIfAbsent(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached())); + this.constraints.putIfAbsent(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPrimaryShardsPerNodeBreached())); } public void updateAllocationConstraint(String constraint, boolean enable) { @@ -51,26 +43,4 @@ public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode no Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index); return params.weight(constraints); } - - /** - * Constraint to control number of shards of an index allocated on a single - * node. - * - * In current weight function implementation, when a node has significantly - * fewer shards than other nodes (e.g. during single new node addition or node - * replacement), its weight is much less than other nodes. All shard allocations - * at this time tend to land on the new node with skewed weight. This breaks - * index level balance in the cluster, by creating all shards of the same index - * on one node, often resulting in a hotspot on that node. - * - * This constraint is breached when balancer attempts to allocate more than - * average shards per index per node. 
- */ - public static Predicate isIndexShardsPerNodeBreached() { - return (params) -> { - int currIndexShardsOnNode = params.getNode().numShards(params.getIndex()); - int allowedIndexShardsPerNode = (int) Math.ceil(params.getBalancer().avgShardsPerNode(params.getIndex())); - return (currIndexShardsOnNode >= allowedIndexShardsPerNode); - }; - } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/Constraint.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/Constraint.java index 68be7dc770ca1..e9c3c0afcbe88 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/Constraint.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/Constraint.java @@ -12,9 +12,10 @@ import org.opensearch.cluster.routing.allocation.allocator.ShardsBalancer; import java.util.Map; -import java.util.Objects; import java.util.function.Predicate; +import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CONSTRAINT_WEIGHT; + /** * Defines a constraint useful to de-prioritize certain nodes as target of unassigned shards used in {@link AllocationConstraints} or * re-balancing target used in {@link RebalanceConstraints} @@ -23,17 +24,11 @@ */ public class Constraint implements Predicate { - public final static long CONSTRAINT_WEIGHT = 1000000L; - - private String name; - private boolean enable; private Predicate predicate; - public Constraint(String name, Predicate constraintPredicate) { - this.name = name; + public Constraint(Predicate constraintPredicate) { this.predicate = constraintPredicate; - this.enable = false; } @Override @@ -41,27 +36,10 @@ public boolean test(ConstraintParams constraintParams) { return this.enable && predicate.test(constraintParams); } - public String getName() { - return name; - } - public void setEnable(boolean enable) { this.enable = enable; } - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) 
return false; - Constraint that = (Constraint) o; - return name.equals(that.name); - } - - @Override - public int hashCode() { - return Objects.hash(name); - } - static class ConstraintParams { private ShardsBalancer balancer; private BalancedShardsAllocator.ModelNode node; diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java new file mode 100644 index 0000000000000..f209e993518c1 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java @@ -0,0 +1,84 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation; + +import java.util.function.Predicate; + +/** + * Defines the constraint identifiers, the shared constraint weight and the predicates that decide when each constraint is breached + * + * @opensearch.internal + */ +public class ConstraintTypes { + public final static long CONSTRAINT_WEIGHT = 1000000L; + + /** + * Defines a per index constraint which is breached when a node contains more than the average number (rounded up) of primary shards for an index + */ + public final static String INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID = "index.primary.shard.balance.constraint"; + + /** + * Defines a cluster constraint which is breached when a node already contains at least the average number (rounded up) of primary shards across all indices + */ + public final static String CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID = "cluster.primary.shard.balance.constraint"; + + /** + * Defines an index constraint which is breached when a node already contains at least the average number (rounded up) of shards for an index + */ + public final static String INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID = "index.shard.count.constraint"; + + /** + * Constraint to control number of shards of an index allocated on a single + * node. 
+ * + * In current weight function implementation, when a node has significantly + * fewer shards than other nodes (e.g. during single new node addition or node + * replacement), its weight is much less than other nodes. All shard allocations + * at this time tend to land on the new node with skewed weight. This breaks + * index level balance in the cluster, by creating all shards of the same index + * on one node, often resulting in a hotspot on that node. + * + * This constraint is breached when balancer attempts to allocate more than + * average shards per index per node. + */ + public static Predicate isIndexShardsPerNodeBreached() { + return (params) -> { + int currIndexShardsOnNode = params.getNode().numShards(params.getIndex()); + int allowedIndexShardsPerNode = (int) Math.ceil(params.getBalancer().avgShardsPerNode(params.getIndex())); + return (currIndexShardsOnNode >= allowedIndexShardsPerNode); + }; + } + + /** + * Defines a predicate which returns true when, for a given index, a node contains more primary shards than the average + * per node (rounded up). This constraint is used in weight calculation during allocation and rebalancing. When breached a high weight + * {@link ConstraintTypes#CONSTRAINT_WEIGHT} is assigned to node resulting in lesser chances of node being selected + * as allocation or rebalancing target + */ + public static Predicate isPerIndexPrimaryShardsPerNodeBreached() { + return (params) -> { + int perIndexPrimaryShardCount = params.getNode().numPrimaryShards(params.getIndex()); + int perIndexAllowedPrimaryShardCount = (int) Math.ceil(params.getBalancer().avgPrimaryShardsPerNode(params.getIndex())); + return perIndexPrimaryShardCount > perIndexAllowedPrimaryShardCount; + }; + } + + /** + * Defines a predicate which returns true when a node already contains at least the average number (rounded up) of primary shards. This + * constraint is used in weight calculation during allocation only. 
When breached a high weight {@link ConstraintTypes#CONSTRAINT_WEIGHT} + * is assigned to node resulting in lesser chances of node being selected as allocation target + */ + public static Predicate isPrimaryShardsPerNodeBreached() { + return (params) -> { + int primaryShardCount = params.getNode().numPrimaryShards(); + int allowedPrimaryShardCount = (int) Math.ceil(params.getBalancer().avgPrimaryShardsPerNode()); + return primaryShardCount >= allowedPrimaryShardCount; + }; + } +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java index b619eb993894a..a4036ec47ec0e 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java @@ -13,9 +13,9 @@ import java.util.HashMap; import java.util.Map; -import java.util.function.Predicate; -import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE; +import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID; +import static org.opensearch.cluster.routing.allocation.ConstraintTypes.isPerIndexPrimaryShardsPerNodeBreached; /** * Constraints applied during rebalancing round; specify conditions which, if breached, reduce the @@ -24,15 +24,12 @@ * @opensearch.internal */ public class RebalanceConstraints { - public final static String PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID = PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE.getKey(); + private Map constraints; public RebalanceConstraints() { this.constraints = new HashMap<>(); - this.constraints.putIfAbsent( - PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID, - new Constraint(PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID, isPrimaryShardsPerIndexPerNodeBreached()) - ); + 
this.constraints.putIfAbsent(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached())); } public void updateRebalanceConstraint(String constraint, boolean enable) { @@ -43,16 +40,4 @@ public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode no Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index); return params.weight(constraints); } - - /** - * When primary balance is preferred, add node constraint of average primary shards per node to give the node a - * higher weight resulting in lesser chances of being target of unassigned shard allocation or rebalancing target node - */ - public static Predicate isPrimaryShardsPerIndexPerNodeBreached() { - return (params) -> { - int currPrimaryShardsOnNode = params.getNode().numPrimaryShards(params.getIndex()); - int allowedPrimaryShardsPerNode = (int) Math.ceil(params.getBalancer().avgPrimaryShardsPerNode(params.getIndex())); - return currPrimaryShardsOnNode > allowedPrimaryShardsPerNode; - }; - } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index a59035dea9d7b..0ff0eeba7d394 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -42,6 +42,7 @@ import org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus; import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.opensearch.cluster.routing.allocation.AllocationConstraints; +import org.opensearch.cluster.routing.allocation.ConstraintTypes; import org.opensearch.cluster.routing.allocation.MoveDecision; import org.opensearch.cluster.routing.allocation.RebalanceConstraints; import 
org.opensearch.cluster.routing.allocation.RoutingAllocation; @@ -59,11 +60,13 @@ import java.util.Map; import java.util.Set; -import static org.opensearch.cluster.routing.allocation.AllocationConstraints.INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID; +import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID; +import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID; +import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID; /** * The {@link BalancedShardsAllocator} re-balances the nodes allocations - * within an cluster based on a {@link WeightFunction}. The clusters balance is defined by four parameters which can be set + * within a cluster based on a {@link WeightFunction}. The clusters balance is defined by four parameters which can be set * in the cluster update API that allows changes in real-time: *
  • cluster.routing.allocation.balance.shard - The shard balance defines the weight factor * for shards allocated on a {@link RoutingNode}
  • @@ -71,6 +74,7 @@ * of {@link org.opensearch.cluster.routing.ShardRouting}s per index allocated on a specific node *
  • cluster.routing.allocation.balance.threshold - A threshold to set the minimal optimization * value of operations that should be performed
  • + *
  • cluster.routing.allocation.balance.prefer_primary - Defines whether primary shard balance is desired
  • *
*

* These parameters are combined in a {@link WeightFunction} that allows calculation of node weights which @@ -115,12 +119,12 @@ public class BalancedShardsAllocator implements ShardsAllocator { ); /** - * Prefer per index primary shard balance by using {@link RebalanceConstraints#isPrimaryShardsPerIndexPerNodeBreached()} - * constraint which is used during unassigned shard allocation {@link LocalShardsBalancer#allocateUnassigned()} and - * shard re-balance/relocation to a different node{@link LocalShardsBalancer#balance()} . + * This setting governs whether primary shards balance is desired during allocation. This is used by {@link ConstraintTypes#isPerIndexPrimaryShardsPerNodeBreached()} + * and {@link ConstraintTypes#isPrimaryShardsPerNodeBreached} which is used during unassigned shard allocation + * {@link LocalShardsBalancer#allocateUnassigned()} and shard re-balance/relocation to a different node via {@link LocalShardsBalancer#balance()} . */ - public static final Setting PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE = Setting.boolSetting( + public static final Setting PREFER_PRIMARY_SHARD_BALANCE = Setting.boolSetting( "cluster.routing.allocation.balance.prefer_primary", false, Property.Dynamic, @@ -141,8 +145,8 @@ public BalancedShardsAllocator(Settings settings) { public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSettings) { setWeightFunction(INDEX_BALANCE_FACTOR_SETTING.get(settings), SHARD_BALANCE_FACTOR_SETTING.get(settings)); setThreshold(THRESHOLD_SETTING.get(settings)); - setPreferPrimaryShardBalance(PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE.get(settings)); - clusterSettings.addSettingsUpdateConsumer(PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance); + setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings)); + clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance); clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, 
this::setMovePrimaryFirst); clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, SHARD_BALANCE_FACTOR_SETTING, this::setWeightFunction); clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold); @@ -156,10 +160,15 @@ private void setWeightFunction(float indexBalance, float shardBalanceFactor) { weightFunction = new WeightFunction(indexBalance, shardBalanceFactor); } + /** + * Stores the primary balance preference and switches the per-index and cluster-wide primary shard balance constraints on or off to match + * @param preferPrimaryShardBalance true to enable the primary shard balance constraints, false to disable them + */ private void setPreferPrimaryShardBalance(boolean preferPrimaryShardBalance) { this.preferPrimaryShardBalance = preferPrimaryShardBalance; - this.weightFunction.updateAllocationConstraint(PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE.getKey(), preferPrimaryShardBalance); - this.weightFunction.updateRebalanceConstraint(PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE.getKey(), preferPrimaryShardBalance); + this.weightFunction.updateAllocationConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance); + this.weightFunction.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance); + this.weightFunction.updateRebalanceConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance); } private void setThreshold(float threshold) { @@ -386,6 +395,10 @@ public int numPrimaryShards(String idx) { return index == null ? 0 : index.numPrimaryShards(); } + public int numPrimaryShards() { + return indices.values().stream().mapToInt(index -> index.numPrimaryShards()).sum(); + } + public int highestPrimary(String index) { ModelIndex idx = indices.get(index); if (idx != null) { @@ -431,7 +444,6 @@ public boolean containsShard(ShardRouting shard) { ModelIndex index = getIndex(shard.getIndexName()); return index == null ? 
false : index.containsShard(shard); } - } /** diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java index ae6b8fd39978e..cd1083e57e2e0 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java @@ -66,6 +66,8 @@ public class LocalShardsBalancer extends ShardsBalancer { private final float threshold; private final Metadata metadata; private final float avgShardsPerNode; + + private final float avgPrimaryShardsPerNode; private final BalancedShardsAllocator.NodeSorter sorter; private final Set inEligibleTargetNode; @@ -85,6 +87,9 @@ public LocalShardsBalancer( this.routingNodes = allocation.routingNodes(); this.metadata = allocation.metadata(); avgShardsPerNode = ((float) metadata.getTotalNumberOfShards()) / routingNodes.size(); + avgPrimaryShardsPerNode = (float) (StreamSupport.stream(metadata.spliterator(), false) + .mapToInt(IndexMetadata::getNumberOfShards) + .sum()) / routingNodes.size(); nodes = Collections.unmodifiableMap(buildModelFromAssigned()); sorter = newNodeSorter(); inEligibleTargetNode = new HashSet<>(); @@ -111,6 +116,11 @@ public float avgPrimaryShardsPerNode(String index) { return ((float) metadata.index(index).getNumberOfShards()) / nodes.size(); } + @Override + public float avgPrimaryShardsPerNode() { + return avgPrimaryShardsPerNode; + } + /** * Returns the global average of shards per node */ diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java index b74393e1eec4c..ef2dbd34644a7 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java +++ 
b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java @@ -80,4 +80,11 @@ public float avgPrimaryShardsPerNode(String index) { return Float.MAX_VALUE; } + /** + * Returns the average number of primary shards per node across all indices; this base implementation returns {@link Float#MAX_VALUE} + */ + public float avgPrimaryShardsPerNode() { + return Float.MAX_VALUE; + } + } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 55346cf3ffd9c..4ea0ca7c3d77d 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -233,7 +233,7 @@ public void apply(Settings value, Settings current, Settings previous) { AwarenessReplicaBalance.CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING, BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING, BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING, - BalancedShardsAllocator.PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE, + BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE, BalancedShardsAllocator.SHARD_MOVE_PRIMARY_FIRST_SETTING, BalancedShardsAllocator.THRESHOLD_SETTING, BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING, diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java index b2e4521811569..937d0dd34226f 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java @@ -18,9 +18,10 @@ import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import static 
org.opensearch.cluster.routing.allocation.AllocationConstraints.INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID; -import static 
org.opensearch.cluster.routing.allocation.Constraint.CONSTRAINT_WEIGHT; -import static org.opensearch.cluster.routing.allocation.RebalanceConstraints.PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID; +import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID; +import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CONSTRAINT_WEIGHT; +import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID; +import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID; public class AllocationConstraintsTests extends OpenSearchAllocationTestCase { @@ -36,7 +37,7 @@ public void testSettings() { settings.put(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING.getKey(), indexBalanceFactor); settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), shardBalance); settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), threshold); - settings.put(BalancedShardsAllocator.PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE.getKey(), true); + settings.put(BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE.getKey(), true); service.applySettings(settings.build()); @@ -45,7 +46,7 @@ public void testSettings() { assertEquals(threshold, allocator.getThreshold(), 0.01); assertEquals(true, allocator.getPreferPrimaryBalance()); - settings.put(BalancedShardsAllocator.PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE.getKey(), false); + settings.put(BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE.getKey(), false); service.applySettings(settings.build()); assertEquals(false, allocator.getPreferPrimaryBalance()); } @@ -76,54 +77,133 @@ public void testIndexShardsPerNodeConstraint() { * Test constraint evaluation logic when with different values of ConstraintMode * for IndexShardPerNode constraint satisfied and breached. 
*/ - public void testIndexPrimaryShardsPerNodeConstraint() { + public void testPerIndexPrimaryShardsConstraint() { ShardsBalancer balancer = mock(LocalShardsBalancer.class); BalancedShardsAllocator.ModelNode node = mock(BalancedShardsAllocator.ModelNode.class); AllocationConstraints constraints = new AllocationConstraints(); - constraints.updateAllocationConstraint(PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID, true); + constraints.updateAllocationConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, true); + final String indexName = "test-index"; + int perIndexPrimaryShardCount = 1; + float avgPerIndexPrimaryShardsPerNode = 2f; + + when(balancer.avgPrimaryShardsPerNode(anyString())).thenReturn(avgPerIndexPrimaryShardsPerNode); + when(node.numPrimaryShards(anyString())).thenReturn(perIndexPrimaryShardCount); + when(node.getNodeId()).thenReturn("test-node"); + + assertEquals(0, constraints.weight(balancer, node, indexName)); + + perIndexPrimaryShardCount = 3; + when(node.numPrimaryShards(anyString())).thenReturn(perIndexPrimaryShardCount); + assertEquals(CONSTRAINT_WEIGHT, constraints.weight(balancer, node, indexName)); + + constraints.updateAllocationConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, false); + assertEquals(0, constraints.weight(balancer, node, indexName)); + } + + /** + * Test constraint evaluation logic when the global (cluster wide) primary shard count constraint is satisfied, breached and disabled. 
+ */ + public void testGlobalPrimaryShardsConstraint() { + ShardsBalancer balancer = mock(LocalShardsBalancer.class); + BalancedShardsAllocator.ModelNode node = mock(BalancedShardsAllocator.ModelNode.class); + AllocationConstraints constraints = new AllocationConstraints(); + constraints.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, true); + + final String indexName = "test-index"; int primaryShardCount = 1; float avgPrimaryShardsPerNode = 2f; - when(balancer.avgPrimaryShardsPerNode(anyString())).thenReturn(avgPrimaryShardsPerNode); - when(node.numPrimaryShards(anyString())).thenReturn(primaryShardCount); + when(balancer.avgPrimaryShardsPerNode()).thenReturn(avgPrimaryShardsPerNode); + when(node.numPrimaryShards()).thenReturn(primaryShardCount); when(node.getNodeId()).thenReturn("test-node"); - assertEquals(0, constraints.weight(balancer, node, "index")); + assertEquals(0, constraints.weight(balancer, node, indexName)); primaryShardCount = 3; - when(node.numPrimaryShards(anyString())).thenReturn(primaryShardCount); - assertEquals(CONSTRAINT_WEIGHT, constraints.weight(balancer, node, "index")); + when(node.numPrimaryShards()).thenReturn(primaryShardCount); + assertEquals(CONSTRAINT_WEIGHT, constraints.weight(balancer, node, indexName)); + + constraints.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, false); + assertEquals(0, constraints.weight(balancer, node, indexName)); + } + + /** + * Test constraint evaluation logic when both per index and global primary shard count constraints are breached. 
+ */ + public void testPrimaryShardsConstraints() { + ShardsBalancer balancer = mock(LocalShardsBalancer.class); + BalancedShardsAllocator.ModelNode node = mock(BalancedShardsAllocator.ModelNode.class); + AllocationConstraints constraints = new AllocationConstraints(); + constraints.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, true); + constraints.updateAllocationConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, true); + + final String indexName = "test-index"; + int perIndexPrimaryShardCount = 1; + float avgPerIndexPrimaryShardCount = 2; + int primaryShardCount = 2; + float avgPrimaryShardsPerNode = 4; + + when(balancer.avgPrimaryShardsPerNode(indexName)).thenReturn(avgPerIndexPrimaryShardCount); + when(node.numPrimaryShards(indexName)).thenReturn(perIndexPrimaryShardCount); + when(balancer.avgPrimaryShardsPerNode()).thenReturn(avgPrimaryShardsPerNode); + when(node.numPrimaryShards()).thenReturn(primaryShardCount); + when(node.getNodeId()).thenReturn("test-node"); - constraints.updateAllocationConstraint(PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID, false); - assertEquals(0, constraints.weight(balancer, node, "index")); + assertEquals(0, constraints.weight(balancer, node, indexName)); + + // breaching global primary shard count but not per index primary shard count + primaryShardCount = 5; + when(node.numPrimaryShards()).thenReturn(primaryShardCount); + assertEquals(CONSTRAINT_WEIGHT, constraints.weight(balancer, node, indexName)); + + // when per index primary shard count constraint is also breached + perIndexPrimaryShardCount = 3; + when(node.numPrimaryShards(indexName)).thenReturn(perIndexPrimaryShardCount); + assertEquals(2 * CONSTRAINT_WEIGHT, constraints.weight(balancer, node, indexName)); + + // disable both constraints + constraints.updateAllocationConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, false); + constraints.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, false); + assertEquals(0, 
constraints.weight(balancer, node, indexName)); } /** * Test constraint evaluation logic when with different values of ConstraintMode * for IndexShardPerNode constraint satisfied and breached. */ - public void testAllConstraint() { + public void testAllConstraints() { ShardsBalancer balancer = mock(LocalShardsBalancer.class); BalancedShardsAllocator.ModelNode node = mock(BalancedShardsAllocator.ModelNode.class); AllocationConstraints constraints = new AllocationConstraints(); constraints.updateAllocationConstraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, true); - constraints.updateAllocationConstraint(PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID, true); + constraints.updateAllocationConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, true); + constraints.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, true); + final String indexName = "test-index"; int shardCount = randomIntBetween(1, 500); - int primaryShardCount = randomIntBetween(1, shardCount); - float avgShardsPerNode = 1.0f + (random().nextFloat()) * 999.0f; - float avgPrimaryShardsPerNode = (random().nextFloat()) * avgShardsPerNode; + float avgPerIndexShardsPerNode = 1.0f + (random().nextFloat()) * 999.0f; - when(balancer.avgPrimaryShardsPerNode(anyString())).thenReturn(avgPrimaryShardsPerNode); - when(node.numPrimaryShards(anyString())).thenReturn(primaryShardCount); - when(balancer.avgShardsPerNode(anyString())).thenReturn(avgShardsPerNode); - when(node.numShards(anyString())).thenReturn(shardCount); + int perIndexPrimaryShardCount = randomIntBetween(1, shardCount); + float avgPerIndexPrimaryShardsPerNode = (random().nextFloat()) * avgPerIndexShardsPerNode; + + float avgPrimaryShardsPerNode = 1.0f + (random().nextFloat()) * 999.0f; + int primaryShardsPerNode = randomIntBetween(1, shardCount); + + when(balancer.avgPrimaryShardsPerNode(indexName)).thenReturn(avgPerIndexPrimaryShardsPerNode); + when(node.numPrimaryShards(indexName)).thenReturn(perIndexPrimaryShardCount); + + 
when(balancer.avgPrimaryShardsPerNode()).thenReturn(avgPrimaryShardsPerNode); + when(node.numPrimaryShards()).thenReturn(primaryShardsPerNode); + + when(balancer.avgShardsPerNode(indexName)).thenReturn(avgPerIndexShardsPerNode); + when(node.numShards(indexName)).thenReturn(shardCount); when(node.getNodeId()).thenReturn("test-node"); - long expectedWeight = (shardCount >= avgShardsPerNode) ? CONSTRAINT_WEIGHT : 0; - expectedWeight += primaryShardCount > avgPrimaryShardsPerNode ? CONSTRAINT_WEIGHT : 0; - assertEquals(expectedWeight, constraints.weight(balancer, node, "index")); + long expectedWeight = (shardCount >= avgPerIndexShardsPerNode) ? CONSTRAINT_WEIGHT : 0; + expectedWeight += perIndexPrimaryShardCount > avgPerIndexPrimaryShardsPerNode ? CONSTRAINT_WEIGHT : 0; + expectedWeight += primaryShardsPerNode >= avgPrimaryShardsPerNode ? CONSTRAINT_WEIGHT : 0; + assertEquals(expectedWeight, constraints.weight(balancer, node, indexName)); } } diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java index b472c043b2843..f5a418bc6a100 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java @@ -62,7 +62,6 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.index.shard.ShardId; -import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.snapshots.EmptySnapshotsInfoService; import org.opensearch.test.gateway.TestGatewayAllocator; import org.hamcrest.Matchers; @@ -71,6 +70,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static 
org.opensearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE; @@ -155,7 +155,7 @@ private Settings.Builder getSettingsBuilderForPrimaryBalance(boolean preferPrima ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString() ); settings.put(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING.getKey(), indexBalance); - settings.put(BalancedShardsAllocator.PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE.getKey(), preferPrimaryBalance); + settings.put(BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE.getKey(), preferPrimaryBalance); settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), shardBalance); settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), balanceThreshold); return settings; @@ -190,7 +190,7 @@ public void testPrimaryBalance() { } /** - * This test verifies primary shard balance is not attained without PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE setting. + * This test verifies primary shard balance is not attained without PREFER_PRIMARY_SHARD_BALANCE setting. */ public void testPrimaryBalanceWithoutPreferPrimaryBalanceSetting() { final int numberOfNodes = 5; @@ -206,7 +206,7 @@ public void testPrimaryBalanceWithoutPreferPrimaryBalanceSetting() { new TestGatewayAllocator() ); for (int i = 0; i < numberOfRuns; i++) { - ClusterState clusterState = initCluster(strategy, true, numberOfIndices, numberOfNodes, numberOfShards, numberOfReplicas); + ClusterState clusterState = initCluster(strategy, numberOfIndices, numberOfNodes, numberOfShards, numberOfReplicas); clusterState = removeOneNode(clusterState, strategy); logger.info(ShardAllocations.printShardDistribution(clusterState)); try { @@ -220,7 +220,7 @@ public void testPrimaryBalanceWithoutPreferPrimaryBalanceSetting() { } /** - * This test verifies primary shard balance is attained with PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE setting. + * This test verifies primary shard balance is attained with PREFER_PRIMARY_SHARD_BALANCE setting. 
*/ public void testPrimaryBalanceWithPreferPrimaryBalanceSetting() { final int numberOfNodes = 5; @@ -232,7 +232,7 @@ public void testPrimaryBalanceWithPreferPrimaryBalanceSetting() { AllocationService strategy = createAllocationService(getSettingsBuilderForPrimaryBalance().build(), new TestGatewayAllocator()); for (int i = 0; i < numberOfRuns; i++) { - ClusterState clusterState = initCluster(strategy, true, numberOfIndices, numberOfNodes, numberOfShards, numberOfReplicas); + ClusterState clusterState = initCluster(strategy, numberOfIndices, numberOfNodes, numberOfShards, numberOfReplicas); clusterState = removeOneNode(clusterState, strategy); logger.info(ShardAllocations.printShardDistribution(clusterState)); try { @@ -362,6 +362,26 @@ public void testPrimaryBalanceWithContrainstBreaching() { assertEquals("node0", replicaShards.get(0).currentNodeId()); } + /** + * This test verifies global balance by creating indices iteratively and verifying that primary shards do not pile up on one + * node.
+ * @throws Exception + */ + public void testGlobalPrimaryBalance() throws Exception { + AllocationService strategy = createAllocationService(getSettingsBuilderForPrimaryBalance().build(), new TestGatewayAllocator()); + ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .build(); + clusterState = addNode(clusterState, strategy); + clusterState = addNode(clusterState, strategy); + + clusterState = addIndex(clusterState, strategy, "test-index1", 1, 1); + clusterState = addIndex(clusterState, strategy, "test-index2", 1, 1); + clusterState = addIndex(clusterState, strategy, "test-index3", 1, 1); + + logger.info(ShardAllocations.printShardDistribution(clusterState)); + verifyPrimaryBalance(clusterState); + } + /** * This test mimics a cluster state which can not be rebalanced due to * {@link org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider} @@ -502,7 +522,7 @@ public void testPrimaryBalance_NotSolved_2() { verifyPerIndexPrimaryBalance(clusterState); } - public void verifyPerIndexPrimaryBalance(ClusterState currentState) { + private void verifyPerIndexPrimaryBalance(ClusterState currentState) { RoutingNodes nodes = currentState.getRoutingNodes(); for (ObjectObjectCursor index : currentState.getRoutingTable().indicesRouting()) { final int totalPrimaryShards = index.value.primaryShardsActive(); @@ -519,6 +539,25 @@ public void verifyPerIndexPrimaryBalance(ClusterState currentState) { } } + private void verifyPrimaryBalance(ClusterState clusterState) throws Exception { + assertBusy(() -> { + RoutingNodes nodes = clusterState.getRoutingNodes(); + int totalPrimaryShards = 0; + for (ObjectObjectCursor index : clusterState.getRoutingTable().indicesRouting()) { + totalPrimaryShards += index.value.primaryShardsActive(); + } + final int avgPrimaryShardsPerNode = (int) Math.ceil(totalPrimaryShards * 1f / clusterState.getRoutingNodes().size()); + for (RoutingNode node : 
nodes) { + final int primaryCount = node.shardsWithState(STARTED) + .stream() + .filter(ShardRouting::primary) + .collect(Collectors.toList()) + .size(); + assertTrue(primaryCount <= avgPrimaryShardsPerNode); + } + }, 60, TimeUnit.SECONDS); + } + public void testShardBalance() { /* Tests balance over replicas only */ final float indexBalance = 0.0f; @@ -568,12 +607,37 @@ public void testShardBalance() { } private ClusterState initCluster(AllocationService strategy) { - return initCluster(strategy, false, numberOfIndices, numberOfNodes, numberOfShards, numberOfReplicas); + return initCluster(strategy, numberOfIndices, numberOfNodes, numberOfShards, numberOfReplicas); + } + + private ClusterState addIndex( + ClusterState clusterState, + AllocationService strategy, + String indexName, + int numberOfShards, + int numberOfReplicas + ) { + Metadata.Builder metadataBuilder = Metadata.builder(clusterState.getMetadata()); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(clusterState.routingTable()); + + IndexMetadata.Builder index = IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas); + + metadataBuilder = metadataBuilder.put(index); + routingTableBuilder.addAsNew(index.build()); + + clusterState = ClusterState.builder(clusterState) + .metadata(metadataBuilder.build()) + .routingTable(routingTableBuilder.build()) + .build(); + clusterState = strategy.reroute(clusterState, "index-created"); + return applyAllocationUntilNoChange(clusterState, strategy); } private ClusterState initCluster( AllocationService strategy, - boolean segrep, int numberOfIndices, int numberOfNodes, int numberOfShards, @@ -583,19 +647,14 @@ private ClusterState initCluster( RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); for (int i = 0; i < numberOfIndices; i++) { - Settings.Builder settingsBuilder = settings(Version.CURRENT); - if (segrep) { - 
settingsBuilder.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT); - } IndexMetadata.Builder index = IndexMetadata.builder("test" + i) - .settings(settingsBuilder) + .settings(settings(Version.CURRENT).build()) .numberOfShards(numberOfShards) .numberOfReplicas(numberOfReplicas); metadataBuilder = metadataBuilder.put(index); } Metadata metadata = metadataBuilder.build(); - for (ObjectCursor cursor : metadata.indices().values()) { routingTableBuilder.addAsNew(cursor.value); }