Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make reroute iteration time-bound for large shard allocations #14848

Merged
merged 35 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
6d2a9d3
Simplify batch allocators with timeouts
Bukhtawar Jul 21, 2024
ef8232a
Add settings for allocator timeout and add additional logs
imRishN Jul 22, 2024
bf9cf16
Change to TimeoutAwareRunnable
Bukhtawar Jul 22, 2024
de8f6aa
Introduce BatchRunnableExecutor
Bukhtawar Jul 22, 2024
d3d78cf
Null checks
Bukhtawar Jul 22, 2024
1c5c211
nitpick
Bukhtawar Jul 22, 2024
3d8b9e4
Fix NPE bug
imRishN Jul 22, 2024
cf9c9dd
Remove timeout from decide allocate unassigned
imRishN Jul 22, 2024
25588fa
Add UT for BatchRunnableExecutor and BatchAllocators
imRishN Jul 22, 2024
d5ede50
Add cluster green IT
imRishN Jul 22, 2024
b9a9027
Minor fix and spotless apply
imRishN Jul 22, 2024
0cde01c
Add changelog
imRishN Jul 22, 2024
3b4baae
Remove redundant code
imRishN Jul 23, 2024
109616a
Add min value for timeout
imRishN Jul 23, 2024
5c40c99
Fix spotless check
imRishN Jul 23, 2024
f5501fa
Switch to shuffle
Bukhtawar Jul 23, 2024
341fe5f
Switch to shuffle
Bukhtawar Jul 23, 2024
e760f08
Revert "Add min value for timeout"
imRishN Jul 23, 2024
23a8fba
Trigger Build
imRishN Jul 23, 2024
f789037
Fix BatchRunnableExecutorTest test
imRishN Jul 23, 2024
0d0ac70
Trigger Build
imRishN Jul 23, 2024
65b3159
Change log levels and minor comment fix
imRishN Jul 23, 2024
65ea950
Minor fix
imRishN Jul 23, 2024
7a440af
Fix naming convention
imRishN Jul 23, 2024
aa11ddf
Replace null check with Optional
imRishN Jul 23, 2024
6cfdfae
Remove unnecessary logs
imRishN Jul 23, 2024
e83db71
Fix spotless
imRishN Jul 23, 2024
ad77ffa
Update server/src/main/java/org/opensearch/common/util/BatchRunnableE…
imRishN Jul 23, 2024
cc6b5dd
Add null check test
imRishN Jul 23, 2024
d03f8cf
Resolve PR Comments
imRishN Jul 23, 2024
21c4c1c
Fix spotless
imRishN Jul 23, 2024
25ca951
Fix
imRishN Jul 24, 2024
b1a612e
Add ExistingShardsAllocatorTests
imRishN Jul 24, 2024
07befa2
Add tests for ShardsBatchGatewayAllocator
imRishN Jul 24, 2024
6ddb155
Trigger Build
imRishN Jul 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Allow @InternalApi annotation on classes not meant to be constructed outside of the OpenSearch core ([#14575](https://github.com/opensearch-project/OpenSearch/pull/14575))
- Add @InternalApi annotation to japicmp exclusions ([#14597](https://github.com/opensearch-project/OpenSearch/pull/14597))
- Allow system index warning in OpenSearchRestTestCase.refreshAllIndices ([#14635](https://github.com/opensearch-project/OpenSearch/pull/14635))
- Make reroute iteration time-bound for large shard allocations ([#14848](https://github.com/opensearch-project/OpenSearch/pull/14848))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,7 @@ public void testMessyElectionsStillMakeClusterGoGreen() throws Exception {
ensureGreen("test");
}

public void testBatchModeEnabled() throws Exception {
public void testBatchModeEnabledWithoutTimeout() throws Exception {
internalCluster().startClusterManagerOnlyNodes(
1,
Settings.builder().put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true).build()
Expand Down Expand Up @@ -810,6 +810,132 @@ public void testBatchModeEnabled() throws Exception {
assertEquals(0, gatewayAllocator.getNumberOfInFlightFetches());
}

public void testBatchModeEnabledWithSufficientTimeoutAndClusterGreen() throws Exception {
internalCluster().startClusterManagerOnlyNodes(
1,
Settings.builder()
.put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true)
.put(ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "20s")
.put(ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "20s")
.build()
);
List<String> dataOnlyNodes = internalCluster().startDataOnlyNodes(2);
createIndex(
"test",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()
);
ensureGreen("test");
Settings node0DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(0));
Settings node1DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(1));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(0)));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(1)));
ensureRed("test");
ensureStableCluster(1);

logger.info("--> Now do a protective reroute");
ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get();
assertTrue(clusterRerouteResponse.isAcknowledged());

ShardsBatchGatewayAllocator gatewayAllocator = internalCluster().getInstance(
ShardsBatchGatewayAllocator.class,
internalCluster().getClusterManagerName()
);
assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings()));
assertEquals(1, gatewayAllocator.getNumberOfStartedShardBatches());
assertEquals(1, gatewayAllocator.getNumberOfStoreShardBatches());

// Now start both data nodes and ensure batch mode is working
logger.info("--> restarting the stopped nodes");
internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(0)).put(node0DataPathSettings).build());
internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(1)).put(node1DataPathSettings).build());
ensureStableCluster(3);
ensureGreen("test");
assertEquals(0, gatewayAllocator.getNumberOfStartedShardBatches());
assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches());
assertEquals(0, gatewayAllocator.getNumberOfInFlightFetches());
}

