Skip to content

Commit

Permalink
Stabilize testRerouteRecovery throttle testing (#100788) (#100854)
Browse files Browse the repository at this point in the history
Refactor testRerouteRecovery, pulling out testing of shard recovery
throttling into separate targeted tests. Now there are two additional
tests, one testing source node throttling, and another testing target
node throttling. Throttling both nodes at once leads to primarily the
source node registering throttling, while the target node mostly has
no cause to instigate throttling.

(cherry picked from commit 323d936)
  • Loading branch information
DiannaHohensee authored Oct 16, 2023
1 parent c29a5fb commit 919fb79
Showing 1 changed file with 222 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,20 @@ private void assertOnGoingRecoveryState(
assertThat(state.getStage(), not(equalTo(Stage.DONE)));
}

/**
* Creates node settings that will throttle shard recovery to 'chunkSize' bytes per second.
*
* @param chunkSizeBytes size of the chunk in bytes
* @return A Settings.Builder
*/
public Settings.Builder createRecoverySettingsChunkPerSecond(long chunkSizeBytes) {
return Settings.builder()
// Set the chunk size in bytes
.put(CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(chunkSizeBytes, ByteSizeUnit.BYTES))
// Set one chunk of bytes per second.
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), chunkSizeBytes, ByteSizeUnit.BYTES);
}

private void slowDownRecovery(ByteSizeValue shardSize) {
long chunkSize = Math.max(1, shardSize.getBytes() / 10);
updateClusterSettings(
Expand All @@ -231,11 +245,82 @@ private void slowDownRecovery(ByteSizeValue shardSize) {
private void restoreRecoverySpeed() {
updateClusterSettings(
Settings.builder()
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), "20mb")
// 200mb is an arbitrary number intended to be large enough to avoid more throttling.
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), "200mb")
.put(CHUNK_SIZE_SETTING.getKey(), RecoverySettings.DEFAULT_CHUNK_SIZE)
);
}

/**
* Initiates a shard recovery and verifies that it's running.
*
* @param sourceNode node holding the shard
* @param targetNode node that will recover the shard
* @throws Exception
*/
public void startShardRecovery(String sourceNode, String targetNode) throws Exception {
logger.info("--> updating cluster settings with moving shard from node `{}` to node `{}`", sourceNode, targetNode);
clusterAdmin().prepareReroute()
.add(new MoveAllocationCommand(INDEX_NAME, 0, sourceNode, targetNode))
.execute()
.actionGet()
.getState();

logger.info("--> requesting shard recovery");
indicesAdmin().prepareRecoveries(INDEX_NAME).execute().actionGet();

logger.info("--> waiting for recovery to begin on both the source and target nodes");
final Index index = resolveIndex(INDEX_NAME);
assertBusy(() -> {
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, sourceNode);
assertThat(indicesService.indexServiceSafe(index).getShard(0).recoveryStats().currentAsSource(), equalTo(1));
indicesService = internalCluster().getInstance(IndicesService.class, targetNode);
assertThat(indicesService.indexServiceSafe(index).getShard(0).recoveryStats().currentAsTarget(), equalTo(1));
});

logger.info("--> checking cluster recovery stats reflect the ongoing recovery on each node");
NodesStatsResponse statsResponse = clusterAdmin().prepareNodesStats()
.clear()
.setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery))
.get();
for (NodeStats nodeStats : statsResponse.getNodes()) {
final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
if (nodeStats.getNode().getName().equals(sourceNode)) {
assertThat(sourceNode + " should have ongoing recovery as source", recoveryStats.currentAsSource(), equalTo(1));
assertThat(sourceNode + " should not have ongoing recovery as target", recoveryStats.currentAsTarget(), equalTo(0));
}
if (nodeStats.getNode().getName().equals(targetNode)) {
assertThat(targetNode + " should not have ongoing recovery as source", recoveryStats.currentAsSource(), equalTo(0));
assertThat(targetNode + " should have ongoing recovery as target", recoveryStats.currentAsTarget(), equalTo(1));
}
}
}

/**
* Asserts that the cluster stats show no shard recovery is active in the cluster and that 'nodeName' has >=0
* throttling stats if 'isRecoveryThrottlingNode' or ==0 if not.
*
* @param nodeName the name of the node
* @param isRecoveryThrottlingNode whether to expect throttling to have occurred on the node
*/
public void assertNodeHasThrottleTimeAndNoRecoveries(String nodeName, Boolean isRecoveryThrottlingNode) {
NodesStatsResponse nodesStatsResponse = clusterAdmin().prepareNodesStats()
.setNodesIds(nodeName)
.clear()
.setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery))
.get();
assertThat(nodesStatsResponse.getNodes(), hasSize(1));
NodeStats nodeStats = nodesStatsResponse.getNodes().get(0);
final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
assertThat(recoveryStats.currentAsSource(), equalTo(0));
assertThat(recoveryStats.currentAsTarget(), equalTo(0));
if (isRecoveryThrottlingNode) {
assertThat("Throttling should be >0 for '" + nodeName + "'", recoveryStats.throttleTime().millis(), greaterThan(0L));
} else {
assertThat("Throttling should be =0 for '" + nodeName + "'", recoveryStats.throttleTime().millis(), equalTo(0L));
}
}

