Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stabilize testRerouteRecovery throttle testing #100788

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,20 @@ private void assertOnGoingRecoveryState(
assertThat(state.getStage(), not(equalTo(Stage.DONE)));
}

/**
 * Builds node settings that limit shard recovery to a single chunk of {@code chunkSizeBytes} bytes per second.
 * The same byte count is used both as the recovery chunk size and as the max-bytes-per-second limit.
 *
 * @param chunkSizeBytes size of one recovery chunk in bytes; also the number of bytes allowed per second
 * @return a {@link Settings.Builder} holding the chunk size and the max-bytes-per-second recovery settings
 */
public Settings.Builder createRecoverySettingsChunkPerSecond(long chunkSizeBytes) {
    final String chunkSizeKey = CHUNK_SIZE_SETTING.getKey();
    final String maxBytesPerSecKey = RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey();

    final Settings.Builder throttledSettings = Settings.builder();
    // How many bytes make up a single chunk.
    throttledSettings.put(chunkSizeKey, new ByteSizeValue(chunkSizeBytes, ByteSizeUnit.BYTES));
    // Allow exactly one chunk's worth of bytes per second.
    throttledSettings.put(maxBytesPerSecKey, chunkSizeBytes, ByteSizeUnit.BYTES);
    return throttledSettings;
}

private void slowDownRecovery(ByteSizeValue shardSize) {
long chunkSize = Math.max(1, shardSize.getBytes() / 10);
updateClusterSettings(
Expand All @@ -249,11 +263,82 @@ private void slowDownRecovery(ByteSizeValue shardSize) {
/**
 * Updates the cluster settings so that recovery is no longer slowed down: raises the recovery
 * max-bytes-per-second limit to 200mb and sets the chunk size to {@code RecoverySettings.DEFAULT_CHUNK_SIZE}.
 */
private void restoreRecoverySpeed() {
    updateClusterSettings(
        Settings.builder()
            // 200mb is an arbitrary number intended to be large enough to avoid more throttling.
            // The limit is set exactly once: an earlier value for this key would simply be overwritten.
            .put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), "200mb")
            .put(CHUNK_SIZE_SETTING.getKey(), RecoverySettings.DEFAULT_CHUNK_SIZE)
    );
}

/**
 * Issues a reroute command that moves shard 0 of {@code INDEX_NAME} from {@code sourceNode} to {@code targetNode},
 * then verifies that the recovery is in progress: first on the shard-level recovery stats of both nodes, then on
 * the per-node recovery stats reported by the nodes stats API.
 *
 * @param sourceNode name of the node currently holding the shard
 * @param targetNode name of the node that will recover the shard
 * @throws Exception if the busy-wait for the recovery to start fails
 */
public void startShardRecovery(String sourceNode, String targetNode) throws Exception {
    logger.info("--> updating cluster settings with moving shard from node `{}` to node `{}`", sourceNode, targetNode);
    final MoveAllocationCommand moveShard = new MoveAllocationCommand(INDEX_NAME, 0, sourceNode, targetNode);
    clusterAdmin().prepareReroute().add(moveShard).execute().actionGet().getState();

    logger.info("--> requesting shard recovery");
    indicesAdmin().prepareRecoveries(INDEX_NAME).execute().actionGet();

    logger.info("--> waiting for recovery to begin on both the source and target nodes");
    final Index recoveringIndex = resolveIndex(INDEX_NAME);
    assertBusy(() -> {
        // The source side must report exactly one outgoing recovery for the shard...
        final IndicesService sourceIndices = internalCluster().getInstance(IndicesService.class, sourceNode);
        assertThat(sourceIndices.indexServiceSafe(recoveringIndex).getShard(0).recoveryStats().currentAsSource(), equalTo(1));
        // ...and the target side exactly one incoming recovery.
        final IndicesService targetIndices = internalCluster().getInstance(IndicesService.class, targetNode);
        assertThat(targetIndices.indexServiceSafe(recoveringIndex).getShard(0).recoveryStats().currentAsTarget(), equalTo(1));
    });

    logger.info("--> checking cluster recovery stats reflect the ongoing recovery on each node");
    final NodesStatsResponse recoveryStatsResponse = clusterAdmin().prepareNodesStats()
        .clear()
        .setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery))
        .get();
    for (NodeStats perNodeStats : recoveryStatsResponse.getNodes()) {
        final RecoveryStats recoveryStats = perNodeStats.getIndices().getRecoveryStats();
        final String currentNodeName = perNodeStats.getNode().getName();
        if (currentNodeName.equals(sourceNode)) {
            assertThat(sourceNode + " should have ongoing recovery as source", recoveryStats.currentAsSource(), equalTo(1));
            assertThat(sourceNode + " should not have ongoing recovery as target", recoveryStats.currentAsTarget(), equalTo(0));
        }
        if (currentNodeName.equals(targetNode)) {
            assertThat(targetNode + " should not have ongoing recovery as source", recoveryStats.currentAsSource(), equalTo(0));
            assertThat(targetNode + " should have ongoing recovery as target", recoveryStats.currentAsTarget(), equalTo(1));
        }
    }
}

