From 323d9366dfa288fea2f78fc28128b456c3d3246e Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Fri, 13 Oct 2023 15:45:26 -0400 Subject: [PATCH] Stabilize testRerouteRecovery throttle testing (#100788) Refactor testRerouteRecovery, pulling out testing of shard recovery throttling into separate targeted tests. Now there are two additional tests, one testing source node throttling, and another testing target node throttling. Throttling both nodes at once leads to primarily the source node registering throttling, while the target node mostly has no cause to instigate throttling. --- .../indices/recovery/IndexRecoveryIT.java | 260 +++++++++++++++--- 1 file changed, 222 insertions(+), 38 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index f556486795c2a..f230004964d9d 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -235,6 +235,20 @@ private void assertOnGoingRecoveryState( assertThat(state.getStage(), not(equalTo(Stage.DONE))); } + /** + * Creates node settings that will throttle shard recovery to 'chunkSize' bytes per second. + * + * @param chunkSizeBytes size of the chunk in bytes + * @return A Settings.Builder + */ + public Settings.Builder createRecoverySettingsChunkPerSecond(long chunkSizeBytes) { + return Settings.builder() + // Set the chunk size in bytes + .put(CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(chunkSizeBytes, ByteSizeUnit.BYTES)) + // Set one chunk of bytes per second. + .put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), chunkSizeBytes, ByteSizeUnit.BYTES); + } + private void slowDownRecovery(ByteSizeValue shardSize) { long chunkSize = Math.max(1, shardSize.getBytes() / 10); updateClusterSettings( @@ -249,11 +263,82 @@ private void slowDownRecovery(ByteSizeValue shardSize) { private void restoreRecoverySpeed() { updateClusterSettings( Settings.builder() - .put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), "20mb") + // 200mb is an arbitrary number intended to be large enough to avoid more throttling. + .put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), "200mb") .put(CHUNK_SIZE_SETTING.getKey(), RecoverySettings.DEFAULT_CHUNK_SIZE) ); } + /** + * Initiates a shard recovery and verifies that it's running. + * + * @param sourceNode node holding the shard + * @param targetNode node that will recover the shard + * @throws Exception + */ + public void startShardRecovery(String sourceNode, String targetNode) throws Exception { + logger.info("--> updating cluster settings with moving shard from node `{}` to node `{}`", sourceNode, targetNode); + clusterAdmin().prepareReroute() + .add(new MoveAllocationCommand(INDEX_NAME, 0, sourceNode, targetNode)) + .execute() + .actionGet() + .getState(); + + logger.info("--> requesting shard recovery"); + indicesAdmin().prepareRecoveries(INDEX_NAME).execute().actionGet(); + + logger.info("--> waiting for recovery to begin on both the source and target nodes"); + final Index index = resolveIndex(INDEX_NAME); + assertBusy(() -> { + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, sourceNode); + assertThat(indicesService.indexServiceSafe(index).getShard(0).recoveryStats().currentAsSource(), equalTo(1)); + indicesService = internalCluster().getInstance(IndicesService.class, targetNode); + assertThat(indicesService.indexServiceSafe(index).getShard(0).recoveryStats().currentAsTarget(), equalTo(1)); + }); + + logger.info("--> checking cluster recovery stats reflect the ongoing recovery on each node"); + NodesStatsResponse statsResponse = clusterAdmin().prepareNodesStats() + .clear() + .setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery)) + .get(); + for (NodeStats nodeStats : statsResponse.getNodes()) { + final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats(); + if (nodeStats.getNode().getName().equals(sourceNode)) { + assertThat(sourceNode + " should have ongoing recovery as source", recoveryStats.currentAsSource(), equalTo(1)); + assertThat(sourceNode + " should not have ongoing recovery as target", recoveryStats.currentAsTarget(), equalTo(0)); + } + if (nodeStats.getNode().getName().equals(targetNode)) { + assertThat(targetNode + " should not have ongoing recovery as source", recoveryStats.currentAsSource(), equalTo(0)); + assertThat(targetNode + " should have ongoing recovery as target", recoveryStats.currentAsTarget(), equalTo(1)); + } + } + } + + /** + * Asserts that the cluster stats show no shard recovery is active in the cluster and that 'nodeName' has >=0 + * throttling stats if 'isRecoveryThrottlingNode' or ==0 if not. + * + * @param nodeName the name of the node + * @param isRecoveryThrottlingNode whether to expect throttling to have occurred on the node + */ + public void assertNodeHasThrottleTimeAndNoRecoveries(String nodeName, Boolean isRecoveryThrottlingNode) { + NodesStatsResponse nodesStatsResponse = clusterAdmin().prepareNodesStats() + .setNodesIds(nodeName) + .clear() + .setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery)) + .get(); + assertThat(nodesStatsResponse.getNodes(), hasSize(1)); + NodeStats nodeStats = nodesStatsResponse.getNodes().get(0); + final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats(); + assertThat(recoveryStats.currentAsSource(), equalTo(0)); + assertThat(recoveryStats.currentAsTarget(), equalTo(0)); + if (isRecoveryThrottlingNode) { + assertThat("Throttling should be >0 for '" + nodeName + "'", recoveryStats.throttleTime().millis(), greaterThan(0L)); + } else { + assertThat("Throttling should be =0 for '" + nodeName + "'", recoveryStats.throttleTime().millis(), equalTo(0L)); + } + } + public void testGatewayRecovery() throws Exception { logger.info("--> start nodes"); String node = internalCluster().startNode(); @@ -445,7 +530,6 @@ public Settings onNodeStopped(String nodeName) throws Exception { } } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/99941") public void testRerouteRecovery() throws Exception { logger.info("--> start node A"); final String nodeA = internalCluster().startNode(); @@ -495,50 +579,18 @@ public void testRerouteRecovery() throws Exception { .clear() .setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery)) .get(); - long nodeAThrottling = Long.MAX_VALUE; - long nodeBThrottling = Long.MAX_VALUE; for (NodeStats nodeStats : statsResponse.getNodes()) { final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats(); if (nodeStats.getNode().getName().equals(nodeA)) { assertThat("node A should have ongoing recovery as source", recoveryStats.currentAsSource(), equalTo(1)); assertThat("node A should not have ongoing recovery as target", recoveryStats.currentAsTarget(), equalTo(0)); - nodeAThrottling = recoveryStats.throttleTime().millis(); } if (nodeStats.getNode().getName().equals(nodeB)) { assertThat("node B should not have ongoing recovery as source", recoveryStats.currentAsSource(), equalTo(0)); assertThat("node B should have ongoing recovery as target", recoveryStats.currentAsTarget(), equalTo(1)); - nodeBThrottling = recoveryStats.throttleTime().millis(); } } - logger.info("--> checking throttling increases"); - final long finalNodeAThrottling = nodeAThrottling; - final long finalNodeBThrottling = nodeBThrottling; - assertBusy(() -> { - NodesStatsResponse statsResponse1 = clusterAdmin().prepareNodesStats() - .clear() - .setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery)) - .get(); - assertThat(statsResponse1.getNodes(), hasSize(2)); - for (NodeStats nodeStats : statsResponse1.getNodes()) { - final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats(); - if (nodeStats.getNode().getName().equals(nodeA)) { - assertThat( - "node A throttling should increase", - recoveryStats.throttleTime().millis(), - greaterThan(finalNodeAThrottling) - ); - } - if (nodeStats.getNode().getName().equals(nodeB)) { - assertThat( - "node B throttling should increase", - recoveryStats.throttleTime().millis(), - greaterThan(finalNodeBThrottling) - ); - } - } - }); - logger.info("--> speeding up recoveries"); restoreRecoverySpeed(); @@ -552,6 +604,7 @@ public void testRerouteRecovery() throws Exception { assertRecoveryState(recoveryStates.get(0), 0, PeerRecoverySource.INSTANCE, true, Stage.DONE, nodeA, nodeB); validateIndexRecoveryState(recoveryStates.get(0).getIndex()); + Consumer assertNodeHasThrottleTimeAndNoRecoveries = nodeName -> { NodesStatsResponse nodesStatsResponse = clusterAdmin().prepareNodesStats() .setNodesIds(nodeName) @@ -563,7 +616,6 @@ public void testRerouteRecovery() throws Exception { final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats(); assertThat(recoveryStats.currentAsSource(), equalTo(0)); assertThat(recoveryStats.currentAsTarget(), equalTo(0)); - assertThat(nodeName + " throttling should be >0", recoveryStats.throttleTime().millis(), greaterThan(0L)); }; // we have to use assertBusy as recovery counters are decremented only when the last reference to the RecoveryTarget // is decremented, which may happen after the recovery was done. @@ -574,9 +626,6 @@ public void testRerouteRecovery() throws Exception { setReplicaCount(1, INDEX_NAME); ensureGreen(); - assertBusy(() -> assertNodeHasThrottleTimeAndNoRecoveries.accept(nodeA)); - assertBusy(() -> assertNodeHasThrottleTimeAndNoRecoveries.accept(nodeB)); - logger.info("--> start node C"); String nodeC = internalCluster().startNode(); assertFalse(clusterAdmin().prepareHealth().setWaitForNodes("3").get().isTimedOut()); @@ -651,6 +700,141 @@ public void testRerouteRecovery() throws Exception { validateIndexRecoveryState(nodeCRecoveryStates.get(0).getIndex()); } + /** + * Tests shard recovery throttling on the source node. Node statistics should show throttling time on the source node, while no + * throttling should be shown on the target node because the source will send data more slowly than the target's throttling threshold. + */ + public void testSourceThrottling() throws Exception { + // --- Cluster setup. + + logger.info("--> starting node A with default settings"); + final String nodeA = internalCluster().startNode(); + + logger.info("--> creating index on node A"); + ByteSizeValue shardSize = createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT).getShards()[0].getStats() + .getStore() + .size(); + + logger.info("--> starting node B with default settings"); + final String nodeB = internalCluster().startNode(); + + long chunkSize = Math.max(1, shardSize.getBytes() / 10); + logger.info( + "--> restarting node A with recovery throttling settings. Index shard size is `{}`. Throttling down to a " + + "chunk of size `{}` per second. This will slow recovery of the shard to 10 seconds.", + shardSize, + ByteSizeValue.ofBytes(chunkSize) + ); + internalCluster().restartNode(nodeA, new InternalTestCluster.RestartCallback() { + // This callback returns node Settings that are ultimately passed into the restarted node. + @Override + public Settings onNodeStopped(String nodeName) { + return createRecoverySettingsChunkPerSecond(chunkSize).build(); + } + }); + + logger.info("--> waiting for the cluster to stabilize after restarting the source node (Node A)"); + ensureGreen(); + + // --- Shard recovery. + + startShardRecovery(nodeA, nodeB); + + logger.info("--> checking throttling increases on Node A (source node), while Node B (target node) reports no throttling"); + assertBusy(() -> { + NodesStatsResponse nodeStatsResponse = clusterAdmin().prepareNodesStats() + .clear() + .setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery)) + .get(); + assertThat(nodeStatsResponse.getNodes(), hasSize(2)); + for (NodeStats nodeStats : nodeStatsResponse.getNodes()) { + final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats(); + if (nodeStats.getNode().getName().equals(nodeA)) { + assertThat("node A throttling should increase", recoveryStats.throttleTime().millis(), greaterThan(0L)); + } + if (nodeStats.getNode().getName().equals(nodeB)) { + assertThat("node B throttling should _not_ increase", recoveryStats.throttleTime().millis(), equalTo(0L)); + } + } + }); + + logger.info("--> increasing the recovery throttle limit so that the shard recovery completes quickly"); + restoreRecoverySpeed(); + + logger.info("--> waiting for the shard recovery to complete"); + ensureGreen(); + + // --- Shard recovery complete. Verify throttling millis remain reflected in node stats. + + logger.info("--> checking that both nodes A and B no longer have recoveries in progress, but that they do retain throttling stats"); + // We must use assertBusy because recovery counters are decremented only when the last reference to + // the RecoveryTarget is decremented, which may happen after the recovery finishes. + assertBusy(() -> assertNodeHasThrottleTimeAndNoRecoveries(nodeA, true)); + assertBusy(() -> assertNodeHasThrottleTimeAndNoRecoveries(nodeB, false)); + } + + /** + * Tests shard recovery throttling on the target node. Node statistics should show throttling time on the target node, while no + * throttling should be shown on the source node because the target will accept data more slowly than the source's throttling threshold. + */ + public void testTargetThrottling() throws Exception { + logger.info("--> starting node A with default settings"); + final String nodeA = internalCluster().startNode(); + + logger.info("--> creating index on node A"); + ByteSizeValue shardSize = createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT).getShards()[0].getStats() + .getStore() + .size(); + + long chunkSize = Math.max(1, shardSize.getBytes() / 10); + logger.info( + "--> starting node B with recovery throttling settings. Index shard size is `{}`. Throttling down to a " + + "chunk of size `{}` per second. This will slow recovery of the existing shard to 10 seconds.", + shardSize, + ByteSizeValue.ofBytes(chunkSize) + ); + final String nodeB = internalCluster().startNode(createRecoverySettingsChunkPerSecond(chunkSize)); + + logger.info("--> waiting for the cluster to stabilize after restarting the target node (Node B)"); + ensureGreen(); + + // --- Shard recovery. + + startShardRecovery(nodeA, nodeB); + + logger.info("--> checking throttling increases on Node B (target node), while Node A (source node) reports no throttling"); + assertBusy(() -> { + NodesStatsResponse statsResponse1 = clusterAdmin().prepareNodesStats() + .clear() + .setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery)) + .get(); + assertThat(statsResponse1.getNodes(), hasSize(2)); + for (NodeStats nodeStats : statsResponse1.getNodes()) { + final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats(); + if (nodeStats.getNode().getName().equals(nodeA)) { + assertThat("node A throttling should _not_ increase", recoveryStats.throttleTime().millis(), equalTo(0L)); + } + if (nodeStats.getNode().getName().equals(nodeB)) { + assertThat("node B throttling should increase", recoveryStats.throttleTime().millis(), greaterThan(0L)); + } + } + }); + + logger.info("--> increasing the recovery throttle limit so that the shard recovery completes quickly"); + restoreRecoverySpeed(); + + logger.info("--> waiting for the shard recovery to complete"); + ensureGreen(); + + // --- Shard recovery complete. Verify throttling millis remain reflected in node stats. + + logger.info("--> checking that both nodes A and B no longer have recoveries in progress, but that they do retain throttling stats"); + // we have to use assertBusy as recovery counters are decremented only when the last reference to the RecoveryTarget + // is decremented, which may happen after the recovery was done. + assertBusy(() -> assertNodeHasThrottleTimeAndNoRecoveries(nodeA, false)); + assertBusy(() -> assertNodeHasThrottleTimeAndNoRecoveries(nodeB, true)); + } + public void testSnapshotRecovery() throws Exception { logger.info("--> start node A"); String nodeA = internalCluster().startNode();