public void testGatewayRecovery() throws Exception {
logger.info("--> start nodes");
String node = internalCluster().startNode();
Expand Down Expand Up @@ -431,7 +516,6 @@ public Settings onNodeStopped(String nodeName) throws Exception {
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/99941")
public void testRerouteRecovery() throws Exception {
logger.info("--> start node A");
final String nodeA = internalCluster().startNode();
Expand Down Expand Up @@ -481,50 +565,18 @@ public void testRerouteRecovery() throws Exception {
.clear()
.setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery))
.get();
long nodeAThrottling = Long.MAX_VALUE;
long nodeBThrottling = Long.MAX_VALUE;
for (NodeStats nodeStats : statsResponse.getNodes()) {
final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
if (nodeStats.getNode().getName().equals(nodeA)) {
assertThat("node A should have ongoing recovery as source", recoveryStats.currentAsSource(), equalTo(1));
assertThat("node A should not have ongoing recovery as target", recoveryStats.currentAsTarget(), equalTo(0));
nodeAThrottling = recoveryStats.throttleTime().millis();
}
if (nodeStats.getNode().getName().equals(nodeB)) {
assertThat("node B should not have ongoing recovery as source", recoveryStats.currentAsSource(), equalTo(0));
assertThat("node B should have ongoing recovery as target", recoveryStats.currentAsTarget(), equalTo(1));
nodeBThrottling = recoveryStats.throttleTime().millis();
}
}

logger.info("--> checking throttling increases");
final long finalNodeAThrottling = nodeAThrottling;
final long finalNodeBThrottling = nodeBThrottling;
assertBusy(() -> {
NodesStatsResponse statsResponse1 = clusterAdmin().prepareNodesStats()
.clear()
.setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery))
.get();
assertThat(statsResponse1.getNodes(), hasSize(2));
for (NodeStats nodeStats : statsResponse1.getNodes()) {
final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
if (nodeStats.getNode().getName().equals(nodeA)) {
assertThat(
"node A throttling should increase",
recoveryStats.throttleTime().millis(),
greaterThan(finalNodeAThrottling)
);
}
if (nodeStats.getNode().getName().equals(nodeB)) {
assertThat(
"node B throttling should increase",
recoveryStats.throttleTime().millis(),
greaterThan(finalNodeBThrottling)
);
}
}
});

logger.info("--> speeding up recoveries");
restoreRecoverySpeed();

Expand All @@ -538,6 +590,7 @@ public void testRerouteRecovery() throws Exception {

assertRecoveryState(recoveryStates.get(0), 0, PeerRecoverySource.INSTANCE, true, Stage.DONE, nodeA, nodeB);
validateIndexRecoveryState(recoveryStates.get(0).getIndex());

Consumer<String> assertNodeHasThrottleTimeAndNoRecoveries = nodeName -> {
NodesStatsResponse nodesStatsResponse = clusterAdmin().prepareNodesStats()
.setNodesIds(nodeName)
Expand All @@ -549,7 +602,6 @@ public void testRerouteRecovery() throws Exception {
final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
assertThat(recoveryStats.currentAsSource(), equalTo(0));
assertThat(recoveryStats.currentAsTarget(), equalTo(0));
assertThat(nodeName + " throttling should be >0", recoveryStats.throttleTime().millis(), greaterThan(0L));
};
// we have to use assertBusy as recovery counters are decremented only when the last reference to the RecoveryTarget
// is decremented, which may happen after the recovery was done.
Expand All @@ -560,9 +612,6 @@ public void testRerouteRecovery() throws Exception {
setReplicaCount(1, INDEX_NAME);
ensureGreen();

assertBusy(() -> assertNodeHasThrottleTimeAndNoRecoveries.accept(nodeA));
assertBusy(() -> assertNodeHasThrottleTimeAndNoRecoveries.accept(nodeB));

logger.info("--> start node C");
String nodeC = internalCluster().startNode();
assertFalse(clusterAdmin().prepareHealth().setWaitForNodes("3").get().isTimedOut());
Expand Down Expand Up @@ -637,6 +686,141 @@ public void testRerouteRecovery() throws Exception {
validateIndexRecoveryState(nodeCRecoveryStates.get(0).getIndex());
}

/**
* Tests shard recovery throttling on the source node. Node statistics should show throttling time on the source node, while no
* throttling should be shown on the target node because the source will send data more slowly than the target's throttling threshold.
*/
public void testSourceThrottling() throws Exception {
// --- Cluster setup.

logger.info("--> starting node A with default settings");
final String nodeA = internalCluster().startNode();

logger.info("--> creating index on node A");
ByteSizeValue shardSize = createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT).getShards()[0].getStats()
.getStore()
.size();

logger.info("--> starting node B with default settings");
final String nodeB = internalCluster().startNode();

long chunkSize = Math.max(1, shardSize.getBytes() / 10);
logger.info(
"--> restarting node A with recovery throttling settings. Index shard size is `{}`. Throttling down to a "
+ "chunk of size `{}` per second. This will slow recovery of the shard to 10 seconds.",
shardSize,
ByteSizeValue.ofBytes(chunkSize)
);
internalCluster().restartNode(nodeA, new InternalTestCluster.RestartCallback() {
// This callback returns node Settings that are ultimately passed into the restarted node.
@Override
public Settings onNodeStopped(String nodeName) {
return createRecoverySettingsChunkPerSecond(chunkSize).build();
}
});

logger.info("--> waiting for the cluster to stabilize after restarting the source node (Node A)");
ensureGreen();

// --- Shard recovery.

startShardRecovery(nodeA, nodeB);

logger.info("--> checking throttling increases on Node A (source node), while Node B (target node) reports no throttling");
assertBusy(() -> {
NodesStatsResponse nodeStatsResponse = clusterAdmin().prepareNodesStats()
.clear()
.setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery))
.get();
assertThat(nodeStatsResponse.getNodes(), hasSize(2));
for (NodeStats nodeStats : nodeStatsResponse.getNodes()) {
final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
if (nodeStats.getNode().getName().equals(nodeA)) {
assertThat("node A throttling should increase", recoveryStats.throttleTime().millis(), greaterThan(0L));
}
if (nodeStats.getNode().getName().equals(nodeB)) {
assertThat("node B throttling should _not_ increase", recoveryStats.throttleTime().millis(), equalTo(0L));
}
}
});

logger.info("--> increasing the recovery throttle limit so that the shard recovery completes quickly");
restoreRecoverySpeed();

logger.info("--> waiting for the shard recovery to complete");
ensureGreen();

// --- Shard recovery complete. Verify throttling millis remain reflected in node stats.

logger.info("--> checking that both nodes A and B no longer have recoveries in progress, but that they do retain throttling stats");
// We must use assertBusy because recovery counters are decremented only when the last reference to
// the RecoveryTarget is decremented, which may happen after the recovery finishes.
assertBusy(() -> assertNodeHasThrottleTimeAndNoRecoveries(nodeA, true));
assertBusy(() -> assertNodeHasThrottleTimeAndNoRecoveries(nodeB, false));
}

