[Segment Replication] Prioritize replica shard movement during shard relocation #8875

Merged
merged 6 commits into from
Aug 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
CHANGELOG.md (1 change: 1 addition & 0 deletions)
@@ -82,6 +82,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [distribution/archives] [Linux] [x64] Provide the variant of the distributions bundled with JRE ([#8195](https://github.com/opensearch-project/OpenSearch/pull/8195))
- Add configuration for file cache size to max remote data ratio to prevent oversubscription of file cache ([#8606](https://github.com/opensearch-project/OpenSearch/pull/8606))
- Disallow compression level to be set for default and best_compression index codecs ([#8737](https://github.com/opensearch-project/OpenSearch/pull/8737))
- Prioritize replica shard movement during shard relocation ([#8875](https://github.com/opensearch-project/OpenSearch/pull/8875))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))
@@ -359,7 +359,7 @@ public static Collection<AllocationDecider> createAllocationDeciders(
addAllocationDecider(deciders, new ConcurrentRebalanceAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ConcurrentRecoveriesAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new EnableAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new NodeVersionAllocationDecider());
addAllocationDecider(deciders, new NodeVersionAllocationDecider(settings));
addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider());
addAllocationDecider(deciders, new RestoreInProgressAllocationDecider());
addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings));
@@ -1310,100 +1310,131 @@ private void ensureMutable() {
}

/**
* Creates an iterator over shards interleaving between nodes: The iterator returns the first shard from
* the first node, then the first shard of the second node, etc. until one shard from each node has been returned.
* The iterator then resumes on the first node by returning the second shard and continues until all shards from
* all the nodes have been returned.
* @param movePrimaryFirst if true, all primary shards are iterated over before iterating replica for any node
* @return iterator of shard routings
* Returns iterator of shard routings used by {@link #nodeInterleavedShardIterator(ShardMovementStrategy)}
* @param primaryFirst true when ShardMovementStrategy = ShardMovementStrategy.PRIMARY_FIRST, false when it is ShardMovementStrategy.REPLICA_FIRST
*/
public Iterator<ShardRouting> nodeInterleavedShardIterator(boolean movePrimaryFirst) {
private Iterator<ShardRouting> buildIteratorForMovementStrategy(boolean primaryFirst) {
final Queue<Iterator<ShardRouting>> queue = new ArrayDeque<>();
for (Map.Entry<String, RoutingNode> entry : nodesToShards.entrySet()) {
queue.add(entry.getValue().copyShards().iterator());
}
if (movePrimaryFirst) {
return new Iterator<ShardRouting>() {
private Queue<ShardRouting> replicaShards = new ArrayDeque<>();
private Queue<Iterator<ShardRouting>> replicaIterators = new ArrayDeque<>();

public boolean hasNext() {
while (!queue.isEmpty()) {
if (queue.peek().hasNext()) {
return true;
}
queue.poll();
}
if (!replicaShards.isEmpty()) {
return new Iterator<ShardRouting>() {
private Queue<ShardRouting> shardRoutings = new ArrayDeque<>();
private Queue<Iterator<ShardRouting>> shardIterators = new ArrayDeque<>();

public boolean hasNext() {
while (queue.isEmpty() == false) {
if (queue.peek().hasNext()) {
return true;
}
while (!replicaIterators.isEmpty()) {
if (replicaIterators.peek().hasNext()) {
return true;
}
replicaIterators.poll();
queue.poll();
}
if (!shardRoutings.isEmpty()) {
return true;
}
while (!shardIterators.isEmpty()) {
if (shardIterators.peek().hasNext()) {
return true;
}
return false;
shardIterators.poll();
}
return false;
}

public ShardRouting next() {
if (hasNext() == false) {
throw new NoSuchElementException();
}
while (!queue.isEmpty()) {
Iterator<ShardRouting> iter = queue.poll();
public ShardRouting next() {
if (hasNext() == false) {
throw new NoSuchElementException();
}
while (!queue.isEmpty()) {
Iterator<ShardRouting> iter = queue.poll();
if (primaryFirst) {
if (iter.hasNext()) {
ShardRouting result = iter.next();
if (result.primary()) {
queue.offer(iter);
return result;
}
replicaShards.offer(result);
replicaIterators.offer(iter);
shardRoutings.offer(result);
shardIterators.offer(iter);
}
} else {
while (iter.hasNext()) {
ShardRouting result = iter.next();
if (result.primary() == false) {
queue.offer(iter);
return result;
}
shardRoutings.offer(result);
shardIterators.offer(iter);
}
}
if (!replicaShards.isEmpty()) {
return replicaShards.poll();
}
Iterator<ShardRouting> replicaIterator = replicaIterators.poll();
ShardRouting replicaShard = replicaIterator.next();
replicaIterators.offer(replicaIterator);

assert !replicaShard.primary();
return replicaShard;
}

public void remove() {
throw new UnsupportedOperationException();
if (!shardRoutings.isEmpty()) {
return shardRoutings.poll();
}
};
Iterator<ShardRouting> replicaIterator = shardIterators.poll();
ShardRouting replicaShard = replicaIterator.next();
shardIterators.offer(replicaIterator);

assert replicaShard.primary() != primaryFirst;
return replicaShard;
}

public void remove() {
throw new UnsupportedOperationException();
}

};
}

/**
* Creates an iterator over shards interleaving between nodes: The iterator returns the first shard from
* the first node, then the first shard of the second node, etc. until one shard from each node has been returned.
* The iterator then resumes on the first node by returning the second shard and continues until all shards from
* all the nodes have been returned.
* @param shardMovementStrategy if ShardMovementStrategy.PRIMARY_FIRST, all primary shards are iterated over before iterating replica for any node
* if ShardMovementStrategy.REPLICA_FIRST, all replica shards are iterated over before iterating primary for any node
* if ShardMovementStrategy.NO_PREFERENCE, order of replica and primary shards doesn't matter in iteration
* @return iterator of shard routings
*/
public Iterator<ShardRouting> nodeInterleavedShardIterator(ShardMovementStrategy shardMovementStrategy) {
final Queue<Iterator<ShardRouting>> queue = new ArrayDeque<>();
for (Map.Entry<String, RoutingNode> entry : nodesToShards.entrySet()) {
queue.add(entry.getValue().copyShards().iterator());
}
if (shardMovementStrategy == ShardMovementStrategy.PRIMARY_FIRST) {
return buildIteratorForMovementStrategy(true);
} else {
return new Iterator<ShardRouting>() {
@Override
public boolean hasNext() {
while (!queue.isEmpty()) {
if (queue.peek().hasNext()) {
return true;
if (shardMovementStrategy == ShardMovementStrategy.REPLICA_FIRST) {
return buildIteratorForMovementStrategy(false);
} else {
return new Iterator<ShardRouting>() {
@Override
public boolean hasNext() {
while (!queue.isEmpty()) {
if (queue.peek().hasNext()) {
return true;
}
queue.poll();
}
queue.poll();
return false;
}
return false;
}

@Override
public ShardRouting next() {
if (hasNext() == false) {
throw new NoSuchElementException();
@Override
public ShardRouting next() {
if (hasNext() == false) {
throw new NoSuchElementException();
}
Iterator<ShardRouting> iter = queue.poll();
queue.offer(iter);
return iter.next();
}
Iterator<ShardRouting> iter = queue.poll();
queue.offer(iter);
return iter.next();
}

public void remove() {
throw new UnsupportedOperationException();
}
};
public void remove() {
throw new UnsupportedOperationException();
}
};
}
}
}
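
To make the resulting ordering concrete, here is a minimal standalone sketch (not OpenSearch code; the class name, node names, and shard labels are invented) of the interleaving idea described in the javadoc above: shards of the preferred type are returned first, round-robin across nodes, and the remaining shards follow. The real iterator is lazy and works on ShardRouting instances, but it produces the same kind of visiting order.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified illustration of the node-interleaved ordering; not the production implementation.
public class InterleavedOrderSketch {

    // Shards are plain strings; a trailing "p" marks a primary, "r" a replica.
    static List<String> order(Map<String, List<String>> shardsByNode, boolean primaryFirst) {
        Deque<Iterator<String>> queue = new ArrayDeque<>();
        shardsByNode.values().forEach(shards -> queue.add(shards.iterator()));
        List<String> preferred = new ArrayList<>(); // preferred type, in round-robin order across nodes
        List<String> deferred = new ArrayList<>();  // the other type, returned afterwards
        while (!queue.isEmpty()) {
            Iterator<String> it = queue.poll();
            if (!it.hasNext()) {
                continue; // this node is exhausted, drop its iterator
            }
            String shard = it.next();
            boolean isPrimary = shard.endsWith("p");
            (isPrimary == primaryFirst ? preferred : deferred).add(shard);
            queue.offer(it); // round-robin back to this node for its next shard
        }
        preferred.addAll(deferred);
        return preferred;
    }

    public static void main(String[] args) {
        Map<String, List<String>> nodes = new LinkedHashMap<>();
        nodes.put("node1", List.of("a-p", "b-r"));
        nodes.put("node2", List.of("c-r", "d-p"));
        System.out.println(order(nodes, true));  // PRIMARY_FIRST-like: [a-p, d-p, c-r, b-r]
        System.out.println(order(nodes, false)); // REPLICA_FIRST-like: [c-r, b-r, a-p, d-p]
    }
}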

@@ -0,0 +1,57 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing;

import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;

import java.util.Locale;

/**
* ShardMovementStrategy defines the order in which shard movement occurs.
*
* ShardMovementStrategy values or rather their string representation to be used with
* {@link BalancedShardsAllocator#SHARD_MOVEMENT_STRATEGY_SETTING} via cluster settings.
*
* @opensearch.internal
*/
public enum ShardMovementStrategy {
/**
* default behavior in which order of shard movement doesn't matter.
*/
NO_PREFERENCE,

/**
* primary shards are moved first
*/
PRIMARY_FIRST,

/**
* replica shards are moved first
*/
REPLICA_FIRST;

public static ShardMovementStrategy parse(String strValue) {
if (strValue == null) {
return null;
} else {
strValue = strValue.toUpperCase(Locale.ROOT);
try {
return ShardMovementStrategy.valueOf(strValue);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Illegal allocation.shard_movement_strategy value [" + strValue + "]");
}
}
}

@Override
public String toString() {
return name().toLowerCase(Locale.ROOT);
}

}
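
A quick sketch of how parsing behaves, based only on the enum above (illustrative snippet, not part of the diff):

// Values are parsed case-insensitively; unknown values are rejected.
ShardMovementStrategy a = ShardMovementStrategy.parse("replica_first");   // REPLICA_FIRST
ShardMovementStrategy b = ShardMovementStrategy.parse("NO_PREFERENCE");   // NO_PREFERENCE
String wire = ShardMovementStrategy.PRIMARY_FIRST.toString();             // "primary_first"
// ShardMovementStrategy.parse("fastest") throws IllegalArgumentException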
@@ -37,6 +37,7 @@
import org.apache.lucene.util.IntroSorter;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardMovementStrategy;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus;
@@ -107,8 +108,22 @@ public class BalancedShardsAllocator implements ShardsAllocator {
"cluster.routing.allocation.move.primary_first",
false,
Property.Dynamic,
Property.NodeScope,
Property.Deprecated
);

/**
* Decides order in which to move shards from node when shards can not stay on node anymore. {@link LocalShardsBalancer#moveShards()}
* Encapsulates behavior of above SHARD_MOVE_PRIMARY_FIRST_SETTING.
*/
public static final Setting<ShardMovementStrategy> SHARD_MOVEMENT_STRATEGY_SETTING = new Setting<ShardMovementStrategy>(
"cluster.routing.allocation.shard_movement_strategy",
ShardMovementStrategy.NO_PREFERENCE.toString(),
ShardMovementStrategy::parse,
Property.Dynamic,
Property.NodeScope
);

public static final Setting<Float> THRESHOLD_SETTING = Setting.floatSetting(
"cluster.routing.allocation.balance.threshold",
1.0f,
@@ -131,6 +146,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {
);

private volatile boolean movePrimaryFirst;
private volatile ShardMovementStrategy shardMovementStrategy;

private volatile boolean preferPrimaryShardBalance;
private volatile WeightFunction weightFunction;
@@ -145,8 +161,10 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
setWeightFunction(INDEX_BALANCE_FACTOR_SETTING.get(settings), SHARD_BALANCE_FACTOR_SETTING.get(settings));
setThreshold(THRESHOLD_SETTING.get(settings));
setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings));
setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings));
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance);
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst);
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy);
clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, SHARD_BALANCE_FACTOR_SETTING, this::setWeightFunction);
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold);
}
@@ -155,6 +173,10 @@ private void setMovePrimaryFirst(boolean movePrimaryFirst) {
this.movePrimaryFirst = movePrimaryFirst;
}

private void setShardMovementStrategy(ShardMovementStrategy shardMovementStrategy) {
this.shardMovementStrategy = shardMovementStrategy;
}

private void setWeightFunction(float indexBalance, float shardBalanceFactor) {
weightFunction = new WeightFunction(indexBalance, shardBalanceFactor);
}
@@ -184,6 +206,7 @@ public void allocate(RoutingAllocation allocation) {
logger,
allocation,
movePrimaryFirst,
shardMovementStrategy,
weightFunction,
threshold,
preferPrimaryShardBalance
@@ -205,6 +228,7 @@ public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, f
logger,
allocation,
movePrimaryFirst,
shardMovementStrategy,
weightFunction,
threshold,
preferPrimaryShardBalance
@@ -456,11 +480,12 @@ public Balancer(
Logger logger,
RoutingAllocation allocation,
boolean movePrimaryFirst,
ShardMovementStrategy shardMovementStrategy,
BalancedShardsAllocator.WeightFunction weight,
float threshold,
boolean preferPrimaryBalance
) {
super(logger, allocation, movePrimaryFirst, weight, threshold, preferPrimaryBalance);
super(logger, allocation, movePrimaryFirst, shardMovementStrategy, weight, threshold, preferPrimaryBalance);
}
}
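
For context, a hedged sketch of reading the new dynamic setting through the declared Setting constant (assumes the standard OpenSearch Settings builder and imports of org.opensearch.common.settings.Settings plus the classes in this PR; not part of the diff):

// The setting accepts the lowercase names emitted by ShardMovementStrategy#toString().
Settings settings = Settings.builder()
    .put("cluster.routing.allocation.shard_movement_strategy", "replica_first")
    .build();
ShardMovementStrategy strategy = BalancedShardsAllocator.SHARD_MOVEMENT_STRATEGY_SETTING.get(settings);
assert strategy == ShardMovementStrategy.REPLICA_FIRST;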
