From 3bd805537063c5c5ea4672a33d7f576403872a76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Tue, 19 Oct 2021 09:08:54 +0200 Subject: [PATCH] Limit concurrent snapshot file restores in recovery per node (#79379) Today we limit the max number of concurrent snapshot file restores per recovery. This works well when the default node_concurrent_recoveries is used (which is 2). When this limit is increased, it is possible to exhaust the underlying repository connection pool, affecting other workloads. This commit adds a new setting `indices.recovery.max_concurrent_snapshot_file_downloads_per_node` that limits the maximum number of snapshot file downloads per node during recoveries. When a recovery starts on the target node, it tries to acquire a permit; if the permit is granted, the recovery is allowed to download snapshot files. Whether the permit was granted is communicated to the source node in the StartRecoveryRequest. This is a rather conservative approach, since a recovery that is granted a permit might not recover any snapshot files at all, while a concurrent recovery that was denied a permit could have taken advantage of recovering from a snapshot. Closes #79044 Backport of #79316 --- .../modules/indices/recovery.asciidoc | 7 + .../SnapshotBasedIndexRecoveryIT.java | 272 +++++++++++++++++- .../action/index/MappingUpdatedAction.java | 29 +- .../common/settings/ClusterSettings.java | 1 + .../util/concurrent/AdjustableSemaphore.java | 37 +++ .../recovery/PeerRecoveryTargetService.java | 15 +- .../recovery/RecoveriesCollection.java | 8 +- .../indices/recovery/RecoverySettings.java | 84 +++++- .../recovery/RecoverySourceHandler.java | 3 +- .../indices/recovery/RecoveryTarget.java | 22 +- .../recovery/StartRecoveryRequest.java | 35 ++- .../index/MappingUpdatedActionTests.java | 2 +- .../IndexLevelReplicationTests.java | 4 +- .../RecoveryDuringReplicationTests.java | 6 +- .../index/shard/IndexShardTests.java | 6 +- .../PeerRecoverySourceServiceTests.java | 2 +- .../PeerRecoveryTargetServiceTests.java | 41 ++- .../recovery/RecoverySettingsTests.java | 87 ++++++ .../recovery/RecoverySourceHandlerTests.java | 4 +- .../indices/recovery/RecoveryTests.java | 4 +- .../recovery/StartRecoveryRequestTests.java | 3 +- .../recovery/RecoveriesCollectionTests.java | 9 +- .../ESIndexLevelReplicationTestCase.java | 2 +- .../index/shard/IndexShardTestCase.java | 2 +- .../ShardFollowTaskReplicationTests.java | 2 +- 25 files changed, 613 insertions(+), 74 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/common/util/concurrent/AdjustableSemaphore.java create mode 100644 server/src/test/java/org/elasticsearch/indices/recovery/RecoverySettingsTests.java diff --git a/docs/reference/modules/indices/recovery.asciidoc b/docs/reference/modules/indices/recovery.asciidoc index 4bc389db1a025..9237fdb6c379b 100644 --- a/docs/reference/modules/indices/recovery.asciidoc +++ b/docs/reference/modules/indices/recovery.asciidoc @@ -102,3 +102,10 @@ sent in parallel to the target node for each recovery. Defaults to `5`. + Do not increase this setting without carefully verifying that your cluster has the resources available to handle the extra load that will result. + +`indices.recovery.max_concurrent_snapshot_file_downloads_per_node`:: +(<>, Expert) Number of snapshot file download requests +executed in parallel on the target node for all recoveries. Defaults to `25`. 
++ +Do not increase this setting without carefully verifying that your cluster has +the resources available to handle the extra load that will result. diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/SnapshotBasedIndexRecoveryIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/SnapshotBasedIndexRecoveryIT.java index 6e2b39e228c1e..0fbb46d0a4078 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/SnapshotBasedIndexRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/SnapshotBasedIndexRecoveryIT.java @@ -31,11 +31,11 @@ import org.elasticsearch.common.blobstore.support.FilterBlobContainer; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.xcontent.NamedXContentRegistry; +import org.elasticsearch.core.CheckedRunnable; import org.elasticsearch.env.Environment; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.MergePolicyConfig; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.recovery.RecoveryStats; import org.elasticsearch.index.shard.IndexShard; @@ -56,11 +56,15 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalSettingsPlugin; import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xcontent.NamedXContentRegistry; import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -71,10 +75,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING; import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS; +import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; @@ -827,6 +833,266 @@ public void testSeqNoBasedRecoveryIsUsedAfterPrimaryFailOver() throws Exception } } + public void testRecoveryUsingSnapshotsIsThrottledPerNode() throws Exception { + executeRecoveryWithSnapshotFileDownloadThrottled((indices, + sourceNode, + targetNode, + targetMockTransportService, + recoverySnapshotFileRequests, + awaitForRecoverSnapshotFileRequestReceived, + respondToRecoverSnapshotFile) -> { + String indexRecoveredFromSnapshot1 = indices.get(0); + assertAcked( + client().admin().indices().prepareUpdateSettings(indexRecoveredFromSnapshot1) + .setSettings(Settings.builder() + .put("index.routing.allocation.require._name", targetNode)).get() + ); + + awaitForRecoverSnapshotFileRequestReceived.run(); + + // Ensure that peer recoveries can make progress without restoring snapshot files + // while the permit is granted to 
a different recovery + String indexRecoveredFromPeer = indices.get(1); + assertAcked( + client().admin().indices().prepareUpdateSettings(indexRecoveredFromPeer) + .setSettings(Settings.builder() + .put("index.routing.allocation.require._name", targetNode)).get() + ); + + ensureGreen(indexRecoveredFromPeer); + assertPeerRecoveryDidNotUseSnapshots(indexRecoveredFromPeer, sourceNode, targetNode); + + // let snapshot file restore to proceed + respondToRecoverSnapshotFile.run(); + + ensureGreen(indexRecoveredFromSnapshot1); + + assertPeerRecoveryUsedSnapshots(indexRecoveredFromSnapshot1, sourceNode, targetNode); + + for (RecoverySnapshotFileRequest recoverySnapshotFileRequest : recoverySnapshotFileRequests) { + String indexName = recoverySnapshotFileRequest.getShardId().getIndexName(); + assertThat(indexName, is(equalTo(indexRecoveredFromSnapshot1))); + } + + targetMockTransportService.clearAllRules(); + + String indexRecoveredFromSnapshot2 = indices.get(2); + assertAcked( + client().admin().indices().prepareUpdateSettings(indexRecoveredFromSnapshot2) + .setSettings(Settings.builder() + .put("index.routing.allocation.require._name", targetNode)).get() + ); + + ensureGreen(indexRecoveredFromSnapshot2); + + assertPeerRecoveryUsedSnapshots(indexRecoveredFromSnapshot2, sourceNode, targetNode); + + }); + } + + public void testRecoveryUsingSnapshotsPermitIsReturnedAfterFailureOrCancellation() throws Exception { + executeRecoveryWithSnapshotFileDownloadThrottled((indices, + sourceNode, + targetNode, + targetMockTransportService, + recoverySnapshotFileRequests, + awaitForRecoverSnapshotFileRequestReceived, + respondToRecoverSnapshotFile) -> { + String indexRecoveredFromSnapshot1 = indices.get(0); + assertAcked( + client().admin().indices().prepareUpdateSettings(indexRecoveredFromSnapshot1) + .setSettings(Settings.builder() + .put("index.routing.allocation.require._name", targetNode)).get() + ); + + awaitForRecoverSnapshotFileRequestReceived.run(); + + targetMockTransportService.clearAllRules(); + + boolean cancelRecovery = randomBoolean(); + if (cancelRecovery) { + assertAcked(client().admin().indices().prepareDelete(indexRecoveredFromSnapshot1).get()); + + respondToRecoverSnapshotFile.run(); + + assertThat(indexExists(indexRecoveredFromSnapshot1), is(equalTo(false))); + } else { + // Recovery would fail and should release the granted permit and allow other + // recoveries to use snapshots + CountDownLatch cleanFilesRequestReceived = new CountDownLatch(1); + AtomicReference channelRef = new AtomicReference<>(); + targetMockTransportService.addRequestHandlingBehavior(PeerRecoveryTargetService.Actions.CLEAN_FILES, + (handler, request, channel, task) -> { + channelRef.compareAndSet(null, channel); + cleanFilesRequestReceived.countDown(); + } + ); + + respondToRecoverSnapshotFile.run(); + cleanFilesRequestReceived.await(); + + targetMockTransportService.clearAllRules(); + channelRef.get().sendResponse(new IOException("unable to clean files")); + } + + String indexRecoveredFromSnapshot2 = indices.get(1); + assertAcked( + client().admin().indices().prepareUpdateSettings(indexRecoveredFromSnapshot2) + .setSettings(Settings.builder() + .put("index.routing.allocation.require._name", targetNode)).get() + ); + + ensureGreen(indexRecoveredFromSnapshot2); + + assertPeerRecoveryUsedSnapshots(indexRecoveredFromSnapshot2, sourceNode, targetNode); + }); + } + + public void testRecoveryReEstablishKeepsTheGrantedSnapshotFileDownloadPermit() throws Exception { + executeRecoveryWithSnapshotFileDownloadThrottled((indices, + 
sourceNode, + targetNode, + targetMockTransportService, + recoverySnapshotFileRequests, + awaitForRecoverSnapshotFileRequestReceived, + respondToRecoverSnapshotFile) -> { + AtomicReference startRecoveryConnection = new AtomicReference<>(); + CountDownLatch reestablishRecoverySent = new CountDownLatch(1); + targetMockTransportService.addSendBehavior((connection, requestId, action, request, options) -> { + if (action.equals(PeerRecoverySourceService.Actions.START_RECOVERY)) { + startRecoveryConnection.compareAndSet(null, connection); + } else if (action.equals(PeerRecoverySourceService.Actions.REESTABLISH_RECOVERY)) { + reestablishRecoverySent.countDown(); + } + connection.sendRequest(requestId, action, request, options); + }); + + String indexRecoveredFromSnapshot1 = indices.get(0); + assertAcked( + client().admin().indices().prepareUpdateSettings(indexRecoveredFromSnapshot1) + .setSettings(Settings.builder() + .put("index.routing.allocation.require._name", targetNode)).get() + ); + + awaitForRecoverSnapshotFileRequestReceived.run(); + + startRecoveryConnection.get().close(); + + reestablishRecoverySent.await(); + + String indexRecoveredFromPeer = indices.get(1); + assertAcked( + client().admin().indices().prepareUpdateSettings(indexRecoveredFromPeer) + .setSettings(Settings.builder() + .put("index.routing.allocation.require._name", targetNode)).get() + ); + + ensureGreen(indexRecoveredFromPeer); + assertPeerRecoveryDidNotUseSnapshots(indexRecoveredFromPeer, sourceNode, targetNode); + + respondToRecoverSnapshotFile.run(); + + ensureGreen(indexRecoveredFromSnapshot1); + assertPeerRecoveryUsedSnapshots(indexRecoveredFromSnapshot1, sourceNode, targetNode); + + targetMockTransportService.clearAllRules(); + + final String indexRecoveredFromSnapshot2 = indices.get(2); + assertAcked( + client().admin().indices().prepareUpdateSettings(indexRecoveredFromSnapshot2) + .setSettings(Settings.builder() + .put("index.routing.allocation.require._name", targetNode)).get() + ); + + ensureGreen(indexRecoveredFromSnapshot2); + assertPeerRecoveryUsedSnapshots(indexRecoveredFromSnapshot2, sourceNode, targetNode); + }); + } + + private void executeRecoveryWithSnapshotFileDownloadThrottled(SnapshotBasedRecoveryThrottlingTestCase testCase) throws Exception { + updateSetting(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.getKey(), "1"); + updateSetting(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE.getKey(), "1"); + + try { + List dataNodes = internalCluster().startDataOnlyNodes(2); + List indices = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createIndex(indexName, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) + .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "1s") + .put("index.routing.allocation.require._name", dataNodes.get(0)) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build() + ); + indices.add(indexName); + } + + String repoName = "repo"; + createRepo(repoName, "fs"); + + for (String indexName : indices) { + int numDocs = randomIntBetween(300, 1000); + indexDocs(indexName, numDocs, numDocs); + + createSnapshot(repoName, "snap-" + indexName, Collections.singletonList(indexName)); + } + + String sourceNode = dataNodes.get(0); + String targetNode = dataNodes.get(1); + MockTransportService targetMockTransportService = + (MockTransportService) internalCluster().getInstance(TransportService.class, 
targetNode); + + List recoverySnapshotFileRequests = Collections.synchronizedList(new ArrayList<>()); + CountDownLatch recoverSnapshotFileRequestReceived = new CountDownLatch(1); + CountDownLatch respondToRecoverSnapshotFile = new CountDownLatch(1); + targetMockTransportService.addRequestHandlingBehavior(PeerRecoveryTargetService.Actions.RESTORE_FILE_FROM_SNAPSHOT, + (handler, request, channel, task) -> { + recoverySnapshotFileRequests.add((RecoverySnapshotFileRequest) request); + recoverSnapshotFileRequestReceived.countDown(); + respondToRecoverSnapshotFile.await(); + handler.messageReceived(request, channel, task); + } + ); + + testCase.execute(indices, + sourceNode, + targetNode, + targetMockTransportService, + recoverySnapshotFileRequests, + recoverSnapshotFileRequestReceived::await, + respondToRecoverSnapshotFile::countDown + ); + } finally { + updateSetting(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE.getKey(), null); + updateSetting(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.getKey(), null); + } + } + + interface SnapshotBasedRecoveryThrottlingTestCase { + void execute(List indices, + String sourceNode, + String targetNode, + MockTransportService targetMockTransportService, + List recoverySnapshotFileRequests, + CheckedRunnable awaitForRecoverSnapshotFileRequestReceived, + Runnable respondToRecoverSnapshotFile) throws Exception; + } + + private void assertPeerRecoveryUsedSnapshots(String indexName, String sourceNode, String targetNode) { + RecoveryState recoveryStateIndexRecoveredFromPeer = getLatestPeerRecoveryStateForShard(indexName, 0); + assertPeerRecoveryWasSuccessful(recoveryStateIndexRecoveredFromPeer, sourceNode, targetNode); + assertThat(recoveryStateIndexRecoveredFromPeer.getIndex().recoveredFromSnapshotBytes(), is(greaterThan(0L))); + } + + private void assertPeerRecoveryDidNotUseSnapshots(String indexName, String sourceNode, String targetNode) { + RecoveryState recoveryStateIndexRecoveredFromPeer = getLatestPeerRecoveryStateForShard(indexName, 0); + assertPeerRecoveryWasSuccessful(recoveryStateIndexRecoveredFromPeer, sourceNode, targetNode); + assertThat(recoveryStateIndexRecoveredFromPeer.getIndex().recoveredFromSnapshotBytes(), is(equalTo(0L))); + } + private Store.MetadataSnapshot getMetadataSnapshot(String nodeName, String indexName) throws IOException { ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName); @@ -926,7 +1192,7 @@ private void assertSearchResponseContainsAllIndexedDocs(SearchResponse searchRes } } - private void assertPeerRecoveryWasSuccessful(RecoveryState recoveryState, String sourceNode, String targetNode) throws Exception { + private void assertPeerRecoveryWasSuccessful(RecoveryState recoveryState, String sourceNode, String targetNode) { assertThat(recoveryState.getStage(), equalTo(RecoveryState.Stage.DONE)); assertThat(recoveryState.getRecoverySource(), equalTo(RecoverySource.PeerRecoverySource.INSTANCE)); diff --git a/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java b/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java index bc81b3426cbb2..1fee76c3586c0 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.settings.Setting; import 
org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AdjustableSemaphore; import org.elasticsearch.core.TimeValue; import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException; import org.elasticsearch.common.util.concurrent.RunOnce; @@ -30,8 +31,6 @@ import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; -import java.util.concurrent.Semaphore; - /** * Called by shards in the cluster when their mapping was dynamically updated and it needs to be updated * in the cluster state meta data (and broadcast to all members). @@ -136,30 +135,4 @@ private static RuntimeException unwrapEsException(ElasticsearchException esEx) { } return new UncategorizedExecutionException("Failed execution", root); } - - static class AdjustableSemaphore extends Semaphore { - - private final Object maxPermitsMutex = new Object(); - private int maxPermits; - - AdjustableSemaphore(int maxPermits, boolean fair) { - super(maxPermits, fair); - this.maxPermits = maxPermits; - } - - void setMaxPermits(int permits) { - synchronized (maxPermitsMutex) { - final int diff = Math.subtractExact(permits, maxPermits); - if (diff > 0) { - // add permits - release(diff); - } else if (diff < 0) { - // remove permits - reducePermits(Math.negateExact(diff)); - } - - maxPermits = permits; - } - } - } } diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 8b271ef85aceb..19543901df738 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -233,6 +233,7 @@ public void apply(Settings value, Settings current, Settings previous) { RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING, RecoverySettings.INDICES_RECOVERY_USE_SNAPSHOTS_SETTING, RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS, + RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE, ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING, ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING, ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING, diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/AdjustableSemaphore.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/AdjustableSemaphore.java new file mode 100644 index 0000000000000..b89378cd0a6d9 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/AdjustableSemaphore.java @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.common.util.concurrent; + +import java.util.concurrent.Semaphore; + +public class AdjustableSemaphore extends Semaphore { + + private final Object maxPermitsMutex = new Object(); + private int maxPermits; + + public AdjustableSemaphore(int maxPermits, boolean fair) { + super(maxPermits, fair); + this.maxPermits = maxPermits; + } + + public void setMaxPermits(int permits) { + synchronized (maxPermitsMutex) { + final int diff = Math.subtractExact(permits, maxPermits); + if (diff > 0) { + // add permits + release(diff); + } else if (diff < 0) { + // remove permits + reducePermits(Math.negateExact(diff)); + } + + maxPermits = permits; + } + } +} diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 8e0c7dd6d9717..c1f9074278f20 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -139,9 +139,16 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh } public void startRecovery(final IndexShard indexShard, final DiscoveryNode sourceNode, final RecoveryListener listener) { + final Releasable snapshotFileDownloadsPermit = recoverySettings.tryAcquireSnapshotDownloadPermits(); // create a new recovery status, and process... - final long recoveryId = - onGoingRecoveries.startRecovery(indexShard, sourceNode, snapshotFilesProvider, listener, recoverySettings.activityTimeout()); + final long recoveryId = onGoingRecoveries.startRecovery( + indexShard, + sourceNode, + snapshotFilesProvider, + listener, + recoverySettings.activityTimeout(), + snapshotFileDownloadsPermit + ); // we fork off quickly here and go async but this is called from the cluster state applier thread too and that can cause // assertions to trip if we executed it on the same thread hence we fork off to the generic threadpool. 
threadPool.generic().execute(new RecoveryRunner(recoveryId)); @@ -268,7 +275,9 @@ public static StartRecoveryRequest getStartRecoveryRequest(Logger logger, Discov metadataSnapshot, recoveryTarget.state().getPrimary(), recoveryTarget.recoveryId(), - startingSeqNo); + startingSeqNo, + recoveryTarget.hasPermitToDownloadSnapshotFiles() + ); return request; } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java index 0646949c08c0f..7b2eb74bbacbc 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java @@ -14,6 +14,8 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Releasable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardClosedException; @@ -54,8 +56,10 @@ public long startRecovery(IndexShard indexShard, DiscoveryNode sourceNode, SnapshotFilesProvider snapshotFilesProvider, PeerRecoveryTargetService.RecoveryListener listener, - TimeValue activityTimeout) { - RecoveryTarget recoveryTarget = new RecoveryTarget(indexShard, sourceNode, snapshotFilesProvider, listener); + TimeValue activityTimeout, + @Nullable Releasable snapshotFileDownloadsPermit) { + RecoveryTarget recoveryTarget = + new RecoveryTarget(indexShard, sourceNode, snapshotFilesProvider, snapshotFileDownloadsPermit, listener); startRecoveryInternal(recoveryTarget, activityTimeout); return recoveryTarget.recoveryId(); } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java index f88a49c58194f..445527d87c0ea 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java @@ -20,17 +20,29 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.concurrent.AdjustableSemaphore; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; import org.elasticsearch.jdk.JavaVersion; import org.elasticsearch.monitor.os.OsProbe; import org.elasticsearch.node.NodeRoleSettings; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; import java.util.List; +import java.util.Locale; +import java.util.Map; import java.util.stream.Collectors; +import static org.elasticsearch.common.settings.Setting.parseInt; + public class RecoverySettings { public static final Version SNAPSHOT_RECOVERIES_SUPPORTED_VERSION = Version.V_7_15_0; public static final Version SEQ_NO_SNAPSHOT_RECOVERIES_SUPPORTED_VERSION = Version.V_7_16_0; + public static final Version SNAPSHOT_FILE_DOWNLOAD_THROTTLING_SUPPORTED_VERSION = Version.V_7_16_0; private static final Logger logger = LogManager.getLogger(RecoverySettings.class); @@ -134,7 +146,7 @@ public class RecoverySettings { /** * recoveries would try to use files from available 
snapshots instead of sending them from the source node. - * defaults to `false` + * defaults to `true` */ public static final Setting INDICES_RECOVERY_USE_SNAPSHOTS_SETTING = Setting.boolSetting("indices.recovery.use_snapshots", true, Property.Dynamic, Property.NodeScope); @@ -148,6 +160,43 @@ public class RecoverySettings { Property.NodeScope ); + public static final Setting INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE = new Setting<>( + "indices.recovery.max_concurrent_snapshot_file_downloads_per_node", + "25", + (s) -> parseInt(s, 1, 25, "indices.recovery.max_concurrent_snapshot_file_downloads_per_node", false), + new Setting.Validator() { + private final Collection> dependencies = + Collections.singletonList(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS); + @Override + public void validate(Integer value) { + // ignore + } + + @Override + public void validate(Integer maxConcurrentSnapshotFileDownloadsPerNode, Map, Object> settings) { + int maxConcurrentSnapshotFileDownloads = (int) settings.get(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS); + if (maxConcurrentSnapshotFileDownloadsPerNode < maxConcurrentSnapshotFileDownloads) { + throw new IllegalArgumentException( + String.format(Locale.ROOT, + "[%s]=%d is less than [%s]=%d", + INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE.getKey(), + maxConcurrentSnapshotFileDownloadsPerNode, + INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.getKey(), + maxConcurrentSnapshotFileDownloads + ) + ); + } + } + + @Override + public Iterator> settings() { + return dependencies.iterator(); + } + }, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB); private volatile ByteSizeValue maxBytesPerSec; @@ -162,6 +211,9 @@ public class RecoverySettings { private volatile TimeValue internalActionLongTimeout; private volatile boolean useSnapshotsDuringRecovery; private volatile int maxConcurrentSnapshotFileDownloads; + private volatile int maxConcurrentSnapshotFileDownloadsPerNode; + + private final AdjustableSemaphore maxSnapshotFileDownloadsPerNodeSemaphore; private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE; @@ -186,6 +238,8 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { } this.useSnapshotsDuringRecovery = INDICES_RECOVERY_USE_SNAPSHOTS_SETTING.get(settings); this.maxConcurrentSnapshotFileDownloads = INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.get(settings); + this.maxConcurrentSnapshotFileDownloadsPerNode = INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE.get(settings); + this.maxSnapshotFileDownloadsPerNodeSemaphore = new AdjustableSemaphore(this.maxConcurrentSnapshotFileDownloadsPerNode, true); logger.debug("using max_bytes_per_sec[{}]", maxBytesPerSec); @@ -202,6 +256,8 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_USE_SNAPSHOTS_SETTING, this::setUseSnapshotsDuringRecovery); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS, this::setMaxConcurrentSnapshotFileDownloads); + clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE, + this::setMaxConcurrentSnapshotFileDownloadsPerNode); } public RateLimiter rateLimiter() { @@ -303,4 +359,30 @@ public int getMaxConcurrentSnapshotFileDownloads() { public void 
setMaxConcurrentSnapshotFileDownloads(int maxConcurrentSnapshotFileDownloads) { this.maxConcurrentSnapshotFileDownloads = maxConcurrentSnapshotFileDownloads; } + + private void setMaxConcurrentSnapshotFileDownloadsPerNode(int maxConcurrentSnapshotFileDownloadsPerNode) { + this.maxConcurrentSnapshotFileDownloadsPerNode = maxConcurrentSnapshotFileDownloadsPerNode; + this.maxSnapshotFileDownloadsPerNodeSemaphore.setMaxPermits(maxConcurrentSnapshotFileDownloadsPerNode); + } + + @Nullable + Releasable tryAcquireSnapshotDownloadPermits() { + final int maxConcurrentSnapshotFileDownloads = getMaxConcurrentSnapshotFileDownloads(); + final boolean permitAcquired = maxSnapshotFileDownloadsPerNodeSemaphore.tryAcquire(maxConcurrentSnapshotFileDownloads); + if (getUseSnapshotsDuringRecovery() == false || permitAcquired == false) { + if (permitAcquired == false) { + logger.warn(String.format(Locale.ROOT, + "Unable to acquire permit to use snapshot files during recovery, " + + "this recovery will recover index files from the source node. " + + "Ensure snapshot files can be used during recovery by setting [%s] to be no greater than [%d]", + INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.getKey(), + this.maxConcurrentSnapshotFileDownloadsPerNode + ) + ); + } + return null; + } + + return Releasables.releaseOnce(() -> maxSnapshotFileDownloadsPerNodeSemaphore.release(maxConcurrentSnapshotFileDownloads)); + } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 34dc25fdbc27e..4c67699423965 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -493,6 +493,7 @@ void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, A } if (canSkipPhase1(recoverySourceMetadata, request.metadataSnapshot()) == false) { cancellableThreads.checkForCancel(); + final boolean canUseSnapshots = useSnapshots && request.canDownloadSnapshotFiles(); recoveryPlannerService.computeRecoveryPlan(shard.shardId(), shardStateIdentifier, recoverySourceMetadata, @@ -500,7 +501,7 @@ void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, A startingSeqNo, translogOps.getAsInt(), getRequest().targetNode().getVersion(), - useSnapshots, + canUseSnapshots, ActionListener.wrap(plan -> recoverFilesFromSourceAndSnapshot(plan, store, stopWatch, listener), listener::onFailure) ); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 97791c33e2794..41e62637ac862 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -68,6 +68,8 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget private final IndexShard indexShard; private final DiscoveryNode sourceNode; private final SnapshotFilesProvider snapshotFilesProvider; + @Nullable // if we're not downloading files from snapshots in this recovery + private final Releasable snapshotFileDownloadsPermit; private final MultiFileWriter multiFileWriter; private final RecoveryRequestTracker requestTracker = new RecoveryRequestTracker(); private final Store store; @@ -90,11 +92,15 @@ public class RecoveryTarget extends AbstractRefCounted 
implements RecoveryTarget * * @param indexShard local shard where we want to recover to * @param sourceNode source node of the recovery where we recover from + * @param snapshotFileDownloadsPermit a permit that allows to download files from a snapshot, + * limiting the concurrent snapshot file downloads per node + * preventing the exhaustion of repository resources. * @param listener called when recovery is completed/failed */ public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, SnapshotFilesProvider snapshotFilesProvider, + @Nullable Releasable snapshotFileDownloadsPermit, PeerRecoveryTargetService.RecoveryListener listener) { this.cancellableThreads = new CancellableThreads(); this.recoveryId = idGenerator.incrementAndGet(); @@ -103,6 +109,7 @@ public RecoveryTarget(IndexShard indexShard, this.indexShard = indexShard; this.sourceNode = sourceNode; this.snapshotFilesProvider = snapshotFilesProvider; + this.snapshotFileDownloadsPermit = snapshotFileDownloadsPermit; this.shardId = indexShard.shardId(); final String tempFilePrefix = RECOVERY_PREFIX + UUIDs.randomBase64UUID() + "."; this.multiFileWriter = new MultiFileWriter(indexShard.store(), indexShard.recoveryState().getIndex(), tempFilePrefix, logger, @@ -119,7 +126,7 @@ public RecoveryTarget(IndexShard indexShard, * @return a copy of this recovery target */ public RecoveryTarget retryCopy() { - return new RecoveryTarget(indexShard, sourceNode, snapshotFilesProvider, listener); + return new RecoveryTarget(indexShard, sourceNode, snapshotFilesProvider, snapshotFileDownloadsPermit, listener); } @Nullable @@ -152,6 +159,10 @@ public CancellableThreads cancellableThreads() { return cancellableThreads; } + public boolean hasPermitToDownloadSnapshotFiles() { + return snapshotFileDownloadsPermit != null; + } + /** return the last time this RecoveryStatus was used (based on System.nanoTime() */ public long lastAccessTime() { if (recoveryMonitorEnabled) { @@ -289,6 +300,13 @@ protected void closeInternal() { store.decRef(); indexShard.recoveryStats().decCurrentAsTarget(); closedLatch.countDown(); + releaseSnapshotFileDownloadsPermit(); + } + } + + private void releaseSnapshotFileDownloadsPermit() { + if (snapshotFileDownloadsPermit != null) { + snapshotFileDownloadsPermit.close(); } } @@ -512,6 +530,8 @@ public void restoreFileFromSnapshot(String repository, IndexId indexId, BlobStoreIndexShardSnapshot.FileInfo fileInfo, ActionListener listener) { + assert hasPermitToDownloadSnapshotFiles(); + try (InputStream inputStream = snapshotFilesProvider.getInputStreamForSnapshotFile(repository, indexId, shardId, fileInfo, this::registerThrottleTime)) { StoreFileMetadata metadata = fileInfo.metadata(); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java b/server/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java index 12efe336a9ca5..f45cec9ddd149 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java @@ -32,6 +32,7 @@ public class StartRecoveryRequest extends TransportRequest { private Store.MetadataSnapshot metadataSnapshot; private boolean primaryRelocation; private long startingSeqNo; + private boolean canDownloadSnapshotFiles; public StartRecoveryRequest(StreamInput in) throws IOException { super(in); @@ -47,19 +48,25 @@ public StartRecoveryRequest(StreamInput in) throws IOException { } else { startingSeqNo = 
SequenceNumbers.UNASSIGNED_SEQ_NO; } + if (in.getVersion().onOrAfter(RecoverySettings.SNAPSHOT_FILE_DOWNLOAD_THROTTLING_SUPPORTED_VERSION)) { + canDownloadSnapshotFiles = in.readBoolean(); + } else { + canDownloadSnapshotFiles = true; + } } /** * Construct a request for starting a peer recovery. * - * @param shardId the shard ID to recover - * @param targetAllocationId the allocation id of the target shard - * @param sourceNode the source node to remover from - * @param targetNode the target node to recover to - * @param metadataSnapshot the Lucene metadata - * @param primaryRelocation whether or not the recovery is a primary relocation - * @param recoveryId the recovery ID - * @param startingSeqNo the starting sequence number + * @param shardId the shard ID to recover + * @param targetAllocationId the allocation id of the target shard + * @param sourceNode the source node to remover from + * @param targetNode the target node to recover to + * @param metadataSnapshot the Lucene metadata + * @param primaryRelocation whether or not the recovery is a primary relocation + * @param recoveryId the recovery ID + * @param startingSeqNo the starting sequence number + * @param canDownloadSnapshotFiles flag that indicates if the snapshot files can be downloaded */ public StartRecoveryRequest(final ShardId shardId, final String targetAllocationId, @@ -68,7 +75,8 @@ public StartRecoveryRequest(final ShardId shardId, final Store.MetadataSnapshot metadataSnapshot, final boolean primaryRelocation, final long recoveryId, - final long startingSeqNo) { + final long startingSeqNo, + final boolean canDownloadSnapshotFiles) { this.recoveryId = recoveryId; this.shardId = shardId; this.targetAllocationId = targetAllocationId; @@ -77,6 +85,7 @@ public StartRecoveryRequest(final ShardId shardId, this.metadataSnapshot = metadataSnapshot; this.primaryRelocation = primaryRelocation; this.startingSeqNo = startingSeqNo; + this.canDownloadSnapshotFiles = canDownloadSnapshotFiles; assert startingSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || metadataSnapshot.getHistoryUUID() != null : "starting seq no is set but not history uuid"; } @@ -113,6 +122,10 @@ public long startingSeqNo() { return startingSeqNo; } + public boolean canDownloadSnapshotFiles() { + return canDownloadSnapshotFiles; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -126,6 +139,10 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) { out.writeLong(startingSeqNo); } + + if (out.getVersion().onOrAfter(RecoverySettings.SNAPSHOT_FILE_DOWNLOAD_THROTTLING_SUPPORTED_VERSION)) { + out.writeBoolean(canDownloadSnapshotFiles); + } } } diff --git a/server/src/test/java/org/elasticsearch/cluster/action/index/MappingUpdatedActionTests.java b/server/src/test/java/org/elasticsearch/cluster/action/index/MappingUpdatedActionTests.java index eb0273cf32fe6..c8954129eae19 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/index/MappingUpdatedActionTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/index/MappingUpdatedActionTests.java @@ -16,7 +16,7 @@ import org.elasticsearch.client.IndicesAdminClient; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.action.index.MappingUpdatedAction.AdjustableSemaphore; +import org.elasticsearch.common.util.concurrent.AdjustableSemaphore; import org.elasticsearch.cluster.node.DiscoveryNode; import 
org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index cb98cc73ef785..dc41051196f5a 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -117,7 +117,7 @@ public void run() { thread.start(); IndexShard replica = shards.addReplica(); Future future = shards.asyncRecoverReplica(replica, - (indexShard, node) -> new RecoveryTarget(indexShard, node, null, recoveryListener) { + (indexShard, node) -> new RecoveryTarget(indexShard, node, null, null, recoveryListener) { @Override public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetadata, ActionListener listener) { @@ -194,7 +194,7 @@ public IndexResult index(Index op) throws IOException { thread.start(); IndexShard replica = shards.addReplica(); Future fut = shards.asyncRecoverReplica(replica, - (shard, node) -> new RecoveryTarget(shard, node, null, recoveryListener) { + (shard, node) -> new RecoveryTarget(shard, node, null, null, recoveryListener) { @Override public void prepareForTranslogOperations(int totalTranslogOps, ActionListener listener) { try { diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 81a14d87340ca..ce18decc1abbd 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -492,7 +492,7 @@ protected EngineFactory getEngineFactory(ShardRouting routing) { AtomicBoolean recoveryDone = new AtomicBoolean(false); final Future recoveryFuture = shards.asyncRecoverReplica(newReplica, (indexShard, node) -> { recoveryStart.countDown(); - return new RecoveryTarget(indexShard, node, null, recoveryListener) { + return new RecoveryTarget(indexShard, node, null, null, recoveryListener) { @Override public void finalizeRecovery(long globalCheckpoint, long trimAboveSeqNo, ActionListener listener) { recoveryDone.set(true); @@ -547,7 +547,7 @@ protected EngineFactory getEngineFactory(final ShardRouting routing) { final IndexShard replica = shards.addReplica(); final Future recoveryFuture = shards.asyncRecoverReplica( replica, - (indexShard, node) -> new RecoveryTarget(indexShard, node, null, recoveryListener) { + (indexShard, node) -> new RecoveryTarget(indexShard, node, null, null, recoveryListener) { @Override public void indexTranslogOperations( final List operations, @@ -812,7 +812,7 @@ public static class BlockingTarget extends RecoveryTarget { public BlockingTarget(RecoveryState.Stage stageToBlock, CountDownLatch recoveryBlocked, CountDownLatch releaseRecovery, IndexShard shard, DiscoveryNode sourceNode, PeerRecoveryTargetService.RecoveryListener listener, Logger logger) { - super(shard, sourceNode, null, listener); + super(shard, sourceNode, null, null, listener); this.recoveryBlocked = recoveryBlocked; this.releaseRecovery = releaseRecovery; this.stageToBlock = stageToBlock; diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java 
b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 56afd07cf955e..8eb25276a781d 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -2583,7 +2583,7 @@ public void testTranslogRecoverySyncsTranslog() throws IOException { indexDoc(primary, "_doc", "0", "{\"foo\" : \"bar\"}"); IndexShard replica = newShard(primary.shardId(), false, "n2", metadata, null); recoverReplica(replica, primary, (shard, discoveryNode) -> - new RecoveryTarget(shard, discoveryNode, null, recoveryListener) { + new RecoveryTarget(shard, discoveryNode, null, null, recoveryListener) { @Override public void indexTranslogOperations( final List operations, @@ -2690,7 +2690,7 @@ public void testShardActiveDuringPeerRecovery() throws IOException { // Shard is still inactive since we haven't started recovering yet assertFalse(replica.isActive()); recoverReplica(replica, primary, (shard, discoveryNode) -> - new RecoveryTarget(shard, discoveryNode, null, recoveryListener) { + new RecoveryTarget(shard, discoveryNode, null, null, recoveryListener) { @Override public void indexTranslogOperations( final List operations, @@ -2749,7 +2749,7 @@ public void testRefreshListenersDuringPeerRecovery() throws IOException { replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode)); assertListenerCalled.accept(replica); recoverReplica(replica, primary, (shard, discoveryNode) -> - new RecoveryTarget(shard, discoveryNode, null, recoveryListener) { + new RecoveryTarget(shard, discoveryNode, null, null, recoveryListener) { // we're only checking that listeners are called when the engine is open, before there is no point @Override public void prepareForTranslogOperations(int totalTranslogOps, ActionListener listener) { diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java index 1a91d467df12a..32272130bd617 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java @@ -40,7 +40,7 @@ public void testDuplicateRecoveries() throws IOException { mock(SnapshotsRecoveryPlannerService.class)); StartRecoveryRequest startRecoveryRequest = new StartRecoveryRequest(primary.shardId(), randomAlphaOfLength(10), getFakeDiscoNode("source"), getFakeDiscoNode("target"), Store.MetadataSnapshot.EMPTY, randomBoolean(), randomLong(), - SequenceNumbers.UNASSIGNED_SEQ_NO); + SequenceNumbers.UNASSIGNED_SEQ_NO, true); peerRecoverySourceService.start(); RecoverySourceHandler handler = peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, primary); DelayRecoveryException delayRecoveryException = expectThrows(DelayRecoveryException.class, diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java index 695fe93075940..8433638ec8be7 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -31,7 +31,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; 
import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Tuple; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.engine.NoOpEngine; @@ -47,6 +47,7 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.xcontent.XContentType; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -64,6 +65,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.LongConsumer; import java.util.stream.Collectors; import java.util.stream.LongStream; @@ -97,7 +99,7 @@ public void testWriteFileChunksConcurrently() throws Exception { final DiscoveryNode pNode = getFakeDiscoNode(sourceShard.routingEntry().currentNodeId()); final DiscoveryNode rNode = getFakeDiscoNode(targetShard.routingEntry().currentNodeId()); targetShard.markAsRecovering("test-peer-recovery", new RecoveryState(targetShard.routingEntry(), rNode, pNode)); - final RecoveryTarget recoveryTarget = new RecoveryTarget(targetShard, null, null, null); + final RecoveryTarget recoveryTarget = new RecoveryTarget(targetShard, null, null, null, null); final PlainActionFuture receiveFileInfoFuture = new PlainActionFuture<>(); recoveryTarget.receiveFileInfo( mdFiles.stream().map(StoreFileMetadata::name).collect(Collectors.toList()), @@ -298,7 +300,7 @@ public void testResetStartingSeqNoIfLastCommitCorrupted() throws Exception { shard.prepareForIndexRecovery(); long startingSeqNo = shard.recoverLocallyUpToGlobalCheckpoint(); shard.store().markStoreCorrupted(new IOException("simulated")); - RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, null, null); + RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, null, null, null); StartRecoveryRequest request = PeerRecoveryTargetService.getStartRecoveryRequest(logger, rNode, recoveryTarget, startingSeqNo); assertThat(request.startingSeqNo(), equalTo(UNASSIGNED_SEQ_NO)); assertThat(request.metadataSnapshot().size(), equalTo(0)); @@ -325,7 +327,7 @@ public void testResetStartRequestIfTranslogIsCorrupted() throws Exception { shard = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.PeerRecoverySource.INSTANCE)); shard.markAsRecovering("peer recovery", new RecoveryState(shard.routingEntry(), pNode, rNode)); shard.prepareForIndexRecovery(); - RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, null, null); + RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, null, null, null); StartRecoveryRequest request = PeerRecoveryTargetService.getStartRecoveryRequest( logger, rNode, recoveryTarget, randomNonNegativeLong()); assertThat(request.startingSeqNo(), equalTo(UNASSIGNED_SEQ_NO)); @@ -386,7 +388,7 @@ public int getReadSnapshotFileBufferSizeForRepo(String repository) { recoveryStateIndex.addFileDetail(storeFileMetadata.name(), storeFileMetadata.length(), false); recoveryStateIndex.setFileDetailsComplete(); - RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, snapshotFilesProvider, null); + RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, snapshotFilesProvider, () -> {}, null); PlainActionFuture writeSnapshotFileFuture = PlainActionFuture.newFuture(); 
recoveryTarget.restoreFileFromSnapshot(repositoryName, indexId, fileInfo, writeSnapshotFileFuture); @@ -458,7 +460,7 @@ public int getReadSnapshotFileBufferSizeForRepo(String repository) { recoveryStateIndex.addFileDetail(storeFileMetadata.name(), storeFileMetadata.length(), false); recoveryStateIndex.setFileDetailsComplete(); - RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, snapshotFilesProvider, null); + RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, snapshotFilesProvider, () -> {}, null); String repositoryName = "repo"; IndexId indexId = new IndexId("index", "uuid"); @@ -566,7 +568,7 @@ public int getReadSnapshotFileBufferSizeForRepo(String repository) { } }; - RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, snapshotFilesProvider, null); + RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, snapshotFilesProvider, () -> {}, null); String[] fileNamesBeforeRecoveringSnapshotFiles = directory.listAll(); @@ -632,7 +634,7 @@ public int getReadSnapshotFileBufferSizeForRepo(String repository) { recoveryStateIndex.addFileDetail(storeFileMetadata.name(), storeFileMetadata.length(), false); recoveryStateIndex.setFileDetailsComplete(); - RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, snapshotFilesProvider, null); + RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, snapshotFilesProvider, () -> {}, null); String repository = "repo"; IndexId indexId = new IndexId("index", "uuid"); @@ -665,6 +667,29 @@ public int getReadSnapshotFileBufferSizeForRepo(String repository) { closeShards(shard); } + public void testSnapshotFileDownloadPermitIsReleasedAfterClosingRecoveryTarget() throws Exception { + DiscoveryNode pNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), + Collections.emptyMap(), Collections.emptySet(), Version.CURRENT); + DiscoveryNode rNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), + Collections.emptyMap(), Collections.emptySet(), Version.CURRENT); + + IndexShard shard = newShard(false); + shard.markAsRecovering("peer recovery", new RecoveryState(shard.routingEntry(), pNode, rNode)); + shard.prepareForIndexRecovery(); + + AtomicBoolean snapshotFileDownloadsPermitFlag = new AtomicBoolean(); + Releasable snapshotFileDownloadsPermit = () -> { + assertThat(snapshotFileDownloadsPermitFlag.compareAndSet(false, true), is(equalTo(true))); + }; + RecoveryTarget recoveryTarget = + new RecoveryTarget(shard, null, null, snapshotFileDownloadsPermit, null); + + recoveryTarget.decRef(); + + assertThat(snapshotFileDownloadsPermitFlag.get(), is(equalTo(true))); + closeShards(shard); + } + private Tuple createStoreFileMetadataWithRandomContent(String fileName) throws Exception { ByteArrayOutputStream out = new ByteArrayOutputStream(); try (OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput("test", "file", out, 1024)) { diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySettingsTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySettingsTests.java new file mode 100644 index 0000000000000..559da2ad71815 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySettingsTests.java @@ -0,0 +1,87 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.indices.recovery; + +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.test.ESTestCase; + +import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS; +import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE; +import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_USE_SNAPSHOTS_SETTING; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class RecoverySettingsTests extends ESTestCase { + public void testSnapshotDownloadPermitsAreNotGrantedWhenSnapshotsUseFlagIsFalse() { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + RecoverySettings recoverySettings = new RecoverySettings( + Settings.builder() + .put(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE.getKey(), 5) + .put(INDICES_RECOVERY_USE_SNAPSHOTS_SETTING.getKey(), false) + .build(), + clusterSettings + ); + + assertThat(recoverySettings.tryAcquireSnapshotDownloadPermits(), is(nullValue())); + } + + public void testGrantsSnapshotDownloadPermitsUpToMaxPermits() { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + RecoverySettings recoverySettings = new RecoverySettings( + Settings.builder().put(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE.getKey(), 5).build(), + clusterSettings + ); + + Releasable permit = recoverySettings.tryAcquireSnapshotDownloadPermits(); + assertThat(permit, is(notNullValue())); + + assertThat(recoverySettings.tryAcquireSnapshotDownloadPermits(), is(nullValue())); + + permit.close(); + assertThat(recoverySettings.tryAcquireSnapshotDownloadPermits(), is(notNullValue())); + } + + public void testSnapshotDownloadPermitCanBeDynamicallyUpdated() { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + RecoverySettings recoverySettings = new RecoverySettings( + Settings.builder().put(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE.getKey(), 5).build(), + clusterSettings + ); + + Releasable permit = recoverySettings.tryAcquireSnapshotDownloadPermits(); + assertThat(permit, is(notNullValue())); + + assertThat(recoverySettings.tryAcquireSnapshotDownloadPermits(), is(nullValue())); + clusterSettings.applySettings( + Settings.builder().put(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE.getKey(), 10).build() + ); + + assertThat(recoverySettings.tryAcquireSnapshotDownloadPermits(), is(notNullValue())); + assertThat(recoverySettings.tryAcquireSnapshotDownloadPermits(), is(nullValue())); + permit.close(); + } + + public void testMaxConcurrentSnapshotFileDownloadsPerNodeIsValidated() { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + Settings settings = Settings.builder() + 
+            .put(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.getKey(), 10)
+            .put(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE.getKey(), 5)
+            .build();
+        IllegalArgumentException exception =
+            expectThrows(IllegalArgumentException.class, () -> new RecoverySettings(settings, clusterSettings));
+        assertThat(exception.getMessage(),
+            containsString("[indices.recovery.max_concurrent_snapshot_file_downloads_per_node]=5 " +
+                "is less than [indices.recovery.max_concurrent_snapshot_file_downloads]=10")
+        );
+    }
+}
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
index c10e838c10e64..03e773221b0bb 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
@@ -228,7 +228,9 @@ public StartRecoveryRequest getStartRecoveryRequest() throws IOException {
             randomBoolean(),
             randomNonNegativeLong(),
             randomBoolean() || metadataSnapshot.getHistoryUUID() == null ?
-                SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong());
+                SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong(),
+            true
+        );
     }

     public void testSendSnapshotSendsOps() throws IOException {
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
index a37ea45802819..7158c2814d370 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
@@ -330,7 +330,7 @@ public void testPeerRecoverySendSafeCommitInFileBased() throws Exception {
         IndexShard replicaShard = newShard(primaryShard.shardId(), false);
         updateMappings(replicaShard, primaryShard.indexSettings().getIndexMetadata());
         recoverReplica(replicaShard, primaryShard,
-            (r, sourceNode) -> new RecoveryTarget(r, sourceNode, null, recoveryListener) {
+            (r, sourceNode) -> new RecoveryTarget(r, sourceNode, null, null, recoveryListener) {
                 @Override
                 public void prepareForTranslogOperations(int totalTranslogOps, ActionListener<Void> listener) {
                     super.prepareForTranslogOperations(totalTranslogOps, listener);
@@ -442,7 +442,7 @@ public long addDocument(Iterable<? extends IndexableField> doc) throws IOExcepti
         IndexShard replica = group.addReplica();
         expectThrows(Exception.class, () -> group.recoverReplica(replica,
             (shard, sourceNode) -> {
-                return new RecoveryTarget(shard, sourceNode, null, new PeerRecoveryTargetService.RecoveryListener() {
+                return new RecoveryTarget(shard, sourceNode, null, null, new PeerRecoveryTargetService.RecoveryListener() {
                     @Override
                     public void onRecoveryDone(RecoveryState state, ShardLongFieldRange timestampMillisFieldRange) {
                         throw new AssertionError("recovery must fail");
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java
index 9e6e12c514de9..3fda777b665fd 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java
@@ -44,7 +44,8 @@ public void testSerialization() throws Exception {
             randomBoolean(),
             randomNonNegativeLong(),
             randomBoolean() || metadataSnapshot.getHistoryUUID() == null ?
-                SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong());
+                SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong(),
+            randomBoolean());

         final ByteArrayOutputStream outBuffer = new ByteArrayOutputStream();
         final OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer);
diff --git a/server/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java b/server/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java
index dd6f13c8f2796..5cd177f658ede 100644
--- a/server/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java
+++ b/server/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java
@@ -150,6 +150,13 @@ long startRecovery(RecoveriesCollection collection, DiscoveryNode sourceNode, In
         final DiscoveryNode rNode = getDiscoveryNode(indexShard.routingEntry().currentNodeId());
         indexShard.markAsRecovering("remote", new RecoveryState(indexShard.routingEntry(), sourceNode, rNode));
         indexShard.prepareForIndexRecovery();
-        return collection.startRecovery(indexShard, sourceNode, null, listener, timeValue);
+        return collection.startRecovery(
+            indexShard,
+            sourceNode,
+            null,
+            listener,
+            timeValue,
+            null
+        );
     }
 }
diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
index 04e760ed3dcb5..3ccb4404c8d47 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
@@ -400,7 +400,7 @@ public synchronized boolean removeReplica(IndexShard replica) throws IOException

     public void recoverReplica(IndexShard replica) throws IOException {
         recoverReplica(replica,
-            (r, sourceNode) -> new RecoveryTarget(r, sourceNode, null, recoveryListener));
+            (r, sourceNode) -> new RecoveryTarget(r, sourceNode, null, null, recoveryListener));
     }

     public void recoverReplica(IndexShard replica, BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier)
diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
index ca41bda3e9d3e..27f79d2143b4b 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
@@ -582,7 +582,7 @@ protected DiscoveryNode getFakeDiscoNode(String id) {
     /** recovers a replica from the given primary **/
     protected void recoverReplica(IndexShard replica, IndexShard primary, boolean startReplica) throws IOException {
         recoverReplica(replica, primary,
-            (r, sourceNode) -> new RecoveryTarget(r, sourceNode, null, recoveryListener),
+            (r, sourceNode) -> new RecoveryTarget(r, sourceNode, null, null, recoveryListener),
             true, startReplica);
     }

diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java
index 96dbb74b2498f..c615eb0b808ef 100644
--- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java
+++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java
@@ -373,7 +373,7 @@ protected EngineFactory getEngineFactory(ShardRouting routing) {
                     // We need to recover the replica async to release the main thread for the following task to fill missing
                     // operations between the local checkpoint and max_seq_no which the recovering replica is waiting for.
                     recoveryFuture = group.asyncRecoverReplica(newReplica,
-                        (shard, sourceNode) -> new RecoveryTarget(shard, sourceNode, null, recoveryListener) {});
+                        (shard, sourceNode) -> new RecoveryTarget(shard, sourceNode, null, null, recoveryListener) {});
                 }
             }
             if (recoveryFuture != null) {
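
The RecoverySettingsTests hunks above pin down the observable contract of the new per-node limit: tryAcquireSnapshotDownloadPermits() hands out a Releasable while node-wide capacity remains, returns null once indices.recovery.max_concurrent_snapshot_file_downloads_per_node is exhausted or indices.recovery.use_snapshots is disabled, tracks dynamic updates of the per-node setting, and rejects at construction time a per-node value smaller than the per-recovery indices.recovery.max_concurrent_snapshot_file_downloads. A minimal standalone sketch of that behaviour follows; it is not the RecoverySettings/AdjustableSemaphore code in this patch, the class and member names are invented, and a fixed java.util.concurrent.Semaphore stands in for the adjustable one, so dynamic resizing is left out.

    import java.util.concurrent.Semaphore;
    import java.util.concurrent.atomic.AtomicBoolean;

    // Illustrative sketch only: mirrors the acquire/release behaviour asserted by RecoverySettingsTests.
    final class SnapshotDownloadPermitsSketch {

        private final Semaphore nodeWidePermits;     // pool sized by the per-node setting
        private final int permitsPerRecovery;        // permits taken by each recovery that uses snapshots
        private final boolean useSnapshots;          // indices.recovery.use_snapshots

        SnapshotDownloadPermitsSketch(int maxPermitsPerNode, int permitsPerRecovery, boolean useSnapshots) {
            if (maxPermitsPerNode < permitsPerRecovery) {
                // same shape of validation as testMaxConcurrentSnapshotFileDownloadsPerNodeIsValidated
                throw new IllegalArgumentException("per-node permits [" + maxPermitsPerNode
                    + "] must not be less than per-recovery permits [" + permitsPerRecovery + "]");
            }
            this.nodeWidePermits = new Semaphore(maxPermitsPerNode);
            this.permitsPerRecovery = permitsPerRecovery;
            this.useSnapshots = useSnapshots;
        }

        // Returns a release handle when this recovery may download snapshot files, or null when
        // snapshot-based recovery is disabled or the node-wide pool is currently exhausted.
        Runnable tryAcquireSnapshotDownloadPermits() {
            if (useSnapshots == false || nodeWidePermits.tryAcquire(permitsPerRecovery) == false) {
                return null;
            }
            AtomicBoolean released = new AtomicBoolean();
            return () -> {
                // release the pooled permits exactly once, even if the handle is closed twice
                if (released.compareAndSet(false, true)) {
                    nodeWidePermits.release(permitsPerRecovery);
                }
            };
        }
    }

With the default values (25 permits per node, 5 per recovery) this caps a node at five concurrent snapshot-based recoveries; a recovery that gets null back is expected to proceed without restoring files from the snapshot rather than fail.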
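
The new PeerRecoveryTargetServiceTests case covers the lifecycle half of the change: whatever permit handle a RecoveryTarget was created with must be released when the target's last reference is dropped through decRef(). A reduced, hypothetical sketch of that contract, leaving out the rest of the ref-counting machinery the real class inherits:

    import java.util.concurrent.atomic.AtomicInteger;

    // Reduced sketch: a ref-counted holder that owns an optional permit handle and releases it
    // once the last reference goes away, which is what the test asserts after decRef().
    final class RefCountedRecoverySketch {

        private final AtomicInteger refCount = new AtomicInteger(1);  // constructed holding one reference
        private final Runnable snapshotFileDownloadsPermit;           // null when no permit was granted

        RefCountedRecoverySketch(Runnable snapshotFileDownloadsPermit) {
            this.snapshotFileDownloadsPermit = snapshotFileDownloadsPermit;
        }

        void incRef() {
            refCount.incrementAndGet();
        }

        void decRef() {
            if (refCount.decrementAndGet() == 0 && snapshotFileDownloadsPermit != null) {
                snapshotFileDownloadsPermit.run();
            }
        }
    }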