/**
* Tests shard recovery throttling on the target node. Node statistics should show throttling time on the target node, while no
* throttling should be shown on the source node because the target will accept data more slowly than the source's throttling threshold.
*/
public void testTargetThrottling() throws Exception {
logger.info("--> starting node A with default settings");
final String nodeA = internalCluster().startNode();

logger.info("--> creating index on node A");
ByteSizeValue shardSize = createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT).getShards()[0].getStats()
.getStore()
.size();

long chunkSize = Math.max(1, shardSize.getBytes() / 10);
logger.info(
"--> starting node B with recovery throttling settings. Index shard size is `{}`. Throttling down to a "
+ "chunk of size `{}` per second. This will slow recovery of the existing shard to 10 seconds.",
shardSize,
ByteSizeValue.ofBytes(chunkSize)
);
final String nodeB = internalCluster().startNode(createRecoverySettingsChunkPerSecond(chunkSize));

logger.info("--> waiting for the cluster to stabilize after restarting the target node (Node B)");
ensureGreen();

// --- Shard recovery.

startShardRecovery(nodeA, nodeB);

logger.info("--> checking throttling increases on Node B (target node), while Node A (source node) reports no throttling");
assertBusy(() -> {
NodesStatsResponse statsResponse1 = clusterAdmin().prepareNodesStats()
.clear()
.setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery))
.get();
assertThat(statsResponse1.getNodes(), hasSize(2));
for (NodeStats nodeStats : statsResponse1.getNodes()) {
final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
if (nodeStats.getNode().getName().equals(nodeA)) {
assertThat("node A throttling should _not_ increase", recoveryStats.throttleTime().millis(), equalTo(0L));
}
if (nodeStats.getNode().getName().equals(nodeB)) {
assertThat("node B throttling should increase", recoveryStats.throttleTime().millis(), greaterThan(0L));
}
}
});

logger.info("--> increasing the recovery throttle limit so that the shard recovery completes quickly");
restoreRecoverySpeed();

logger.info("--> waiting for the shard recovery to complete");
ensureGreen();

// --- Shard recovery complete. Verify throttling millis remain reflected in node stats.

logger.info("--> checking that both nodes A and B no longer have recoveries in progress, but that they do retain throttling stats");
// we have to use assertBusy as recovery counters are decremented only when the last reference to the RecoveryTarget
// is decremented, which may happen after the recovery was done.
assertBusy(() -> assertNodeHasThrottleTimeAndNoRecoveries(nodeA, false));
assertBusy(() -> assertNodeHasThrottleTimeAndNoRecoveries(nodeB, true));
}

public void testSnapshotRecovery() throws Exception {
logger.info("--> start node A");
String nodeA = internalCluster().startNode();
Expand Down

0 comments on commit 919fb79

Please sign in to comment.