Skip to content

Commit

Permalink
[Segment Replication] Add global primary shard balance constraint dur…
Browse files Browse the repository at this point in the history
…ing allocation (#6643)

* [Segment Replication] Add global primary shard balance constraint during allocation

Signed-off-by: Suraj Singh <[email protected]>

* Add javadoc

Signed-off-by: Suraj Singh <[email protected]>

* Fix testAllConstraints test to include all constraints

Signed-off-by: Suraj Singh <[email protected]>

---------

Signed-off-by: Suraj Singh <[email protected]>
  • Loading branch information
dreamer-89 authored Mar 15, 2023
1 parent 2ebd644 commit ad823b6
Show file tree
Hide file tree
Showing 11 changed files with 379 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,50 @@ public void enablePreferPrimaryBalance() {
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(
Settings.builder().put(BalancedShardsAllocator.PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE.getKey(), "true")
)
.setPersistentSettings(Settings.builder().put(BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE.getKey(), "true"))
);
}

/**
 * Verifies that primary shard balance is attained during allocation, both per index and across all indices,
 * once the primary balance preference has been enabled.
 *
 * The test starts one cluster-manager-only node plus a random number of further nodes, enables the preference,
 * creates a random number of indices (calling {@code ensureGreen} after each one) and finally runs both
 * balance verifications.
 *
 * @throws Exception if cluster setup, index creation or one of the balance verifications fails
 */
public void testGlobalPrimaryAllocation() throws Exception {
    internalCluster().startClusterManagerOnlyNode();
    final int maxReplicaCount = 1;
    final int maxShardCount = 1;
    // At least one node more than the replica count so that every shard copy can be assigned.
    final int nodeCount = randomIntBetween(maxReplicaCount + 1, 10);
    final int numberOfIndices = randomIntBetween(5, 10);

    logger.info("--> Creating {} nodes", nodeCount);
    for (int i = 0; i < nodeCount; i++) {
        internalCluster().startNode();
    }
    enablePreferPrimaryBalance();

    for (int i = 0; i < numberOfIndices; i++) {
        final String indexName = "test" + i;
        final int shardCount = randomIntBetween(1, maxShardCount);
        final int replicaCount = randomIntBetween(0, maxReplicaCount);
        logger.info("--> Creating index {} with shard count {} and replica count {}", indexName, shardCount, replicaCount);
        // NOTE(review): the last argument alternates between indices; presumably it toggles segment
        // replication for the index - confirm against the createIndex helper.
        createIndex(indexName, shardCount, replicaCount, i % 2 == 0);
        ensureGreen(TimeValue.timeValueSeconds(60));
    }

    final ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState();
    logger.info(ShardAllocations.printShardDistribution(state));
    verifyPerIndexPrimaryBalance();
    verifyPrimaryBalance();
}

/**
* This test verifies the happy path where primary shard allocation is balanced when multiple indices are created.
*
* In general this test also passes without the primary shard balance preference, because the allocation algorithm
* assigns all primary shards first and the replica copies afterwards.
*/
public void testBalancedPrimaryAllocation() throws Exception {
public void testPerIndexPrimaryAllocation() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final int maxReplicaCount = 2;
final int maxShardCount = 5;
Expand Down Expand Up @@ -213,4 +244,24 @@ private void verifyPerIndexPrimaryBalance() throws Exception {
}
}, 60, TimeUnit.SECONDS);
}

/**
 * Asserts that primary shards are balanced across the whole cluster.
 *
 * Inside {@code assertBusy} (60 second limit) the current cluster state is fetched, the active primary shards of
 * all indices are summed, and every routing node is checked to hold no more started primaries than the ceiling
 * of the per-node average.
 *
 * @throws Exception if thrown by {@code assertBusy}
 */
