Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stabilize testRerouteRecovery throttle testing #100788

Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -235,6 +234,20 @@ private void assertOnGoingRecoveryState(
assertThat(state.getStage(), not(equalTo(Stage.DONE)));
}

/**
 * Builds node settings that limit shard recovery throughput to one chunk of 'chunkSizeBytes' per second.
 *
 * @param chunkSizeBytes the recovery chunk size, in bytes
 * @return a Settings.Builder carrying the chunk size and the matching bytes-per-second rate limit
 */
public Settings.Builder createRecoverySettingsChunkPerSecond(long chunkSizeBytes) {
    final Settings.Builder throttledRecoverySettings = Settings.builder();
    // Each transferred recovery chunk is exactly chunkSizeBytes long...
    throttledRecoverySettings.put(CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(chunkSizeBytes, ByteSizeUnit.BYTES));
    // ...and the rate limiter permits that many bytes per second, i.e. one chunk per second.
    throttledRecoverySettings.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), chunkSizeBytes, ByteSizeUnit.BYTES);
    return throttledRecoverySettings;
}

private void slowDownRecovery(ByteSizeValue shardSize) {
long chunkSize = Math.max(1, shardSize.getBytes() / 10);
updateClusterSettings(
Expand All @@ -249,11 +262,83 @@ private void slowDownRecovery(ByteSizeValue shardSize) {
/**
 * Raises the recovery rate limit high enough that throttling effectively stops, and restores the default
 * recovery chunk size, so any in-flight shard recoveries proceed at full speed.
 */
private void restoreRecoverySpeed() {
    updateClusterSettings(
        Settings.builder()
            // 200mb is an arbitrary number intended to be large enough to avoid more throttling.
            .put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), "200mb")
            .put(CHUNK_SIZE_SETTING.getKey(), RecoverySettings.DEFAULT_CHUNK_SIZE)
    );
}

/**
* Initiates a shard recovery and verifies that it's running.
*
* @param INDEX_NAME name of the index
* @param sourceNode node holding the shard
* @param targetNode node recovering the shard
* @throws Exception
*/
public void startShardRecovery(String INDEX_NAME, String sourceNode, String targetNode) throws Exception {
DiannaHohensee marked this conversation as resolved.
Show resolved Hide resolved
logger.info("--> updating cluster settings with moving shard from node `{}` to node `{}`", sourceNode, targetNode);
clusterAdmin().prepareReroute()
.add(new MoveAllocationCommand(INDEX_NAME, 0, sourceNode, targetNode))
.execute()
.actionGet()
.getState();

logger.info("--> requesting shard recovery");
indicesAdmin().prepareRecoveries(INDEX_NAME).execute().actionGet();

logger.info("--> waiting for recovery to begin on both the source and target nodes");
final Index index = resolveIndex(INDEX_NAME);
assertBusy(() -> {
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, sourceNode);
assertThat(indicesService.indexServiceSafe(index).getShard(0).recoveryStats().currentAsSource(), equalTo(1));
indicesService = internalCluster().getInstance(IndicesService.class, targetNode);
assertThat(indicesService.indexServiceSafe(index).getShard(0).recoveryStats().currentAsTarget(), equalTo(1));
});

logger.info("--> checking cluster recovery stats reflect the ongoing recovery on each node");
NodesStatsResponse statsResponse = clusterAdmin().prepareNodesStats()
.clear()
.setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery))
.get();
for (NodeStats nodeStats : statsResponse.getNodes()) {
final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
if (nodeStats.getNode().getName().equals(sourceNode)) {
assertThat(sourceNode + " should have ongoing recovery as source", recoveryStats.currentAsSource(), equalTo(1));
assertThat(sourceNode + " should not have ongoing recovery as target", recoveryStats.currentAsTarget(), equalTo(0));
}
if (nodeStats.getNode().getName().equals(targetNode)) {
assertThat(targetNode + " should not have ongoing recovery as source", recoveryStats.currentAsSource(), equalTo(0));
assertThat(targetNode + " should have ongoing recovery as target", recoveryStats.currentAsTarget(), equalTo(1));
}
}
}

/**
 * Asserts that 'nodeName' reports no ongoing shard recovery (neither as source nor as target), and that its
 * recovery throttle time is strictly greater than zero when 'isRecoveryThrottlingNode' is true, or exactly
 * zero when it is false.
 *
 * @param nodeName the name of the node whose stats are checked
 * @param isRecoveryThrottlingNode whether recovery throttling is expected to have occurred on the node
 */
