Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Setting to adjust the primary constraint weights #16471

Merged
merged 1 commit into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- URI path filtering support in cluster stats API ([#15938](https://github.com/opensearch-project/OpenSearch/pull/15938))
- [Star Tree - Search] Add support for metric aggregations with/without term query ([15289](https://github.com/opensearch-project/OpenSearch/pull/15289))
- Add support for restoring from snapshot with search replicas ([#16111](https://github.com/opensearch-project/OpenSearch/pull/16111))
- Add Setting to adjust the primary constraint weights ([#16471](https://github.com/opensearch-project/OpenSearch/pull/16471))

### Dependencies
- Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ public void updateAllocationConstraint(String constraint, boolean enable) {
this.constraints.get(constraint).setEnable(enable);
}

public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index) {
Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index);
public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index, long primaryThresholdWeight) {
Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index, primaryThresholdWeight);
return params.weight(constraints);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import java.util.Map;
import java.util.function.Predicate;

import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CONSTRAINT_WEIGHT;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.predicateKeyToWeightMap;

/**
* Defines a constraint useful to de-prioritize certain nodes as target of unassigned shards used in {@link AllocationConstraints} or
Expand Down Expand Up @@ -44,11 +44,13 @@ static class ConstraintParams {
private ShardsBalancer balancer;
private BalancedShardsAllocator.ModelNode node;
private String index;
private long PrimaryConstraintThreshold;

ConstraintParams(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index) {
ConstraintParams(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index, long primaryConstraintThreshold) {
this.balancer = balancer;
this.node = node;
this.index = index;
this.PrimaryConstraintThreshold = primaryConstraintThreshold;
}

public ShardsBalancer getBalancer() {
Expand All @@ -75,9 +77,12 @@ public String getIndex() {
*/
public long weight(Map<String, Constraint> constraints) {
long totalConstraintWeight = 0;
for (Constraint constraint : constraints.values()) {
for (Map.Entry<String, Constraint> entry : constraints.entrySet()) {
String key = entry.getKey();
Constraint constraint = entry.getValue();
if (constraint.test(this)) {
totalConstraintWeight += CONSTRAINT_WEIGHT;
double weight = predicateKeyToWeightMap(key, PrimaryConstraintThreshold);
totalConstraintWeight += weight;
}
}
return totalConstraintWeight;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,14 @@ public static Predicate<Constraint.ConstraintParams> isPrimaryShardsPerNodeBreac
return primaryShardCount >= allowedPrimaryShardCount;
};
}

public static long predicateKeyToWeightMap(String key, long primaryConstraintWeight) {
switch (key) {
case CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID:
case CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID:
return primaryConstraintWeight;
default:
return CONSTRAINT_WEIGHT;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ public void updateRebalanceConstraint(String constraint, boolean enable) {
this.constraints.get(constraint).setEnable(enable);
}

public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index) {
Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index);
public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index, long primaryConstraintThreshold) {
Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index, primaryConstraintThreshold);
return params.weight(constraints);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,14 @@ public class BalancedShardsAllocator implements ShardsAllocator {
Property.NodeScope
);

public static final Setting<Long> PRIMARY_CONSTRAINT_THRESHOLD_SETTING = Setting.longSetting(
"cluster.routing.allocation.primary_constraint.threshold",
10,
0,
Property.Dynamic,
Property.NodeScope
);
Arpit-Bandejiya marked this conversation as resolved.
Show resolved Hide resolved

/**
* This setting governs whether primary shards balance is desired during allocation. This is used by {@link ConstraintTypes#isPerIndexPrimaryShardsPerNodeBreached()}
* and {@link ConstraintTypes#isPrimaryShardsPerNodeBreached} which is used during unassigned shard allocation
Expand Down Expand Up @@ -201,6 +209,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {
private volatile float shardBalanceFactor;
private volatile WeightFunction weightFunction;
private volatile float threshold;
private volatile long primaryConstraintThreshold;

private volatile boolean ignoreThrottleInRestore;
private volatile TimeValue allocatorTimeout;
Expand All @@ -219,6 +228,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
setIgnoreThrottleInRestore(IGNORE_THROTTLE_FOR_REMOTE_RESTORE.get(settings));
updateWeightFunction();
setThreshold(THRESHOLD_SETTING.get(settings));
setPrimaryConstraintThresholdSetting(PRIMARY_CONSTRAINT_THRESHOLD_SETTING.get(settings));
setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings));
setPreferPrimaryShardRebalance(PREFER_PRIMARY_SHARD_REBALANCE.get(settings));
setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings));
Expand All @@ -231,6 +241,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
clusterSettings.addSettingsUpdateConsumer(PRIMARY_SHARD_REBALANCE_BUFFER, this::updatePreferPrimaryShardBalanceBuffer);
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_REBALANCE, this::setPreferPrimaryShardRebalance);
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold);
clusterSettings.addSettingsUpdateConsumer(PRIMARY_CONSTRAINT_THRESHOLD_SETTING, this::setPrimaryConstraintThresholdSetting);
clusterSettings.addSettingsUpdateConsumer(IGNORE_THROTTLE_FOR_REMOTE_RESTORE, this::setIgnoreThrottleInRestore);
clusterSettings.addSettingsUpdateConsumer(ALLOCATOR_TIMEOUT_SETTING, this::setAllocatorTimeout);
}
Expand Down Expand Up @@ -294,7 +305,12 @@ private void updatePreferPrimaryShardBalanceBuffer(float preferPrimaryShardBalan
}

private void updateWeightFunction() {
weightFunction = new WeightFunction(this.indexBalanceFactor, this.shardBalanceFactor, this.preferPrimaryShardRebalanceBuffer);
weightFunction = new WeightFunction(
this.indexBalanceFactor,
this.shardBalanceFactor,
this.preferPrimaryShardRebalanceBuffer,
this.primaryConstraintThreshold
);
}

/**
Expand All @@ -317,6 +333,11 @@ private void setThreshold(float threshold) {
this.threshold = threshold;
}

private void setPrimaryConstraintThresholdSetting(long threshold) {
this.primaryConstraintThreshold = threshold;
this.weightFunction.updatePrimaryConstraintThreshold(threshold);
}

private void setAllocatorTimeout(TimeValue allocatorTimeout) {
this.allocatorTimeout = allocatorTimeout;
}
Expand Down Expand Up @@ -489,10 +510,11 @@ static class WeightFunction {
private final float shardBalance;
private final float theta0;
private final float theta1;
private long primaryConstraintThreshold;
private AllocationConstraints constraints;
private RebalanceConstraints rebalanceConstraints;

WeightFunction(float indexBalance, float shardBalance, float preferPrimaryBalanceBuffer) {
WeightFunction(float indexBalance, float shardBalance, float preferPrimaryBalanceBuffer, long primaryConstraintThreshold) {
float sum = indexBalance + shardBalance;
if (sum <= 0.0f) {
throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum);
Expand All @@ -501,6 +523,7 @@ static class WeightFunction {
theta1 = indexBalance / sum;
this.indexBalance = indexBalance;
this.shardBalance = shardBalance;
this.primaryConstraintThreshold = primaryConstraintThreshold;
RebalanceParameter rebalanceParameter = new RebalanceParameter(preferPrimaryBalanceBuffer);
this.constraints = new AllocationConstraints();
this.rebalanceConstraints = new RebalanceConstraints(rebalanceParameter);
Expand All @@ -510,12 +533,12 @@ static class WeightFunction {

public float weightWithAllocationConstraints(ShardsBalancer balancer, ModelNode node, String index) {
float balancerWeight = weight(balancer, node, index);
return balancerWeight + constraints.weight(balancer, node, index);
return balancerWeight + constraints.weight(balancer, node, index, primaryConstraintThreshold);
}

public float weightWithRebalanceConstraints(ShardsBalancer balancer, ModelNode node, String index) {
float balancerWeight = weight(balancer, node, index);
return balancerWeight + rebalanceConstraints.weight(balancer, node, index);
return balancerWeight + rebalanceConstraints.weight(balancer, node, index, primaryConstraintThreshold);
}

float weight(ShardsBalancer balancer, ModelNode node, String index) {
Expand All @@ -531,6 +554,10 @@ void updateAllocationConstraint(String constraint, boolean enable) {
void updateRebalanceConstraint(String constraint, boolean add) {
this.rebalanceConstraints.updateRebalanceConstraint(constraint, add);
}

void updatePrimaryConstraintThreshold(long primaryConstraintThreshold) {
this.primaryConstraintThreshold = primaryConstraintThreshold;
}
Arpit-Bandejiya marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ public void apply(Settings value, Settings current, Settings previous) {
BalancedShardsAllocator.THRESHOLD_SETTING,
BalancedShardsAllocator.IGNORE_THROTTLE_FOR_REMOTE_RESTORE,
BalancedShardsAllocator.ALLOCATOR_TIMEOUT_SETTING,
BalancedShardsAllocator.PRIMARY_CONSTRAINT_THRESHOLD_SETTING,
BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING,
BreakerSettings.CIRCUIT_BREAKER_OVERHEAD_SETTING,
BreakerSettings.CIRCUIT_BREAKER_TYPE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

public class AllocationConstraintsTests extends OpenSearchAllocationTestCase {

long constraintWeight = 20L;

public void testSettings() {
Settings.Builder settings = Settings.builder();
ClusterSettings service = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
Expand Down Expand Up @@ -69,7 +71,7 @@ public void testIndexShardsPerNodeConstraint() {
when(node.getNodeId()).thenReturn("test-node");

long expectedWeight = (shardCount >= avgShardsPerNode) ? CONSTRAINT_WEIGHT : 0;
assertEquals(expectedWeight, constraints.weight(balancer, node, "index"));
assertEquals(expectedWeight, constraints.weight(balancer, node, "index", constraintWeight));

}

Expand All @@ -91,14 +93,14 @@ public void testPerIndexPrimaryShardsConstraint() {
when(node.numPrimaryShards(anyString())).thenReturn(perIndexPrimaryShardCount);
when(node.getNodeId()).thenReturn("test-node");

assertEquals(0, constraints.weight(balancer, node, indexName));
assertEquals(0, constraints.weight(balancer, node, indexName, constraintWeight));

perIndexPrimaryShardCount = 2;
when(node.numPrimaryShards(anyString())).thenReturn(perIndexPrimaryShardCount);
assertEquals(CONSTRAINT_WEIGHT, constraints.weight(balancer, node, indexName));
assertEquals(CONSTRAINT_WEIGHT, constraints.weight(balancer, node, indexName, constraintWeight));

constraints.updateAllocationConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, false);
assertEquals(0, constraints.weight(balancer, node, indexName));
assertEquals(0, constraints.weight(balancer, node, indexName, constraintWeight));
}

/**
Expand All @@ -118,14 +120,14 @@ public void testGlobalPrimaryShardsConstraint() {
when(node.numPrimaryShards()).thenReturn(primaryShardCount);
when(node.getNodeId()).thenReturn("test-node");

assertEquals(0, constraints.weight(balancer, node, indexName));
assertEquals(0, constraints.weight(balancer, node, indexName, constraintWeight));

primaryShardCount = 3;
when(node.numPrimaryShards()).thenReturn(primaryShardCount);
assertEquals(CONSTRAINT_WEIGHT, constraints.weight(balancer, node, indexName));
assertEquals(constraintWeight, constraints.weight(balancer, node, indexName, constraintWeight));

constraints.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, false);
assertEquals(0, constraints.weight(balancer, node, indexName));
assertEquals(0, constraints.weight(balancer, node, indexName, constraintWeight));
}

/**
Expand All @@ -150,22 +152,22 @@ public void testPrimaryShardsConstraints() {
when(node.numPrimaryShards()).thenReturn(primaryShardCount);
when(node.getNodeId()).thenReturn("test-node");

assertEquals(0, constraints.weight(balancer, node, indexName));
assertEquals(0, constraints.weight(balancer, node, indexName, constraintWeight));

// breaching global primary shard count but not per index primary shard count
primaryShardCount = 5;
when(node.numPrimaryShards()).thenReturn(primaryShardCount);
assertEquals(CONSTRAINT_WEIGHT, constraints.weight(balancer, node, indexName));
assertEquals(constraintWeight, constraints.weight(balancer, node, indexName, constraintWeight));

// when per index primary shard count constraint is also breached
perIndexPrimaryShardCount = 3;
when(node.numPrimaryShards(indexName)).thenReturn(perIndexPrimaryShardCount);
assertEquals(2 * CONSTRAINT_WEIGHT, constraints.weight(balancer, node, indexName));
assertEquals(CONSTRAINT_WEIGHT + constraintWeight, constraints.weight(balancer, node, indexName, constraintWeight));

// disable both constraints
constraints.updateAllocationConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, false);
constraints.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, false);
assertEquals(0, constraints.weight(balancer, node, indexName));
assertEquals(0, constraints.weight(balancer, node, indexName, constraintWeight));
}

/**
Expand Down Expand Up @@ -202,8 +204,8 @@ public void testAllConstraints() {

long expectedWeight = (shardCount >= (int) Math.ceil(avgPerIndexShardsPerNode)) ? CONSTRAINT_WEIGHT : 0;
expectedWeight += perIndexPrimaryShardCount > (int) Math.ceil(avgPerIndexPrimaryShardsPerNode) ? CONSTRAINT_WEIGHT : 0;
expectedWeight += primaryShardsPerNode >= (int) Math.ceil(avgPrimaryShardsPerNode) ? CONSTRAINT_WEIGHT : 0;
assertEquals(expectedWeight, constraints.weight(balancer, node, indexName));
expectedWeight += primaryShardsPerNode >= (int) Math.ceil(avgPrimaryShardsPerNode) ? constraintWeight : 0;
assertEquals(expectedWeight, constraints.weight(balancer, node, indexName, constraintWeight));
}

}
Loading