/**
 * Fetches the recovery stats of a single node and asserts that the node has no recovery in progress, neither as
 * source nor as target. It additionally asserts that the node's recovery throttle time is strictly greater than
 * zero when {@code isRecoveryThrottlingNode} is true, and exactly zero otherwise.
 *
 * @param nodeName                 the name of the node whose stats are checked
 * @param isRecoveryThrottlingNode whether throttling is expected to have occurred on the node
 */
public void assertNodeHasThrottleTimeAndNoRecoveries(String nodeName, Boolean isRecoveryThrottlingNode) {
    final NodesStatsResponse singleNodeResponse = clusterAdmin().prepareNodesStats()
        .setNodesIds(nodeName)
        .clear()
        .setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery))
        .get();
    // Only the requested node should be present in the response.
    assertThat(singleNodeResponse.getNodes(), hasSize(1));

    final RecoveryStats recoveryStats = singleNodeResponse.getNodes().get(0).getIndices().getRecoveryStats();
    assertThat(recoveryStats.currentAsSource(), equalTo(0));
    assertThat(recoveryStats.currentAsTarget(), equalTo(0));

    if (isRecoveryThrottlingNode) {
        assertThat("Throttling should be >0 for '" + nodeName + "'", recoveryStats.throttleTime().millis(), greaterThan(0L));
        return;
    }
    assertThat("Throttling should be =0 for '" + nodeName + "'", recoveryStats.throttleTime().millis(), equalTo(0L));
}

public void testGatewayRecovery() throws Exception {
logger.info("--> start nodes");
String node = internalCluster().startNode();
Expand Down Expand Up @@ -445,7 +530,6 @@ public Settings onNodeStopped(String nodeName) throws Exception {
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/99941")
public void testRerouteRecovery() throws Exception {
logger.info("--> start node A");
final String nodeA = internalCluster().startNode();
Expand Down Expand Up @@ -495,50 +579,18 @@ public void testRerouteRecovery() throws Exception {
.clear()
.setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery))
.get();
long nodeAThrottling = Long.MAX_VALUE;
long nodeBThrottling = Long.MAX_VALUE;
for (NodeStats nodeStats : statsResponse.getNodes()) {
final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
if (nodeStats.getNode().getName().equals(nodeA)) {
assertThat("node A should have ongoing recovery as source", recoveryStats.currentAsSource(), equalTo(1));
assertThat("node A should not have ongoing recovery as target", recoveryStats.currentAsTarget(), equalTo(0));
nodeAThrottling = recoveryStats.throttleTime().millis();
}
if (nodeStats.getNode().getName().equals(nodeB)) {
assertThat("node B should not have ongoing recovery as source", recoveryStats.currentAsSource(), equalTo(0));
assertThat("node B should have ongoing recovery as target", recoveryStats.currentAsTarget(), equalTo(1));
nodeBThrottling = recoveryStats.throttleTime().millis();
}
}

logger.info("--> checking throttling increases");
final long finalNodeAThrottling = nodeAThrottling;
final long finalNodeBThrottling = nodeBThrottling;
assertBusy(() -> {
NodesStatsResponse statsResponse1 = clusterAdmin().prepareNodesStats()
.clear()
.setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery))
.get();
assertThat(statsResponse1.getNodes(), hasSize(2));
for (NodeStats nodeStats : statsResponse1.getNodes()) {
final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
if (nodeStats.getNode().getName().equals(nodeA)) {
assertThat(
"node A throttling should increase",
recoveryStats.throttleTime().millis(),
greaterThan(finalNodeAThrottling)
);
}
if (nodeStats.getNode().getName().equals(nodeB)) {
assertThat(
"node B throttling should increase",
recoveryStats.throttleTime().millis(),
greaterThan(finalNodeBThrottling)
);
}
}
});