public void assertNodeHasThrottleTimeAndNoRecoveries(String nodeName, boolean isRecoveryThrottlingNode) {
    NodesStatsResponse nodesStatsResponse = clusterAdmin().prepareNodesStats()
        .setNodesIds(nodeName)
        .clear()
        .setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery))
        .get();
    assertThat(nodesStatsResponse.getNodes(), hasSize(1));
    NodeStats nodeStats = nodesStatsResponse.getNodes().get(0);
    final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
    assertThat(recoveryStats.currentAsSource(), equalTo(0));
    assertThat(recoveryStats.currentAsTarget(), equalTo(0));
    if (isRecoveryThrottlingNode) {
        assertThat("Throttling should be >0 for '" + nodeName + "'", recoveryStats.throttleTime().millis(), greaterThan(0L));
    } else {
        assertThat("Throttling should be =0 for '" + nodeName + "'", recoveryStats.throttleTime().millis(), equalTo(0L));
    }
}

public void testGatewayRecovery() throws Exception {
logger.info("--> start nodes");
String node = internalCluster().startNode();
Expand Down Expand Up @@ -445,7 +530,6 @@ public Settings onNodeStopped(String nodeName) throws Exception {
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/99941")
public void testRerouteRecovery() throws Exception {
logger.info("--> start node A");
final String nodeA = internalCluster().startNode();
Expand All @@ -466,15 +550,6 @@ public void testRerouteRecovery() throws Exception {
logger.info("--> move shard from: {} to: {}", nodeA, nodeB);
clusterAdmin().prepareReroute().add(new MoveAllocationCommand(INDEX_NAME, 0, nodeA, nodeB)).execute().actionGet().getState();

logger.info("--> waiting for recovery to start both on source and target");
final Index index = resolveIndex(INDEX_NAME);
assertBusy(() -> {
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeA);
assertThat(indicesService.indexServiceSafe(index).getShard(0).recoveryStats().currentAsSource(), equalTo(1));
indicesService = internalCluster().getInstance(IndicesService.class, nodeB);
assertThat(indicesService.indexServiceSafe(index).getShard(0).recoveryStats().currentAsTarget(), equalTo(1));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing this wait looks suspicious to me. It's not throttling-related, it's waiting for both nodes to start handling the recovery and that looks important for the next few lines of the test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. I added this back to the testRerouteRecovery test. Originally I thought you meant in the new tests, because of the diff -- this is in the new tests.

I basically undid what the original patch to add throttling added to this test. I don't have strong opinions, though.

});

logger.info("--> request recoveries");
RecoveryResponse response = indicesAdmin().prepareRecoveries(INDEX_NAME).execute().actionGet();

Expand All @@ -490,55 +565,6 @@ public void testRerouteRecovery() throws Exception {
assertOnGoingRecoveryState(nodeBRecoveryStates.get(0), 0, PeerRecoverySource.INSTANCE, true, nodeA, nodeB);
validateIndexRecoveryState(nodeBRecoveryStates.get(0).getIndex());

logger.info("--> request node recovery stats");
NodesStatsResponse statsResponse = clusterAdmin().prepareNodesStats()
.clear()
.setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery))
.get();
long nodeAThrottling = Long.MAX_VALUE;
long nodeBThrottling = Long.MAX_VALUE;
for (NodeStats nodeStats : statsResponse.getNodes()) {
final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
if (nodeStats.getNode().getName().equals(nodeA)) {
assertThat("node A should have ongoing recovery as source", recoveryStats.currentAsSource(), equalTo(1));
assertThat("node A should not have ongoing recovery as target", recoveryStats.currentAsTarget(), equalTo(0));
nodeAThrottling = recoveryStats.throttleTime().millis();
}
if (nodeStats.getNode().getName().equals(nodeB)) {
assertThat("node B should not have ongoing recovery as source", recoveryStats.currentAsSource(), equalTo(0));
assertThat("node B should have ongoing recovery as target", recoveryStats.currentAsTarget(), equalTo(1));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should keep the check that A has 1 source and 0 targets and B has 0 sources and 1 target. We can drop the bit about the throttling here tho, and probably the assertBusy().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

nodeBThrottling = recoveryStats.throttleTime().millis();
}
}

logger.info("--> checking throttling increases");
final long finalNodeAThrottling = nodeAThrottling;
final long finalNodeBThrottling = nodeBThrottling;
assertBusy(() -> {
NodesStatsResponse statsResponse1 = clusterAdmin().prepareNodesStats()
.clear()
.setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery))
.get();
assertThat(statsResponse1.getNodes(), hasSize(2));
for (NodeStats nodeStats : statsResponse1.getNodes()) {
final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
if (nodeStats.getNode().getName().equals(nodeA)) {
assertThat(
"node A throttling should increase",
recoveryStats.throttleTime().millis(),
greaterThan(finalNodeAThrottling)
);
}
if (nodeStats.getNode().getName().equals(nodeB)) {
assertThat(
"node B throttling should increase",
recoveryStats.throttleTime().millis(),
greaterThan(finalNodeBThrottling)
);
}
}
});