public void testBatchModeEnabledWithInSufficientTimeoutButClusterGreen() throws Exception {

internalCluster().startClusterManagerOnlyNodes(
1,
Settings.builder().put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true).build()
);
List<String> dataOnlyNodes = internalCluster().startDataOnlyNodes(2);
createNIndices(50, "test"); // this will create 50p, 50r shards
ensureStableCluster(3);
IndicesStatsResponse indicesStats = dataNodeClient().admin().indices().prepareStats().get();
assertThat(indicesStats.getSuccessfulShards(), equalTo(100));
ClusterHealthResponse health = client().admin()
.cluster()
.health(Requests.clusterHealthRequest().waitForGreenStatus().timeout("1m"))
.actionGet();
assertFalse(health.isTimedOut());
assertEquals(GREEN, health.getStatus());

String clusterManagerName = internalCluster().getClusterManagerName();
Settings clusterManagerDataPathSettings = internalCluster().dataPathSettings(clusterManagerName);
Settings node0DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(0));
Settings node1DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(1));

internalCluster().stopCurrentClusterManagerNode();
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(0)));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(1)));

// Now start cluster manager node and post that verify batches created
internalCluster().startClusterManagerOnlyNodes(
1,
Settings.builder()
.put("node.name", clusterManagerName)
.put(clusterManagerDataPathSettings)
.put(ShardsBatchGatewayAllocator.GATEWAY_ALLOCATOR_BATCH_SIZE.getKey(), 5)
.put(ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "10ms")
.put(ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "10ms")
.put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true)
.build()
);
ensureStableCluster(1);

logger.info("--> Now do a protective reroute"); // to avoid any race condition in test
ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get();
assertTrue(clusterRerouteResponse.isAcknowledged());

ShardsBatchGatewayAllocator gatewayAllocator = internalCluster().getInstance(
ShardsBatchGatewayAllocator.class,
internalCluster().getClusterManagerName()
);

assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings()));
assertEquals(10, gatewayAllocator.getNumberOfStartedShardBatches());
assertEquals(10, gatewayAllocator.getNumberOfStoreShardBatches());
health = client(internalCluster().getClusterManagerName()).admin().cluster().health(Requests.clusterHealthRequest()).actionGet();
assertFalse(health.isTimedOut());
assertEquals(RED, health.getStatus());
assertEquals(100, health.getUnassignedShards());
assertEquals(0, health.getInitializingShards());
assertEquals(0, health.getActiveShards());
assertEquals(0, health.getRelocatingShards());
assertEquals(0, health.getNumberOfDataNodes());

// Now start both data nodes and ensure batch mode is working
logger.info("--> restarting the stopped nodes");
internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(0)).put(node0DataPathSettings).build());
internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(1)).put(node1DataPathSettings).build());
ensureStableCluster(3);

// wait for cluster to turn green
health = client().admin().cluster().health(Requests.clusterHealthRequest().waitForGreenStatus().timeout("5m")).actionGet();
assertFalse(health.isTimedOut());
assertEquals(GREEN, health.getStatus());
assertEquals(0, health.getUnassignedShards());
assertEquals(0, health.getInitializingShards());
assertEquals(100, health.getActiveShards());
assertEquals(0, health.getRelocatingShards());
assertEquals(2, health.getNumberOfDataNodes());
assertEquals(0, gatewayAllocator.getNumberOfStartedShardBatches());
assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches());
}