logger.info("--> speeding up recoveries");
restoreRecoverySpeed();

Expand All @@ -552,6 +604,7 @@ public void testRerouteRecovery() throws Exception {

assertRecoveryState(recoveryStates.get(0), 0, PeerRecoverySource.INSTANCE, true, Stage.DONE, nodeA, nodeB);
validateIndexRecoveryState(recoveryStates.get(0).getIndex());

Consumer<String> assertNodeHasThrottleTimeAndNoRecoveries = nodeName -> {
NodesStatsResponse nodesStatsResponse = clusterAdmin().prepareNodesStats()
.setNodesIds(nodeName)
Expand All @@ -563,7 +616,6 @@ public void testRerouteRecovery() throws Exception {
final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
assertThat(recoveryStats.currentAsSource(), equalTo(0));
assertThat(recoveryStats.currentAsTarget(), equalTo(0));
assertThat(nodeName + " throttling should be >0", recoveryStats.throttleTime().millis(), greaterThan(0L));
};
// we have to use assertBusy as recovery counters are decremented only when the last reference to the RecoveryTarget
// is decremented, which may happen after the recovery was done.
Expand All @@ -574,9 +626,6 @@ public void testRerouteRecovery() throws Exception {
setReplicaCount(1, INDEX_NAME);
ensureGreen();

assertBusy(() -> assertNodeHasThrottleTimeAndNoRecoveries.accept(nodeA));
assertBusy(() -> assertNodeHasThrottleTimeAndNoRecoveries.accept(nodeB));

logger.info("--> start node C");
String nodeC = internalCluster().startNode();
assertFalse(clusterAdmin().prepareHealth().setWaitForNodes("3").get().isTimedOut());
Expand Down Expand Up @@ -651,6 +700,141 @@ public void testRerouteRecovery() throws Exception {
validateIndexRecoveryState(nodeCRecoveryStates.get(0).getIndex());
}

/**
 * Exercises recovery throttling applied on the source node only. Node A (the source) is restarted with settings that
 * limit recovery to one small chunk per second, while node B (the target) keeps default settings. The node stats are
 * expected to show throttle time on the source and none on the target, since the source sends data more slowly than
 * the target's own throttling threshold.
 */
public void testSourceThrottling() throws Exception {
    // --- Cluster setup.

    logger.info("--> starting node A with default settings");
    final String nodeA = internalCluster().startNode();

    logger.info("--> creating index on node A");
    final ByteSizeValue shardSize = createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT).getShards()[0].getStats()
        .getStore()
        .size();

    logger.info("--> starting node B with default settings");
    final String nodeB = internalCluster().startNode();

    // One tenth of the shard per second, but never less than a single byte.
    final long throttledChunkBytes = Math.max(1, shardSize.getBytes() / 10);
    logger.info(
        "--> restarting node A with recovery throttling settings. Index shard size is `{}`. Throttling down to a "
            + "chunk of size `{}` per second. This will slow recovery of the shard to 10 seconds.",
        shardSize,
        ByteSizeValue.ofBytes(throttledChunkBytes)
    );
    final InternalTestCluster.RestartCallback applyThrottleOnRestart = new InternalTestCluster.RestartCallback() {
        // The Settings returned here are handed to the node when it is restarted.
        @Override
        public Settings onNodeStopped(String nodeName) {
            return createRecoverySettingsChunkPerSecond(throttledChunkBytes).build();
        }
    };
    internalCluster().restartNode(nodeA, applyThrottleOnRestart);

    logger.info("--> waiting for the cluster to stabilize after restarting the source node (Node A)");
    ensureGreen();

    // --- Shard recovery.

    startShardRecovery(nodeA, nodeB);

    logger.info("--> checking throttling increases on Node A (source node), while Node B (target node) reports no throttling");
    assertBusy(() -> {
        final NodesStatsResponse recoveryStatsResponse = clusterAdmin().prepareNodesStats()
            .clear()
            .setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery))
            .get();
        assertThat(recoveryStatsResponse.getNodes(), hasSize(2));
        for (NodeStats perNodeStats : recoveryStatsResponse.getNodes()) {
            final RecoveryStats recoveryStats = perNodeStats.getIndices().getRecoveryStats();
            final String currentNodeName = perNodeStats.getNode().getName();
            if (currentNodeName.equals(nodeA)) {
                assertThat("node A throttling should increase", recoveryStats.throttleTime().millis(), greaterThan(0L));
            }
            if (currentNodeName.equals(nodeB)) {
                assertThat("node B throttling should _not_ increase", recoveryStats.throttleTime().millis(), equalTo(0L));
            }
        }
    });

    logger.info("--> increasing the recovery throttle limit so that the shard recovery completes quickly");
    restoreRecoverySpeed();

    logger.info("--> waiting for the shard recovery to complete");
    ensureGreen();

    // --- Shard recovery complete. Verify throttling millis remain reflected in node stats.

    logger.info("--> checking that both nodes A and B no longer have recoveries in progress, but that they do retain throttling stats");
    // assertBusy is required: the recovery counters only drop once the last reference to the RecoveryTarget
    // is released, and that can happen some time after the recovery itself has finished.
    assertBusy(() -> assertNodeHasThrottleTimeAndNoRecoveries(nodeA, true));
    assertBusy(() -> assertNodeHasThrottleTimeAndNoRecoveries(nodeB, false));
}