logger.info("--> speeding up recoveries");
restoreRecoverySpeed();

Expand All @@ -552,31 +578,11 @@ public void testRerouteRecovery() throws Exception {

assertRecoveryState(recoveryStates.get(0), 0, PeerRecoverySource.INSTANCE, true, Stage.DONE, nodeA, nodeB);
validateIndexRecoveryState(recoveryStates.get(0).getIndex());
Consumer<String> assertNodeHasThrottleTimeAndNoRecoveries = nodeName -> {
NodesStatsResponse nodesStatsResponse = clusterAdmin().prepareNodesStats()
.setNodesIds(nodeName)
.clear()
.setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery))
.get();
assertThat(nodesStatsResponse.getNodes(), hasSize(1));
NodeStats nodeStats = nodesStatsResponse.getNodes().get(0);
final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
assertThat(recoveryStats.currentAsSource(), equalTo(0));
assertThat(recoveryStats.currentAsTarget(), equalTo(0));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise with this bit, I think we should still be waiting for the recoveries to reach zero according to both nodes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

assertThat(nodeName + " throttling should be >0", recoveryStats.throttleTime().millis(), greaterThan(0L));
};
// we have to use assertBusy as recovery counters are decremented only when the last reference to the RecoveryTarget
// is decremented, which may happen after the recovery was done.
assertBusy(() -> assertNodeHasThrottleTimeAndNoRecoveries.accept(nodeA));
assertBusy(() -> assertNodeHasThrottleTimeAndNoRecoveries.accept(nodeB));

logger.info("--> bump replica count");
setReplicaCount(1, INDEX_NAME);
ensureGreen();

assertBusy(() -> assertNodeHasThrottleTimeAndNoRecoveries.accept(nodeA));
assertBusy(() -> assertNodeHasThrottleTimeAndNoRecoveries.accept(nodeB));

logger.info("--> start node C");
String nodeC = internalCluster().startNode();
assertFalse(clusterAdmin().prepareHealth().setWaitForNodes("3").get().isTimedOut());
Expand Down Expand Up @@ -651,6 +657,141 @@ public void testRerouteRecovery() throws Exception {
validateIndexRecoveryState(nodeCRecoveryStates.get(0).getIndex());
}

/**
 * Exercises recovery throttling on the node that sends shard data. Only the source node's statistics should
 * accumulate throttle time: the source feeds data out more slowly than the target's limit, so the target
 * never throttles.
 */
public void testSourceThrottling() throws Exception {
    // --- Cluster setup.

    logger.info("--> starting node A with default settings");
    final String nodeA = internalCluster().startNode();

    logger.info("--> creating index on node A");
    final ByteSizeValue shardStoreSize = createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT).getShards()[0]
        .getStats()
        .getStore()
        .size();

    logger.info("--> starting node B with default settings");
    final String nodeB = internalCluster().startNode();

    // One tenth of the shard per second => roughly ten seconds of throttled recovery.
    final long throttledChunkSize = Math.max(1, shardStoreSize.getBytes() / 10);
    logger.info(
        "--> restarting node A with recovery throttling settings. Index shard size is `{}`. Throttling down to a "
            + "chunk of size `{}` per second. This will slow recovery of the shard to 10 seconds.",
        shardStoreSize,
        ByteSizeValue.ofBytes(throttledChunkSize)
    );
    internalCluster().restartNode(nodeA, new InternalTestCluster.RestartCallback() {
        // Supplies the settings applied to node A when it comes back up.
        @Override
        public Settings onNodeStopped(String nodeName) {
            return createRecoverySettingsChunkPerSecond(throttledChunkSize).build();
        }
    });

    logger.info("--> waiting for the cluster to stabilize after restarting the source node (Node A)");
    ensureGreen();

    // --- Shard recovery.

    startShardRecovery(INDEX_NAME, nodeA, nodeB);

    logger.info("--> checking throttling increases on Node A (source node), while Node B (target node) reports no throttling");
    assertBusy(() -> {
        final NodesStatsResponse response = clusterAdmin().prepareNodesStats()
            .clear()
            .setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery))
            .get();
        assertThat(response.getNodes(), hasSize(2));
        for (NodeStats stats : response.getNodes()) {
            final String statsNodeName = stats.getNode().getName();
            final RecoveryStats recoveryStats = stats.getIndices().getRecoveryStats();
            if (nodeA.equals(statsNodeName)) {
                assertThat("node A throttling should increase", recoveryStats.throttleTime().millis(), greaterThan(0L));
            }
            if (nodeB.equals(statsNodeName)) {
                assertThat("node B throttling should _not_ increase", recoveryStats.throttleTime().millis(), equalTo(0L));
            }
        }
    });

    logger.info("--> increasing the recovery throttle limit so that the shard recovery completes quickly");
    restoreRecoverySpeed();

    logger.info("--> waiting for the shard recovery to complete");
    ensureGreen();

    // --- Shard recovery complete. Verify throttling millis remain reflected in node stats.

    logger.info("--> checking that both nodes A and B no longer have recoveries in progress, but that they do retain throttling stats");
    // assertBusy is required: recovery counters drop only once the last RecoveryTarget reference is released,
    // which can trail the recovery itself.
    assertBusy(() -> assertNodeHasThrottleTimeAndNoRecoveries(nodeA, true));
    assertBusy(() -> assertNodeHasThrottleTimeAndNoRecoveries(nodeB, false));
}

