diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java
index a4d6d99173db6..f24568742bd66 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java
@@ -235,6 +235,20 @@ private void assertOnGoingRecoveryState(
         assertThat(state.getStage(), not(equalTo(Stage.DONE)));
     }
 
+    /**
+     * Creates node settings that will throttle shard recovery to 'chunkSizeBytes' bytes per second (one chunk per second).
+     *
+     * @param chunkSizeBytes size of the chunk in bytes
+     * @return a Settings.Builder containing the throttling settings
+     */
+    public Settings.Builder createRecoverySettingsChunkPerSecond(long chunkSizeBytes) {
+        return Settings.builder()
+            // Set the chunk size in bytes
+            .put(CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(chunkSizeBytes, ByteSizeUnit.BYTES))
+            // Set the recovery rate to one chunk per second
+            .put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), chunkSizeBytes, ByteSizeUnit.BYTES);
+    }
+
     private void slowDownRecovery(ByteSizeValue shardSize) {
         long chunkSize = Math.max(1, shardSize.getBytes() / 10);
         updateClusterSettings(
@@ -249,11 +263,82 @@ private void slowDownRecovery(ByteSizeValue shardSize) {
     private void restoreRecoverySpeed() {
         updateClusterSettings(
             Settings.builder()
-                .put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), "20mb")
+                // 200mb is an arbitrary number intended to be large enough to avoid more throttling.
+                .put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), "200mb")
                 .put(CHUNK_SIZE_SETTING.getKey(), RecoverySettings.DEFAULT_CHUNK_SIZE)
         );
     }
 
+    /**
+     * Initiates a shard recovery and verifies that it's running.
+     *
+     * @param sourceNode node holding the shard
+     * @param targetNode node that will recover the shard
+     * @throws Exception if the recovery does not start on both nodes in time
+     */
+    public void startShardRecovery(String sourceNode, String targetNode) throws Exception {
+        logger.info("--> moving shard from node `{}` to node `{}`", sourceNode, targetNode);
+        clusterAdmin().prepareReroute()
+            .add(new MoveAllocationCommand(INDEX_NAME, 0, sourceNode, targetNode))
+            .execute()
+            .actionGet()
+            .getState();
+
+        logger.info("--> requesting shard recovery");
+        indicesAdmin().prepareRecoveries(INDEX_NAME).execute().actionGet();
+
+        logger.info("--> waiting for recovery to begin on both the source and target nodes");
+        final Index index = resolveIndex(INDEX_NAME);
+        assertBusy(() -> {
+            IndicesService indicesService = internalCluster().getInstance(IndicesService.class, sourceNode);
+            assertThat(indicesService.indexServiceSafe(index).getShard(0).recoveryStats().currentAsSource(), equalTo(1));
+            indicesService = internalCluster().getInstance(IndicesService.class, targetNode);
+            assertThat(indicesService.indexServiceSafe(index).getShard(0).recoveryStats().currentAsTarget(), equalTo(1));
+        });
+
+        logger.info("--> checking cluster recovery stats reflect the ongoing recovery on each node");
+        NodesStatsResponse statsResponse = clusterAdmin().prepareNodesStats()
+            .clear()
+            .setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery))
+            .get();
+        for (NodeStats nodeStats : statsResponse.getNodes()) {
+            final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
+            if (nodeStats.getNode().getName().equals(sourceNode)) {
+                assertThat(sourceNode + " should have ongoing recovery as source", recoveryStats.currentAsSource(), equalTo(1));
+                assertThat(sourceNode + " should not have ongoing recovery as target", recoveryStats.currentAsTarget(), equalTo(0));
+            }
+            if (nodeStats.getNode().getName().equals(targetNode)) {
+                assertThat(targetNode + " should not have ongoing recovery as source", recoveryStats.currentAsSource(), equalTo(0));
+                assertThat(targetNode + " should have ongoing recovery as target", recoveryStats.currentAsTarget(), equalTo(1));
+            }
+        }
+    }
+
+    /**
+     * Asserts that the node stats for 'nodeName' show no active shard recoveries, and that the node's recovery throttling
+     * stats are >0 if 'isRecoveryThrottlingNode' is true, or ==0 if it is false.
+     *
+     * @param nodeName the name of the node
+     * @param isRecoveryThrottlingNode whether to expect throttling to have occurred on the node
+     */
+    public void assertNodeHasThrottleTimeAndNoRecoveries(String nodeName, Boolean isRecoveryThrottlingNode) {
+        NodesStatsResponse nodesStatsResponse = clusterAdmin().prepareNodesStats()
+            .setNodesIds(nodeName)
+            .clear()
+            .setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery))
+            .get();
+        assertThat(nodesStatsResponse.getNodes(), hasSize(1));
+        NodeStats nodeStats = nodesStatsResponse.getNodes().get(0);
+        final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
+        assertThat(recoveryStats.currentAsSource(), equalTo(0));
+        assertThat(recoveryStats.currentAsTarget(), equalTo(0));
+        if (isRecoveryThrottlingNode) {
+            assertThat("Throttling should be >0 for '" + nodeName + "'", recoveryStats.throttleTime().millis(), greaterThan(0L));
+        } else {
+            assertThat("Throttling should be =0 for '" + nodeName + "'", recoveryStats.throttleTime().millis(), equalTo(0L));
+        }
+    }
+
     public void testGatewayRecovery() throws Exception {
         logger.info("--> start nodes");
         String node = internalCluster().startNode();
@@ -449,7 +534,6 @@ public Settings onNodeStopped(String nodeName) throws Exception {
         }
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/99941")
     public void testRerouteRecovery() throws Exception {
         logger.info("--> start node A");
         final String nodeA = internalCluster().startNode();
@@ -499,50 +583,18 @@ public void testRerouteRecovery() throws Exception {
             .clear()
             .setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery))
             .get();
-        long nodeAThrottling = Long.MAX_VALUE;
-        long nodeBThrottling = Long.MAX_VALUE;
         for (NodeStats nodeStats : statsResponse.getNodes()) {
             final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
             if (nodeStats.getNode().getName().equals(nodeA)) {
                 assertThat("node A should have ongoing recovery as source", recoveryStats.currentAsSource(), equalTo(1));
                 assertThat("node A should not have ongoing recovery as target", recoveryStats.currentAsTarget(), equalTo(0));
-                nodeAThrottling = recoveryStats.throttleTime().millis();
             }
             if (nodeStats.getNode().getName().equals(nodeB)) {
                 assertThat("node B should not have ongoing recovery as source", recoveryStats.currentAsSource(), equalTo(0));
                 assertThat("node B should have ongoing recovery as target", recoveryStats.currentAsTarget(), equalTo(1));
-                nodeBThrottling = recoveryStats.throttleTime().millis();
             }
         }
 
-        logger.info("--> checking throttling increases");
-        final long finalNodeAThrottling = nodeAThrottling;
-        final long finalNodeBThrottling = nodeBThrottling;
-        assertBusy(() -> {
-            NodesStatsResponse statsResponse1 = clusterAdmin().prepareNodesStats()
-                .clear()
-                .setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery))
-                .get();
-            assertThat(statsResponse1.getNodes(), hasSize(2));
-            for (NodeStats nodeStats : statsResponse1.getNodes()) {
-                final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
-                if (nodeStats.getNode().getName().equals(nodeA)) {
-                    assertThat(
-                        "node A throttling should increase",
-                        recoveryStats.throttleTime().millis(),
-                        greaterThan(finalNodeAThrottling)
-                    );
-                }
-                if (nodeStats.getNode().getName().equals(nodeB)) {
-                    assertThat(
-                        "node B throttling should increase",
-                        recoveryStats.throttleTime().millis(),
-                        greaterThan(finalNodeBThrottling)
-                    );
-                }
-            }
-        });
-
         logger.info("--> speeding up recoveries");
         restoreRecoverySpeed();
 
@@ -556,6 +608,7 @@ public void testRerouteRecovery() throws Exception {
         assertRecoveryState(recoveryStates.get(0), 0, PeerRecoverySource.INSTANCE, true, Stage.DONE, nodeA, nodeB);
         validateIndexRecoveryState(recoveryStates.get(0).getIndex());
 
+
         Consumer<String> assertNodeHasThrottleTimeAndNoRecoveries = nodeName -> {
             NodesStatsResponse nodesStatsResponse = clusterAdmin().prepareNodesStats()
                 .setNodesIds(nodeName)
@@ -567,7 +620,6 @@ public void testRerouteRecovery() throws Exception {
             final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
             assertThat(recoveryStats.currentAsSource(), equalTo(0));
             assertThat(recoveryStats.currentAsTarget(), equalTo(0));
-            assertThat(nodeName + " throttling should be >0", recoveryStats.throttleTime().millis(), greaterThan(0L));
         };
         // we have to use assertBusy as recovery counters are decremented only when the last reference to the RecoveryTarget
         // is decremented, which may happen after the recovery was done.
@@ -578,9 +630,6 @@ public void testRerouteRecovery() throws Exception {
         setReplicaCount(1, INDEX_NAME);
         ensureGreen();
 
-        assertBusy(() -> assertNodeHasThrottleTimeAndNoRecoveries.accept(nodeA));
-        assertBusy(() -> assertNodeHasThrottleTimeAndNoRecoveries.accept(nodeB));
-
         logger.info("--> start node C");
         String nodeC = internalCluster().startNode();
         assertFalse(clusterAdmin().prepareHealth().setWaitForNodes("3").get().isTimedOut());
@@ -655,6 +704,141 @@ public void testRerouteRecovery() throws Exception {
         validateIndexRecoveryState(nodeCRecoveryStates.get(0).getIndex());
     }
 
+    /**
+     * Tests shard recovery throttling on the source node. Node statistics should show throttling time on the source node, while no
+     * throttling should be shown on the target node because the source will send data more slowly than the target's throttling threshold.
+     */
+    public void testSourceThrottling() throws Exception {
+        // --- Cluster setup.
+
+        logger.info("--> starting node A with default settings");
+        final String nodeA = internalCluster().startNode();
+
+        logger.info("--> creating index on node A");
+        ByteSizeValue shardSize = createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT).getShards()[0].getStats()
+            .getStore()
+            .size();
+
+        logger.info("--> starting node B with default settings");
+        final String nodeB = internalCluster().startNode();
+
+        long chunkSize = Math.max(1, shardSize.getBytes() / 10);
+        logger.info(
+            "--> restarting node A with recovery throttling settings. Index shard size is `{}`. Throttling down to a "
+                + "chunk of size `{}` per second. This will slow recovery of the shard to 10 seconds.",
+            shardSize,
+            ByteSizeValue.ofBytes(chunkSize)
+        );
+        internalCluster().restartNode(nodeA, new InternalTestCluster.RestartCallback() {
+            // This callback returns node Settings that are ultimately passed into the restarted node.
+            @Override
+            public Settings onNodeStopped(String nodeName) {
+                return createRecoverySettingsChunkPerSecond(chunkSize).build();
+            }
+        });
+
+        logger.info("--> waiting for the cluster to stabilize after restarting the source node (Node A)");
+        ensureGreen();
+
+        // --- Shard recovery.
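+        // Node A (the source) is throttled to one chunk per second, so it should accrue throttle time while sending,
+        // whereas node B receives data well below its own recovery rate limit and should record none.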
+
+        startShardRecovery(nodeA, nodeB);
+
+        logger.info("--> checking throttling increases on Node A (source node), while Node B (target node) reports no throttling");
+        assertBusy(() -> {
+            NodesStatsResponse nodeStatsResponse = clusterAdmin().prepareNodesStats()
+                .clear()
+                .setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery))
+                .get();
+            assertThat(nodeStatsResponse.getNodes(), hasSize(2));
+            for (NodeStats nodeStats : nodeStatsResponse.getNodes()) {
+                final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
+                if (nodeStats.getNode().getName().equals(nodeA)) {
+                    assertThat("node A throttling should increase", recoveryStats.throttleTime().millis(), greaterThan(0L));
+                }
+                if (nodeStats.getNode().getName().equals(nodeB)) {
+                    assertThat("node B throttling should _not_ increase", recoveryStats.throttleTime().millis(), equalTo(0L));
+                }
+            }
+        });
+
+        logger.info("--> increasing the recovery throttle limit so that the shard recovery completes quickly");
+        restoreRecoverySpeed();
+
+        logger.info("--> waiting for the shard recovery to complete");
+        ensureGreen();
+
+        // --- Shard recovery complete. Verify throttling millis remain reflected in node stats.
+
+        logger.info("--> checking that both nodes A and B no longer have recoveries in progress, but that they do retain throttling stats");
+        // We must use assertBusy because recovery counters are decremented only when the last reference to
+        // the RecoveryTarget is decremented, which may happen after the recovery finishes.
+        assertBusy(() -> assertNodeHasThrottleTimeAndNoRecoveries(nodeA, true));
+        assertBusy(() -> assertNodeHasThrottleTimeAndNoRecoveries(nodeB, false));
+    }
+
+    /**
+     * Tests shard recovery throttling on the target node. Node statistics should show throttling time on the target node, while no
+     * throttling should be shown on the source node because the target will accept data more slowly than the source's throttling threshold.
+     */
+    public void testTargetThrottling() throws Exception {
+        logger.info("--> starting node A with default settings");
+        final String nodeA = internalCluster().startNode();
+
+        logger.info("--> creating index on node A");
+        ByteSizeValue shardSize = createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT).getShards()[0].getStats()
+            .getStore()
+            .size();
+
+        long chunkSize = Math.max(1, shardSize.getBytes() / 10);
+        logger.info(
+            "--> starting node B with recovery throttling settings. Index shard size is `{}`. Throttling down to a "
+                + "chunk of size `{}` per second. This will slow recovery of the existing shard to 10 seconds.",
+            shardSize,
+            ByteSizeValue.ofBytes(chunkSize)
+        );
+        final String nodeB = internalCluster().startNode(createRecoverySettingsChunkPerSecond(chunkSize));
+
+        logger.info("--> waiting for the cluster to stabilize after starting the target node (Node B)");
+        ensureGreen();
+
+        // --- Shard recovery.
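+        // Node B (the target) is throttled to one chunk per second, so it should accrue throttle time while receiving,
+        // whereas node A sends data well below its own recovery rate limit and should record none.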
+
+        startShardRecovery(nodeA, nodeB);
+
+        logger.info("--> checking throttling increases on Node B (target node), while Node A (source node) reports no throttling");
+        assertBusy(() -> {
+            NodesStatsResponse statsResponse1 = clusterAdmin().prepareNodesStats()
+                .clear()
+                .setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery))
+                .get();
+            assertThat(statsResponse1.getNodes(), hasSize(2));
+            for (NodeStats nodeStats : statsResponse1.getNodes()) {
+                final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
+                if (nodeStats.getNode().getName().equals(nodeA)) {
+                    assertThat("node A throttling should _not_ increase", recoveryStats.throttleTime().millis(), equalTo(0L));
+                }
+                if (nodeStats.getNode().getName().equals(nodeB)) {
+                    assertThat("node B throttling should increase", recoveryStats.throttleTime().millis(), greaterThan(0L));
+                }
+            }
+        });
+
+        logger.info("--> increasing the recovery throttle limit so that the shard recovery completes quickly");
+        restoreRecoverySpeed();
+
+        logger.info("--> waiting for the shard recovery to complete");
+        ensureGreen();
+
+        // --- Shard recovery complete. Verify throttling millis remain reflected in node stats.
+
+        logger.info("--> checking that both nodes A and B no longer have recoveries in progress, but that they do retain throttling stats");
+        // we have to use assertBusy as recovery counters are decremented only when the last reference to the RecoveryTarget
+        // is decremented, which may happen after the recovery was done.
+        assertBusy(() -> assertNodeHasThrottleTimeAndNoRecoveries(nodeA, false));
+        assertBusy(() -> assertNodeHasThrottleTimeAndNoRecoveries(nodeB, true));
+    }
+
     public void testSnapshotRecovery() throws Exception {
         logger.info("--> start node A");
         String nodeA = internalCluster().startNode();