private void verifyPrimaryBalance() throws Exception {
    assertBusy(() -> {
        final ClusterState currentState = client().admin().cluster().prepareState().execute().actionGet().getState();
        final RoutingNodes nodes = currentState.getRoutingNodes();
        int totalPrimaryShards = 0;
        for (ObjectObjectCursor<String, IndexRoutingTable> index : currentState.getRoutingTable().indicesRouting()) {
            totalPrimaryShards += index.value.primaryShardsActive();
        }
        // Round up: when the total does not divide evenly, some nodes legitimately hold one extra primary.
        final int avgPrimaryShardsPerNode = (int) Math.ceil(totalPrimaryShards * 1f / nodes.size());
        for (RoutingNode node : nodes) {
            // Count directly on the stream instead of materializing a list only to read its size.
            final long primaryCount = node.shardsWithState(STARTED).stream().filter(ShardRouting::primary).count();
            assertTrue(
                "node holds " + primaryCount + " started primaries, allowed at most " + avgPrimaryShardsPerNode,
                primaryCount <= avgPrimaryShardsPerNode
            );
        }
    }, 60, TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,29 @@

import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

import static org.opensearch.cluster.routing.allocation.RebalanceConstraints.PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID;
import static org.opensearch.cluster.routing.allocation.RebalanceConstraints.isPrimaryShardsPerIndexPerNodeBreached;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.isIndexShardsPerNodeBreached;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.isPerIndexPrimaryShardsPerNodeBreached;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.isPrimaryShardsPerNodeBreached;

/**
* Allocation constraints specify conditions which, if breached, reduce the
* priority of a node for receiving unassigned shard allocations.
* Allocation constraints specify conditions which, if breached, reduce the priority of a node for receiving unassigned
* shard allocations. Weight calculation in other scenarios like shard movement and re-balancing remain unaffected by
* this constraint.
*
* @opensearch.internal
*/
public class AllocationConstraints {

/**
*
* This constraint is only applied for unassigned shards to avoid overloading a newly added node.
* Weight calculation in other scenarios like shard movement and re-balancing remain unaffected by this constraint.
*/
public final static String INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID = "index.shard.breach.constraint";
private Map<String, Constraint> constraints;

public AllocationConstraints() {
this.constraints = new HashMap<>();
this.constraints.putIfAbsent(
INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID,
new Constraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, isIndexShardsPerNodeBreached())
);
this.constraints.putIfAbsent(
PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID,
new Constraint(PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID, isPrimaryShardsPerIndexPerNodeBreached())
);
this.constraints.putIfAbsent(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, new Constraint(isIndexShardsPerNodeBreached()));
this.constraints.putIfAbsent(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached()));
this.constraints.putIfAbsent(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPrimaryShardsPerNodeBreached()));
}

public void updateAllocationConstraint(String constraint, boolean enable) {
Expand All @@ -51,26 +43,4 @@ public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode no
Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index);
return params.weight(constraints);
}

/**
 * Builds the predicate that limits how many shards of one index may sit on a single node.
 *
 * With the current weight function, a node holding far fewer shards than its peers (for example a single
 * newly added or replacement node) gets a much lower weight, so allocations at that moment tend to pile up
 * on it. All shards of an index can then end up on that one node, which breaks index level balance and
 * often produces a hotspot.
 *
 * The predicate reports a breach when the node already holds at least the rounded-up average number of
 * shards per node for the index, i.e. when one more shard would push it above the average.
 */