/**
 * Exercises recovery throttling on the node that receives shard data. Only the target node's statistics should
 * accumulate throttle time: the target accepts data more slowly than the source's limit, so the source
 * never throttles.
 */
public void testTargetThrottling() throws Exception {
    logger.info("--> starting node A with default settings");
    final String nodeA = internalCluster().startNode();

    logger.info("--> creating index on node A");
    final ByteSizeValue shardStoreSize = createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT).getShards()[0]
        .getStats()
        .getStore()
        .size();

    // One tenth of the shard per second => roughly ten seconds of throttled recovery.
    final long throttledChunkSize = Math.max(1, shardStoreSize.getBytes() / 10);
    logger.info(
        "--> starting node B with recovery throttling settings. Index shard size is `{}`. Throttling down to a "
            + "chunk of size `{}` per second. This will slow recovery of the existing shard to 10 seconds.",
        shardStoreSize,
        ByteSizeValue.ofBytes(throttledChunkSize)
    );
    final String nodeB = internalCluster().startNode(createRecoverySettingsChunkPerSecond(throttledChunkSize));

    logger.info("--> waiting for the cluster to stabilize after restarting the target node (Node B)");
    ensureGreen();

    // --- Shard recovery.

    startShardRecovery(INDEX_NAME, nodeA, nodeB);

    logger.info("--> checking throttling increases on Node B (target node), while Node A (source node) reports no throttling");
    assertBusy(() -> {
        final NodesStatsResponse response = clusterAdmin().prepareNodesStats()
            .clear()
            .setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery))
            .get();
        assertThat(response.getNodes(), hasSize(2));
        for (NodeStats stats : response.getNodes()) {
            final String statsNodeName = stats.getNode().getName();
            final RecoveryStats recoveryStats = stats.getIndices().getRecoveryStats();
            if (nodeA.equals(statsNodeName)) {
                assertThat("node A throttling should _not_ increase", recoveryStats.throttleTime().millis(), equalTo(0L));
            }
            if (nodeB.equals(statsNodeName)) {
                assertThat("node B throttling should increase", recoveryStats.throttleTime().millis(), greaterThan(0L));
            }
        }
    });

    logger.info("--> increasing the recovery throttle limit so that the shard recovery completes quickly");
    restoreRecoverySpeed();

    logger.info("--> waiting for the shard recovery to complete");
    ensureGreen();

    // --- Shard recovery complete. Verify throttling millis remain reflected in node stats.

    logger.info("--> checking that both nodes A and B no longer have recoveries in progress, but that they do retain throttling stats");
    // assertBusy is required: recovery counters drop only once the last RecoveryTarget reference is released,
    // which can trail the recovery itself.
    assertBusy(() -> assertNodeHasThrottleTimeAndNoRecoveries(nodeA, false));
    assertBusy(() -> assertNodeHasThrottleTimeAndNoRecoveries(nodeB, true));
}

public void testSnapshotRecovery() throws Exception {
logger.info("--> start node A");
String nodeA = internalCluster().startNode();
Expand Down