Skip to content

Commit

Permalink
Schedule reroute after allocator timed out (opensearch-project#15565)
Browse files Browse the repository at this point in the history
* Schedule reroute after allocator timed out

Signed-off-by: Rishab Nahata <[email protected]>
  • Loading branch information
imRishN committed Sep 4, 2024
1 parent ccae165 commit 7fa2edb
Show file tree
Hide file tree
Showing 9 changed files with 298 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.cluster.metadata.RepositoriesMetadata;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.routing.DelayedAllocationService;
import org.opensearch.cluster.routing.RerouteService;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
Expand Down Expand Up @@ -471,4 +472,7 @@ public void setExistingShardsAllocators(GatewayAllocator gatewayAllocator, Shard
allocationService.setExistingShardsAllocators(existingShardsAllocators);
}

public void setRerouteServiceForAllocator(RerouteService rerouteService) {
shardsAllocator.setRerouteService(rerouteService);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.IntroSorter;
import org.opensearch.cluster.routing.RerouteService;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardMovementStrategy;
Expand All @@ -49,12 +50,14 @@
import org.opensearch.cluster.routing.allocation.RebalanceParameter;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.cluster.routing.allocation.ShardAllocationDecision;
import org.opensearch.common.Priority;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;

import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -202,6 +205,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {
private volatile boolean ignoreThrottleInRestore;
private volatile TimeValue allocatorTimeout;
private long startTime;
private RerouteService rerouteService;

public BalancedShardsAllocator(Settings settings) {
this(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
Expand Down Expand Up @@ -231,6 +235,12 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
clusterSettings.addSettingsUpdateConsumer(ALLOCATOR_TIMEOUT_SETTING, this::setAllocatorTimeout);
}

@Override
public void setRerouteService(RerouteService rerouteService) {
assert this.rerouteService == null : "RerouteService is already set";
this.rerouteService = rerouteService;
}

/**
* Changes in deprecated setting SHARD_MOVE_PRIMARY_FIRST_SETTING affect value of its replacement setting SHARD_MOVEMENT_STRATEGY_SETTING.
*/
Expand Down Expand Up @@ -342,6 +352,7 @@ public void allocate(RoutingAllocation allocation) {
localShardsBalancer.allocateUnassigned();
localShardsBalancer.moveShards();
localShardsBalancer.balance();
scheduleRerouteIfAllocatorTimedOut();

final ShardsBalancer remoteShardsBalancer = new RemoteShardsBalancer(logger, allocation);
remoteShardsBalancer.allocateUnassigned();
Expand Down Expand Up @@ -404,6 +415,20 @@ private void failAllocationOfNewPrimaries(RoutingAllocation allocation) {
}
}

private void scheduleRerouteIfAllocatorTimedOut() {
if (allocatorTimedOut()) {
assert rerouteService != null : "RerouteService not set to schedule reroute after allocator time out";
rerouteService.reroute(
"reroute after balanced shards allocator timed out",
Priority.HIGH,
ActionListener.wrap(
r -> logger.trace("reroute after balanced shards allocator timed out completed"),
e -> logger.debug("reroute after balanced shards allocator timed out failed", e)
)
);
}
}

/**
* Returns the currently configured delta threshold
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.cluster.routing.allocation.allocator;

import org.opensearch.cluster.routing.RerouteService;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.MoveDecision;
Expand Down Expand Up @@ -73,4 +74,6 @@ public interface ShardsAllocator {
* the cluster explain API, then this method should throw a {@code UnsupportedOperationException}.
*/
ShardAllocationDecision decideShardAllocation(ShardRouting shard, RoutingAllocation allocation);

default void setRerouteService(RerouteService rerouteService) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,11 @@ public void cleanCaches() {

// for tests
protected ShardsBatchGatewayAllocator() {
this(DEFAULT_SHARD_BATCH_SIZE);
this(DEFAULT_SHARD_BATCH_SIZE, null);
}

protected ShardsBatchGatewayAllocator(long batchSize) {
this.rerouteService = null;
protected ShardsBatchGatewayAllocator(long batchSize, RerouteService rerouteService) {
this.rerouteService = rerouteService;
this.batchStartedAction = null;
this.primaryShardBatchAllocator = null;
this.batchStoreAction = null;
Expand Down Expand Up @@ -297,6 +297,18 @@ public void run() {
public void onComplete() {
logger.trace("Triggering oncomplete after timeout for [{}] primary shards", timedOutPrimaryShardIds.size());
primaryBatchShardAllocator.allocateUnassignedBatchOnTimeout(timedOutPrimaryShardIds, allocation, true);
if (timedOutPrimaryShardIds.isEmpty() == false) {
logger.trace("scheduling reroute after existing shards allocator timed out for primary shards");
assert rerouteService != null;
rerouteService.reroute(
"reroute after existing shards allocator timed out",
Priority.HIGH,
ActionListener.wrap(
r -> logger.trace("reroute after existing shards allocator timed out completed"),
e -> logger.debug("reroute after existing shards allocator timed out failed", e)
)
);
}
}
};
} else {
Expand All @@ -320,6 +332,18 @@ public void run() {
public void onComplete() {
logger.trace("Triggering oncomplete after timeout for [{}] replica shards", timedOutReplicaShardIds.size());
replicaBatchShardAllocator.allocateUnassignedBatchOnTimeout(timedOutReplicaShardIds, allocation, false);
if (timedOutReplicaShardIds.isEmpty() == false) {
logger.trace("scheduling reroute after existing shards allocator timed out for replica shards");
assert rerouteService != null;
rerouteService.reroute(
"reroute after existing shards allocator timed out",
Priority.HIGH,
ActionListener.wrap(
r -> logger.trace("reroute after existing shards allocator timed out completed"),
e -> logger.debug("reroute after existing shards allocator timed out failed", e)
)
);
}
}
};
}
Expand Down
1 change: 1 addition & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,7 @@ protected Node(
final RerouteService rerouteService = new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute);
rerouteServiceReference.set(rerouteService);
clusterService.setRerouteService(rerouteService);
clusterModule.setRerouteServiceForAllocator(rerouteService);

final RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,19 @@ public void testQueryGroupMetadataRegister() {
);
}

public void testRerouteServiceSetForBalancedShardsAllocator() {
ClusterModule clusterModule = new ClusterModule(
Settings.EMPTY,
clusterService,
Collections.emptyList(),
clusterInfoService,
null,
threadContext,
new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE)
);
clusterModule.setRerouteServiceForAllocator((reason, priority, listener) -> listener.onResponse(clusterService.state()));
}

private static ClusterPlugin existingShardsAllocatorPlugin(final String allocatorName) {
return new ClusterPlugin() {
@Override
Expand Down
Loading

0 comments on commit 7fa2edb

Please sign in to comment.