public void testBatchModeDisabled() throws Exception {
internalCluster().startClusterManagerOnlyNodes(
1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -617,10 +618,10 @@

private void allocateAllUnassignedShards(RoutingAllocation allocation) {
ExistingShardsAllocator allocator = existingShardsAllocators.get(ShardsBatchGatewayAllocator.ALLOCATOR_NAME);
allocator.allocateAllUnassignedShards(allocation, true);
Optional.ofNullable(allocator.allocateAllUnassignedShards(allocation, true)).ifPresent(Runnable::run);

Check warning on line 621 in server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java#L621

Added line #L621 was not covered by tests
allocator.afterPrimariesBeforeReplicas(allocation);
// Replicas Assignment
allocator.allocateAllUnassignedShards(allocation, false);
Optional.ofNullable(allocator.allocateAllUnassignedShards(allocation, false)).ifPresent(Runnable::run);

Check warning on line 624 in server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java#L624

Added line #L624 was not covered by tests
}

private void disassociateDeadNodes(RoutingAllocation allocation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.gateway.ShardsBatchGatewayAllocator;

import java.util.ArrayList;
import java.util.List;

/**
Expand Down Expand Up @@ -108,14 +109,16 @@
*
* Allocation service will currently run the default implementation of it implemented by {@link ShardsBatchGatewayAllocator}
*/
default void allocateAllUnassignedShards(RoutingAllocation allocation, boolean primary) {
default Runnable allocateAllUnassignedShards(RoutingAllocation allocation, boolean primary) {
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
List<Runnable> runnables = new ArrayList<>();

Check warning on line 114 in server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java#L114

Added line #L114 was not covered by tests
while (iterator.hasNext()) {
ShardRouting shardRouting = iterator.next();
if (shardRouting.primary() == primary) {
allocateUnassigned(shardRouting, allocation, iterator);
runnables.add(() -> allocateUnassigned(shardRouting, allocation, iterator));

Check warning on line 118 in server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java#L118

Added line #L118 was not covered by tests
}
}
return () -> runnables.forEach(Runnable::run);

Check warning on line 121 in server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java#L121

Added line #L121 was not covered by tests
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,8 @@ public void apply(Settings value, Settings current, Settings previous) {
GatewayService.RECOVER_AFTER_NODES_SETTING,
GatewayService.RECOVER_AFTER_TIME_SETTING,
ShardsBatchGatewayAllocator.GATEWAY_ALLOCATOR_BATCH_SIZE,
ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING,
ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING,
PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD,
NetworkModule.HTTP_DEFAULT_TYPE_SETTING,
NetworkModule.TRANSPORT_DEFAULT_TYPE_SETTING,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.util;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.Randomness;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.TimeoutAwareRunnable;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
* A {@link Runnable} that iteratively executes a batch of {@link TimeoutAwareRunnable}s. If the elapsed time exceeds the timeout defined by {@link TimeValue} timeout, then all subsequent {@link TimeoutAwareRunnable}s will have their {@link TimeoutAwareRunnable#onTimeout} method invoked and will not be run.
*
* @opensearch.internal
*/
public class BatchRunnableExecutor implements Runnable {

private final Supplier<TimeValue> timeoutSupplier;

private final List<TimeoutAwareRunnable> timeoutAwareRunnables;

private static final Logger logger = LogManager.getLogger(BatchRunnableExecutor.class);

public BatchRunnableExecutor(List<TimeoutAwareRunnable> timeoutAwareRunnables, Supplier<TimeValue> timeoutSupplier) {
this.timeoutSupplier = timeoutSupplier;
this.timeoutAwareRunnables = timeoutAwareRunnables;
}

@Override
public void run() {
logger.debug("Starting execution of runnable of size [{}]", timeoutAwareRunnables.size());
long startTime = System.nanoTime();
if (timeoutAwareRunnables.isEmpty()) {
return;
}
Randomness.shuffle(timeoutAwareRunnables);
Bukhtawar marked this conversation as resolved.
Show resolved Hide resolved
for (TimeoutAwareRunnable runnable : timeoutAwareRunnables) {
if (timeoutSupplier.get().nanos() < 0 || System.nanoTime() - startTime < timeoutSupplier.get().nanos()) {
runnable.run();
} else {
logger.debug("Executing timeout for runnable of size [{}]", timeoutAwareRunnables.size());
runnable.onTimeout();
}
}
logger.debug(
"Time taken to execute timed runnables in this cycle:[{}ms]",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)
);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.util.concurrent;

/**
* Runnable that is aware of a timeout
*
* @opensearch.internal
*/
public interface TimeoutAwareRunnable extends Runnable {
Bukhtawar marked this conversation as resolved.
Show resolved Hide resolved

void onTimeout();
Bukhtawar marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,20 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.AllocationDecision;
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.opensearch.cluster.routing.allocation.NodeAllocationResult;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.core.index.shard.ShardId;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
* An abstract class that implements basic functionality for allocating
Expand Down Expand Up @@ -78,6 +82,23 @@ public void allocateUnassigned(
executeDecision(shardRouting, allocateUnassignedDecision, allocation, unassignedAllocationHandler);
}

protected void allocateUnassignedBatchOnTimeout(List<ShardRouting> shardRoutings, RoutingAllocation allocation, boolean primary) {
Bukhtawar marked this conversation as resolved.
Show resolved Hide resolved
Set<ShardId> shardIdsFromBatch = new HashSet<>();
for (ShardRouting shardRouting : shardRoutings) {
ShardId shardId = shardRouting.shardId();
shardIdsFromBatch.add(shardId);
}
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
while (iterator.hasNext()) {
ShardRouting unassignedShard = iterator.next();
AllocateUnassignedDecision allocationDecision;
if (unassignedShard.primary() == primary && shardIdsFromBatch.contains(unassignedShard.shardId())) {
allocationDecision = AllocateUnassignedDecision.throttle(null);
executeDecision(unassignedShard, allocationDecision, allocation, iterator);
}
}
}

protected void executeDecision(
ShardRouting shardRouting,
AllocateUnassignedDecision allocateUnassignedDecision,
Expand Down
Loading
Loading