/**
 * Tests shard recovery throttling on the target node. Node B (the target) is started with settings that limit
 * recovery to one small chunk per second, while node A (the source) keeps default settings. Node statistics should
 * show throttling time on the target node, while no throttling should be shown on the source node because the
 * target will accept data more slowly than the source's throttling threshold.
 */
public void testTargetThrottling() throws Exception {
    // --- Cluster setup.

    logger.info("--> starting node A with default settings");
    final String nodeA = internalCluster().startNode();

    logger.info("--> creating index on node A");
    ByteSizeValue shardSize = createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT).getShards()[0].getStats()
        .getStore()
        .size();

    // One tenth of the shard per second, but never less than a single byte.
    long chunkSize = Math.max(1, shardSize.getBytes() / 10);
    logger.info(
        "--> starting node B with recovery throttling settings. Index shard size is `{}`. Throttling down to a "
            + "chunk of size `{}` per second. This will slow recovery of the existing shard to 10 seconds.",
        shardSize,
        ByteSizeValue.ofBytes(chunkSize)
    );
    final String nodeB = internalCluster().startNode(createRecoverySettingsChunkPerSecond(chunkSize));

    // Node B is freshly started above (not restarted), so the message says "starting".
    logger.info("--> waiting for the cluster to stabilize after starting the target node (Node B)");
    ensureGreen();

    // --- Shard recovery.

    startShardRecovery(nodeA, nodeB);

    logger.info("--> checking throttling increases on Node B (target node), while Node A (source node) reports no throttling");
    assertBusy(() -> {
        NodesStatsResponse nodeStatsResponse = clusterAdmin().prepareNodesStats()
            .clear()
            .setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery))
            .get();
        assertThat(nodeStatsResponse.getNodes(), hasSize(2));
        for (NodeStats nodeStats : nodeStatsResponse.getNodes()) {
            final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
            if (nodeStats.getNode().getName().equals(nodeA)) {
                assertThat("node A throttling should _not_ increase", recoveryStats.throttleTime().millis(), equalTo(0L));
            }
            if (nodeStats.getNode().getName().equals(nodeB)) {
                assertThat("node B throttling should increase", recoveryStats.throttleTime().millis(), greaterThan(0L));
            }
        }
    });

    logger.info("--> increasing the recovery throttle limit so that the shard recovery completes quickly");
    restoreRecoverySpeed();

    logger.info("--> waiting for the shard recovery to complete");
    ensureGreen();

    // --- Shard recovery complete. Verify throttling millis remain reflected in node stats.

    logger.info("--> checking that both nodes A and B no longer have recoveries in progress, but that they do retain throttling stats");
    // We must use assertBusy because recovery counters are decremented only when the last reference to
    // the RecoveryTarget is decremented, which may happen after the recovery finishes.
    assertBusy(() -> assertNodeHasThrottleTimeAndNoRecoveries(nodeA, false));
    assertBusy(() -> assertNodeHasThrottleTimeAndNoRecoveries(nodeB, true));
}

public void testSnapshotRecovery() throws Exception {
logger.info("--> start node A");
String nodeA = internalCluster().startNode();
Expand Down