Skip to content

Commit

Permalink
Adding cluster level setting for iterating primary first shards
Browse files Browse the repository at this point in the history
Signed-off-by: Ankit Jain <[email protected]>
  • Loading branch information
jainankitk committed Nov 9, 2021
1 parent 8e7c453 commit fc281a8
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 46 deletions.
109 changes: 67 additions & 42 deletions server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java
Original file line number Diff line number Diff line change
Expand Up @@ -1281,63 +1281,88 @@ private void ensureMutable() {
* The iterator then resumes on the first node by returning the second shard and continues until all shards from
* all the nodes have been returned.
*/
public Iterator<ShardRouting> nodeInterleavedShardIterator() {
public Iterator<ShardRouting> nodeInterleavedShardIterator(boolean movePrimaryFirst) {
final Queue<Iterator<ShardRouting>> queue = new ArrayDeque<>();
for (Map.Entry<String, RoutingNode> entry : nodesToShards.entrySet()) {
queue.add(entry.getValue().copyShards().iterator());
}
return new Iterator<ShardRouting>() {
private Queue<ShardRouting> replicaShards = new ArrayDeque<>();
private Queue<Iterator<ShardRouting>> replicaIterators = new ArrayDeque<>();

public boolean hasNext() {
while (!queue.isEmpty()) {
if (queue.peek().hasNext()) {
if (movePrimaryFirst) {
return new Iterator<ShardRouting>() {
private Queue<ShardRouting> replicaShards = new ArrayDeque<>();
private Queue<Iterator<ShardRouting>> replicaIterators = new ArrayDeque<>();

public boolean hasNext() {
while (!queue.isEmpty()) {
if (queue.peek().hasNext()) {
return true;
}
queue.poll();
}
if (!replicaShards.isEmpty()) {
return true;
}
queue.poll();
}
if (!replicaShards.isEmpty()) {
return true;
while (!replicaIterators.isEmpty()) {
if (replicaIterators.peek().hasNext()) {
return true;
}
replicaIterators.poll();
}
return false;
}
while (!replicaIterators.isEmpty()) {
if (replicaIterators.peek().hasNext()) {
return true;

public ShardRouting next() {
if (hasNext() == false) {
throw new NoSuchElementException();
}
while (!queue.isEmpty()) {
Iterator<ShardRouting> iter = queue.poll();
if (iter.hasNext()) {
ShardRouting result = iter.next();
if (result.primary()) {
queue.offer(iter);
return result;
}
replicaShards.offer(result);
replicaIterators.offer(iter);
}
}
if (!replicaShards.isEmpty()) {
return replicaShards.poll();
}
replicaIterators.poll();
Iterator<ShardRouting> replicaIterator = replicaIterators.poll();
ShardRouting replicaShard = replicaIterator.next();
replicaIterators.offer(replicaIterator);
return replicaShard;
}
return false;
}

public ShardRouting next() {
if (hasNext() == false) {
throw new NoSuchElementException();
public void remove() {
throw new UnsupportedOperationException();
}
while (!queue.isEmpty()) {
Iterator<ShardRouting> iter = queue.poll();
if (iter.hasNext()) {
ShardRouting result = iter.next();
if (result.primary()) {
queue.offer(iter);
return result;
};
} else {
return new Iterator<ShardRouting>() {
@Override
public boolean hasNext() {
while (!queue.isEmpty()) {
if (queue.peek().hasNext()) {
return true;
}
replicaShards.offer(result);
replicaIterators.offer(iter);
queue.poll();
}
return false;
}
if (!replicaShards.isEmpty()) {
return replicaShards.poll();
}
Iterator<ShardRouting> replicaIterator = replicaIterators.poll();
ShardRouting replicaShard = replicaIterator.next();
replicaIterators.offer(replicaIterator);
return replicaShard;
}

public void remove() {
throw new UnsupportedOperationException();
}
};
@Override
public ShardRouting next() {
if (hasNext() == false) {
throw new NoSuchElementException();
}
Iterator<ShardRouting> iter = queue.poll();
queue.offer(iter);
return iter.next();
}
};
}
}

private static final class Recoveries {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ public class BalancedShardsAllocator implements ShardsAllocator {
Property.Dynamic,
Property.NodeScope
);
public static final Setting<Boolean> SHARD_MOVE_PRIMARY_FIRST_SETTING = Setting.boolSetting(
"cluster.routing.allocation.move.primary_first",
false,
Property.Dynamic,
Property.NodeScope
);
public static final Setting<Float> THRESHOLD_SETTING = Setting.floatSetting(
"cluster.routing.allocation.balance.threshold",
1.0f,
Expand All @@ -117,6 +123,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {
Property.NodeScope
);

private volatile boolean movePrimaryFirst;
private volatile WeightFunction weightFunction;
private volatile float threshold;

Expand All @@ -128,10 +135,15 @@ public BalancedShardsAllocator(Settings settings) {
public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSettings) {
setWeightFunction(INDEX_BALANCE_FACTOR_SETTING.get(settings), SHARD_BALANCE_FACTOR_SETTING.get(settings));
setThreshold(THRESHOLD_SETTING.get(settings));
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst);
clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, SHARD_BALANCE_FACTOR_SETTING, this::setWeightFunction);
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold);
}

private void setMovePrimaryFirst(boolean movePrimaryFirst) {
this.movePrimaryFirst = movePrimaryFirst;
}

private void setWeightFunction(float indexBalance, float shardBalanceFactor) {
weightFunction = new WeightFunction(indexBalance, shardBalanceFactor);
}
Expand All @@ -146,15 +158,15 @@ public void allocate(RoutingAllocation allocation) {
failAllocationOfNewPrimaries(allocation);
return;
}
final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
final Balancer balancer = new Balancer(logger, allocation, movePrimaryFirst, weightFunction, threshold);
balancer.allocateUnassigned();
balancer.moveShards();
balancer.balance();
}

@Override
public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, final RoutingAllocation allocation) {
Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
Balancer balancer = new Balancer(logger, allocation, movePrimaryFirst, weightFunction, threshold);
AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN;
MoveDecision moveDecision = MoveDecision.NOT_TAKEN;
if (shard.unassigned()) {
Expand Down Expand Up @@ -283,6 +295,7 @@ public static class Balancer {
private final Map<String, ModelNode> nodes;
private final RoutingAllocation allocation;
private final RoutingNodes routingNodes;
private final boolean movePrimaryFirst;
private final WeightFunction weight;

private final float threshold;
Expand All @@ -291,9 +304,10 @@ public static class Balancer {
private final NodeSorter sorter;
private final Set<RoutingNode> inEligibleTargetNode;

public Balancer(Logger logger, RoutingAllocation allocation, WeightFunction weight, float threshold) {
public Balancer(Logger logger, RoutingAllocation allocation, boolean movePrimaryFirst, WeightFunction weight, float threshold) {
this.logger = logger;
this.allocation = allocation;
this.movePrimaryFirst = movePrimaryFirst;
this.weight = weight;
this.threshold = threshold;
this.routingNodes = allocation.routingNodes();
Expand Down Expand Up @@ -725,7 +739,7 @@ public void moveShards() {
for (ModelNode currentNode : sorter.modelNodes) {
checkAndAddInEligibleTargetNode(currentNode.getRoutingNode());
}
for (Iterator<ShardRouting> it = allocation.routingNodes().nodeInterleavedShardIterator(); it.hasNext();) {
for (Iterator<ShardRouting> it = allocation.routingNodes().nodeInterleavedShardIterator(movePrimaryFirst); it.hasNext();) {
// Verify if the cluster concurrent recoveries have been reached.
if (allocation.deciders().canMoveAnyShard(allocation).type() != Decision.Type.YES) {
logger.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ public void apply(Settings value, Settings current, Settings previous) {
AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING,
BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING,
BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING,
BalancedShardsAllocator.SHARD_MOVE_PRIMARY_FIRST_SETTING,
BalancedShardsAllocator.THRESHOLD_SETTING,
BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING,
BreakerSettings.CIRCUIT_BREAKER_OVERHEAD_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,16 @@ public void testPrimaryFirstIterator() {
true,
ShardRoutingState.RELOCATING
);
ShardRouting initializingShard5 = TestShardRouting.newShardRouting("test", 5, "node-1", true, ShardRoutingState.INITIALIZING);
routingNode.add(initializingShard3);
routingNode.add(relocatingShard4);
routingNode.add(initializingShard5);
final Iterator<ShardRouting> shardRoutingIterator = routingNode.iterator();
assertTrue(shardRoutingIterator.hasNext());
assertThat(shardRoutingIterator.next(), equalTo(relocatingShard4));
assertTrue(shardRoutingIterator.hasNext());
assertThat(shardRoutingIterator.next(), equalTo(initializingShard5));
assertTrue(shardRoutingIterator.hasNext());
assertThat(shardRoutingIterator.next(), equalTo(unassignedShard0));
assertTrue(shardRoutingIterator.hasNext());
assertThat(shardRoutingIterator.next(), equalTo(initializingShard0));
Expand Down

0 comments on commit fc281a8

Please sign in to comment.