public static Predicate<Constraint.ConstraintParams> isIndexShardsPerNodeBreached() {
    return params -> {
        final int shardsOfIndexOnNode = params.getNode().numShards(params.getIndex());
        final int shardLimitPerNode = (int) Math.ceil(params.getBalancer().avgShardsPerNode(params.getIndex()));
        return shardLimitPerNode <= shardsOfIndexOnNode;
    };
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
import org.opensearch.cluster.routing.allocation.allocator.ShardsBalancer;

import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;

import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CONSTRAINT_WEIGHT;

/**
* Defines a constraint useful to de-prioritize certain nodes as target of unassigned shards used in {@link AllocationConstraints} or
* re-balancing target used in {@link RebalanceConstraints}
Expand All @@ -23,45 +24,22 @@
*/
public class Constraint implements Predicate<Constraint.ConstraintParams> {

public final static long CONSTRAINT_WEIGHT = 1000000L;

private String name;

private boolean enable;
private Predicate<ConstraintParams> predicate;

public Constraint(String name, Predicate<ConstraintParams> constraintPredicate) {
this.name = name;
public Constraint(Predicate<ConstraintParams> constraintPredicate) {
this.predicate = constraintPredicate;
this.enable = false;
}

@Override
public boolean test(ConstraintParams constraintParams) {
return this.enable && predicate.test(constraintParams);
}

public String getName() {
return name;
}

public void setEnable(boolean enable) {
this.enable = enable;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Constraint that = (Constraint) o;
return name.equals(that.name);
}

@Override
public int hashCode() {
return Objects.hash(name);
}

static class ConstraintParams {
private ShardsBalancer balancer;
private BalancedShardsAllocator.ModelNode node;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing.allocation;

import java.util.function.Predicate;

/**
 * Central place for the constraint identifiers, the constraint weight and the predicates that decide whether a
 * node breaches a constraint.
 *
 * @opensearch.internal
 */
public class ConstraintTypes {
    /**
     * High weight assigned to a node when a constraint is breached, lowering its chances of being selected.
     */
    public static final long CONSTRAINT_WEIGHT = 1000000L;

    /**
     * Identifier of the per index constraint on the number of primary shards of an index held by a node.
     */
    public static final String INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID = "index.primary.shard.balance.constraint";

    /**
     * Identifier of the cluster wide constraint on the number of primary shards, across all indices, held by a node.
     */
    public static final String CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID = "cluster.primary.shard.balance.constraint";

    /**
     * Identifier of the per index constraint on the number of shards of an index held by a node.
     */
    public static final String INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID = "index.shard.count.constraint";

    /**
     * Builds the predicate that limits how many shards of one index may sit on a single node.
     *
     * With the current weight function, a node holding far fewer shards than its peers (for example a single
     * newly added or replacement node) gets a much lower weight, so allocations at that moment tend to pile up
     * on it. All shards of an index can then end up on that one node, which breaks index level balance and
     * often produces a hotspot.
     *
     * The predicate reports a breach when the node already holds at least the rounded-up average number of
     * shards per node for the index, i.e. when one more shard would push it above the average.
     */
    public static Predicate<Constraint.ConstraintParams> isIndexShardsPerNodeBreached() {
        return params -> {
            final int shardsOfIndexOnNode = params.getNode().numShards(params.getIndex());
            final int shardLimitPerNode = (int) Math.ceil(params.getBalancer().avgShardsPerNode(params.getIndex()));
            return shardLimitPerNode <= shardsOfIndexOnNode;
        };
    }

    /**
     * Builds the per index primary balance predicate. It reports a breach when the node holds strictly more
     * primary shards of the index than the rounded-up average number of primary shards per node for that index.
     *
     * The constraint feeds into weight calculation during both allocation and rebalancing; a breach assigns the
     * high weight {@link ConstraintTypes#CONSTRAINT_WEIGHT} to the node, making it a less likely allocation or
     * rebalancing target.
     */
    public static Predicate<Constraint.ConstraintParams> isPerIndexPrimaryShardsPerNodeBreached() {
        return params -> {
            final int primariesOfIndexOnNode = params.getNode().numPrimaryShards(params.getIndex());
            final int primaryLimitPerNode = (int) Math.ceil(params.getBalancer().avgPrimaryShardsPerNode(params.getIndex()));
            return primaryLimitPerNode < primariesOfIndexOnNode;
        };
    }

    /**
     * Builds the cluster wide primary balance predicate. It reports a breach when the node already holds at
     * least the rounded-up average number of primary shards per node, counted across all indices.
     *
     * The constraint feeds into weight calculation during allocation only; a breach assigns the high weight
     * {@link ConstraintTypes#CONSTRAINT_WEIGHT} to the node, making it a less likely allocation target.
     */
    public static Predicate<Constraint.ConstraintParams> isPrimaryShardsPerNodeBreached() {
        return params -> {
            final int primariesOnNode = params.getNode().numPrimaryShards();
            final int primaryLimitPerNode = (int) Math.ceil(params.getBalancer().avgPrimaryShardsPerNode());
            return primaryLimitPerNode <= primariesOnNode;
        };
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@

import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.isPerIndexPrimaryShardsPerNodeBreached;

/**
* Constraints applied during rebalancing round; specify conditions which, if breached, reduce the
Expand All @@ -24,15 +24,12 @@
* @opensearch.internal
*/
public class RebalanceConstraints {
public final static String PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID = PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE.getKey();

private Map<String, Constraint> constraints;

public RebalanceConstraints() {
this.constraints = new HashMap<>();
this.constraints.putIfAbsent(
PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID,
new Constraint(PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID, isPrimaryShardsPerIndexPerNodeBreached())
);
this.constraints.putIfAbsent(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached()));
}

public void updateRebalanceConstraint(String constraint, boolean enable) {
Expand All @@ -43,16 +40,4 @@ public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode no
Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index);
return params.weight(constraints);
}

/**
 * Builds the predicate used when primary balance is preferred. It reports a breach when the node holds strictly
 * more primary shards of the index than the rounded-up average number of primary shards per node for that index.
 * A breached node receives a higher weight and is therefore less likely to be chosen as the target of an
 * unassigned shard allocation or of a rebalancing move.
 */
public static Predicate<Constraint.ConstraintParams> isPrimaryShardsPerIndexPerNodeBreached() {
    return params -> {
        final int primariesOfIndexOnNode = params.getNode().numPrimaryShards(params.getIndex());
        final int primaryLimitPerNode = (int) Math.ceil(params.getBalancer().avgPrimaryShardsPerNode(params.getIndex()));
        return primaryLimitPerNode < primariesOfIndexOnNode;
    };
}
}
Loading

0 comments on commit ad823b6

Please sign in to comment.