Limit concurrent snapshot file restores in recovery per node (#79316)
Today we limit the max number of concurrent snapshot file restores
per recovery. This works well with the default
node_concurrent_recoveries of 2, but when that setting is increased
the combined snapshot downloads can exhaust the underlying repository
connection pool, affecting other workloads.

This commit adds a new setting
`indices.recovery.max_concurrent_snapshot_file_downloads_per_node` that
limits the number of concurrent snapshot file downloads per node during
recoveries. When a recovery starts on the target node it tries to
acquire a permit; if the permit is granted, the recovery may download
snapshot files, and this is communicated to the source node in the
StartRecoveryRequest. This is a rather conservative approach: a
recovery that is granted a permit might not recover any files from a
snapshot, while a concurrent recovery that was denied a permit could
have taken advantage of recovering from one.

Closes #79044
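
The gating scheme can be pictured with a small standalone sketch (illustrative
only, not the code in this commit; the class and method names below are made
up): a node-wide semaphore holds the per-node budget, each recovery tries to
reserve its whole per-recovery allowance up front, and a failed reservation
simply means the recovery copies files from the source node.

import java.util.concurrent.Semaphore;

public class SnapshotDownloadGateSketch {

    private final Semaphore nodeWidePermits;
    private final int permitsPerRecovery;

    SnapshotDownloadGateSketch(int maxDownloadsPerNode, int maxDownloadsPerRecovery) {
        this.nodeWidePermits = new Semaphore(maxDownloadsPerNode, true);
        this.permitsPerRecovery = maxDownloadsPerRecovery;
    }

    /** Returns a release action if the recovery may use snapshot files, or null to fall back. */
    Runnable tryAcquireForRecovery() {
        if (nodeWidePermits.tryAcquire(permitsPerRecovery) == false) {
            return null; // enough snapshot-based recoveries are already running on this node
        }
        return () -> nodeWidePermits.release(permitsPerRecovery);
    }

    public static void main(String[] args) {
        // Defaults introduced by this change: 25 downloads per node, 5 per recovery.
        SnapshotDownloadGateSketch gate = new SnapshotDownloadGateSketch(25, 5);
        for (int i = 1; i <= 6; i++) {
            Runnable release = gate.tryAcquireForRecovery();
            // A real recovery would hold its reservation until it completes; we never
            // release here, so the sixth attempt is rejected.
            System.out.println("recovery " + i + (release != null
                ? " may use snapshot files"
                : " falls back to the source node"));
        }
    }
}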
fcofdez authored Oct 18, 2021
1 parent 444d7ca commit 2b4fe8f
Showing 25 changed files with 612 additions and 74 deletions.
7 changes: 7 additions & 0 deletions docs/reference/modules/indices/recovery.asciidoc
@@ -100,3 +100,10 @@ sent in parallel to the target node for each recovery. Defaults to `5`.
+
Do not increase this setting without carefully verifying that your cluster has
the resources available to handle the extra load that will result.

`indices.recovery.max_concurrent_snapshot_file_downloads_per_node`::
(<<cluster-update-settings,Dynamic>>, Expert) Number of snapshot file download requests
executed in parallel on the target node across all recoveries. Defaults to `25`.
+
Do not increase this setting without carefully verifying that your cluster has
the resources available to handle the extra load that will result.
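
As a rough usage sketch (not part of this commit; the class name below is made
up), the new per-node setting is read alongside the existing per-recovery one.
Per the setting definition later in this diff, the per-node value is bounded to
the range 1-25 and must not be smaller than the per-recovery value.

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.recovery.RecoverySettings;

public class SnapshotDownloadSettingsSketch {
    public static void main(String[] args) {
        // Hypothetical node configuration: 3 downloads per recovery, 12 per node.
        Settings settings = Settings.builder()
            .put("indices.recovery.max_concurrent_snapshot_file_downloads", 3)
            .put("indices.recovery.max_concurrent_snapshot_file_downloads_per_node", 12)
            .build();

        int perRecovery = RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.get(settings);
        int perNode = RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE.get(settings);
        System.out.println("per recovery: " + perRecovery + ", per node: " + perNode);
    }
}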

Large diffs are not rendered by default.

@@ -19,14 +19,13 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AdjustableSemaphore;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.Mapping;

import java.util.concurrent.Semaphore;

/**
* Called by shards in the cluster when their mapping was dynamically updated and it needs to be updated
* in the cluster state meta data (and broadcast to all members).
@@ -106,30 +105,4 @@ protected void sendUpdateMapping(Index index, Mapping mappingUpdate, ActionListe
client.execute(AutoPutMappingAction.INSTANCE, putMappingRequest,
ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure));
}

static class AdjustableSemaphore extends Semaphore {

private final Object maxPermitsMutex = new Object();
private int maxPermits;

AdjustableSemaphore(int maxPermits, boolean fair) {
super(maxPermits, fair);
this.maxPermits = maxPermits;
}

void setMaxPermits(int permits) {
synchronized (maxPermitsMutex) {
final int diff = Math.subtractExact(permits, maxPermits);
if (diff > 0) {
// add permits
release(diff);
} else if (diff < 0) {
// remove permits
reducePermits(Math.negateExact(diff));
}

maxPermits = permits;
}
}
}
}
@@ -216,6 +216,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING,
RecoverySettings.INDICES_RECOVERY_USE_SNAPSHOTS_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING,
@@ -0,0 +1,37 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.common.util.concurrent;

import java.util.concurrent.Semaphore;

public class AdjustableSemaphore extends Semaphore {

private final Object maxPermitsMutex = new Object();
private int maxPermits;

public AdjustableSemaphore(int maxPermits, boolean fair) {
super(maxPermits, fair);
this.maxPermits = maxPermits;
}

public void setMaxPermits(int permits) {
synchronized (maxPermitsMutex) {
final int diff = Math.subtractExact(permits, maxPermits);
if (diff > 0) {
// add permits
release(diff);
} else if (diff < 0) {
// remove permits
reducePermits(Math.negateExact(diff));
}

maxPermits = permits;
}
}
}
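
This class is the nested semaphore removed from the mapping-update class above,
unchanged apart from being made public. A quick sketch of how resizing behaves
(illustrative; the wrapper class name is made up and the example assumes this
new class is on the classpath):

import org.elasticsearch.common.util.concurrent.AdjustableSemaphore;

public class AdjustableSemaphoreSketch {
    public static void main(String[] args) {
        AdjustableSemaphore semaphore = new AdjustableSemaphore(2, true);

        System.out.println(semaphore.tryAcquire());       // true
        System.out.println(semaphore.tryAcquire());       // true
        System.out.println(semaphore.tryAcquire());       // false: both permits are held

        semaphore.setMaxPermits(4);                       // grows the pool by releasing 2 permits
        System.out.println(semaphore.tryAcquire());       // true

        semaphore.setMaxPermits(1);                       // shrinks the pool; current holders are unaffected,
                                                          // the reduction is absorbed as permits come back
        semaphore.release(3);                             // return the three permits held above
        System.out.println(semaphore.availablePermits()); // 1
    }
}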
@@ -138,9 +138,16 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh
}

public void startRecovery(final IndexShard indexShard, final DiscoveryNode sourceNode, final RecoveryListener listener) {
final Releasable snapshotFileDownloadsPermit = recoverySettings.tryAcquireSnapshotDownloadPermits();
// create a new recovery status, and process...
final long recoveryId =
onGoingRecoveries.startRecovery(indexShard, sourceNode, snapshotFilesProvider, listener, recoverySettings.activityTimeout());
final long recoveryId = onGoingRecoveries.startRecovery(
indexShard,
sourceNode,
snapshotFilesProvider,
listener,
recoverySettings.activityTimeout(),
snapshotFileDownloadsPermit
);
// we fork off quickly here and go async but this is called from the cluster state applier thread too and that can cause
// assertions to trip if we executed it on the same thread hence we fork off to the generic threadpool.
threadPool.generic().execute(new RecoveryRunner(recoveryId));
@@ -267,7 +274,9 @@ public static StartRecoveryRequest getStartRecoveryRequest(Logger logger, Discov
metadataSnapshot,
recoveryTarget.state().getPrimary(),
recoveryTarget.recoveryId(),
startingSeqNo);
startingSeqNo,
recoveryTarget.hasPermitToDownloadSnapshotFiles()
);
return request;
}

@@ -14,6 +14,8 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
@@ -54,8 +56,10 @@ public long startRecovery(IndexShard indexShard,
DiscoveryNode sourceNode,
SnapshotFilesProvider snapshotFilesProvider,
PeerRecoveryTargetService.RecoveryListener listener,
TimeValue activityTimeout) {
RecoveryTarget recoveryTarget = new RecoveryTarget(indexShard, sourceNode, snapshotFilesProvider, listener);
TimeValue activityTimeout,
@Nullable Releasable snapshotFileDownloadsPermit) {
RecoveryTarget recoveryTarget =
new RecoveryTarget(indexShard, sourceNode, snapshotFilesProvider, snapshotFileDownloadsPermit, listener);
startRecoveryInternal(recoveryTarget, activityTimeout);
return recoveryTarget.recoveryId();
}
@@ -20,17 +20,29 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.AdjustableSemaphore;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.jdk.JavaVersion;
import org.elasticsearch.monitor.os.OsProbe;
import org.elasticsearch.node.NodeRoleSettings;

import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

import static org.elasticsearch.common.settings.Setting.parseInt;

public class RecoverySettings {
public static final Version SNAPSHOT_RECOVERIES_SUPPORTED_VERSION = Version.V_7_15_0;
public static final Version SEQ_NO_SNAPSHOT_RECOVERIES_SUPPORTED_VERSION = Version.V_7_16_0;
public static final Version SNAPSHOT_FILE_DOWNLOAD_THROTTLING_SUPPORTED_VERSION = Version.V_8_0_0;

private static final Logger logger = LogManager.getLogger(RecoverySettings.class);

@@ -134,7 +146,7 @@ public class RecoverySettings

/**
* recoveries would try to use files from available snapshots instead of sending them from the source node.
* defaults to `false`
* defaults to `true`
*/
public static final Setting<Boolean> INDICES_RECOVERY_USE_SNAPSHOTS_SETTING =
Setting.boolSetting("indices.recovery.use_snapshots", true, Property.Dynamic, Property.NodeScope);
@@ -148,6 +160,43 @@
Property.NodeScope
);

public static final Setting<Integer> INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE = new Setting<>(
"indices.recovery.max_concurrent_snapshot_file_downloads_per_node",
"25",
(s) -> parseInt(s, 1, 25, "indices.recovery.max_concurrent_snapshot_file_downloads_per_node", false),
new Setting.Validator<>() {
private final Collection<Setting<?>> dependencies =
Collections.singletonList(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS);
@Override
public void validate(Integer value) {
// ignore
}

@Override
public void validate(Integer maxConcurrentSnapshotFileDownloadsPerNode, Map<Setting<?>, Object> settings) {
int maxConcurrentSnapshotFileDownloads = (int) settings.get(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS);
if (maxConcurrentSnapshotFileDownloadsPerNode < maxConcurrentSnapshotFileDownloads) {
throw new IllegalArgumentException(
String.format(Locale.ROOT,
"[%s]=%d is less than [%s]=%d",
INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE.getKey(),
maxConcurrentSnapshotFileDownloadsPerNode,
INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.getKey(),
maxConcurrentSnapshotFileDownloads
)
);
}
}

@Override
public Iterator<Setting<?>> settings() {
return dependencies.iterator();
}
},
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB);

private volatile ByteSizeValue maxBytesPerSec;
@@ -162,6 +211,9 @@ public class RecoverySettings
private volatile TimeValue internalActionLongTimeout;
private volatile boolean useSnapshotsDuringRecovery;
private volatile int maxConcurrentSnapshotFileDownloads;
private volatile int maxConcurrentSnapshotFileDownloadsPerNode;

private final AdjustableSemaphore maxSnapshotFileDownloadsPerNodeSemaphore;

private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE;

@@ -186,6 +238,8 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
}
this.useSnapshotsDuringRecovery = INDICES_RECOVERY_USE_SNAPSHOTS_SETTING.get(settings);
this.maxConcurrentSnapshotFileDownloads = INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.get(settings);
this.maxConcurrentSnapshotFileDownloadsPerNode = INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE.get(settings);
this.maxSnapshotFileDownloadsPerNodeSemaphore = new AdjustableSemaphore(this.maxConcurrentSnapshotFileDownloadsPerNode, true);

logger.debug("using max_bytes_per_sec[{}]", maxBytesPerSec);

@@ -202,6 +256,8 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_USE_SNAPSHOTS_SETTING, this::setUseSnapshotsDuringRecovery);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS,
this::setMaxConcurrentSnapshotFileDownloads);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE,
this::setMaxConcurrentSnapshotFileDownloadsPerNode);
}

public RateLimiter rateLimiter() {
@@ -303,4 +359,30 @@ public int getMaxConcurrentSnapshotFileDownloads() {
public void setMaxConcurrentSnapshotFileDownloads(int maxConcurrentSnapshotFileDownloads) {
this.maxConcurrentSnapshotFileDownloads = maxConcurrentSnapshotFileDownloads;
}

private void setMaxConcurrentSnapshotFileDownloadsPerNode(int maxConcurrentSnapshotFileDownloadsPerNode) {
this.maxConcurrentSnapshotFileDownloadsPerNode = maxConcurrentSnapshotFileDownloadsPerNode;
this.maxSnapshotFileDownloadsPerNodeSemaphore.setMaxPermits(maxConcurrentSnapshotFileDownloadsPerNode);
}

@Nullable
Releasable tryAcquireSnapshotDownloadPermits() {
final int maxConcurrentSnapshotFileDownloads = getMaxConcurrentSnapshotFileDownloads();
final boolean permitAcquired = maxSnapshotFileDownloadsPerNodeSemaphore.tryAcquire(maxConcurrentSnapshotFileDownloads);
if (getUseSnapshotsDuringRecovery() == false || permitAcquired == false) {
if (permitAcquired == false) {
logger.warn(String.format(Locale.ROOT,
"Unable to acquire permit to use snapshot files during recovery, " +
"this recovery will recover index files from the source node. " +
"Ensure snapshot files can be used during recovery by setting [%s] to be no greater than [%d]",
INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.getKey(),
this.maxConcurrentSnapshotFileDownloadsPerNode
)
);
}
return null;
}

return Releasables.releaseOnce(() -> maxSnapshotFileDownloadsPerNodeSemaphore.release(maxConcurrentSnapshotFileDownloads));
}
}
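
For orientation: with the defaults in this change the per-node semaphore holds 25
permits and each recovery reserves all 5 of its per-recovery permits up front, so
at most 25 / 5 = 5 recoveries per node can use snapshot files at the same time; a
sixth concurrent recovery receives null from tryAcquireSnapshotDownloadPermits,
logs the warning above, and copies its files from the source node instead.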
@@ -486,14 +486,15 @@ void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, A
}
if (canSkipPhase1(recoverySourceMetadata, request.metadataSnapshot()) == false) {
cancellableThreads.checkForCancel();
final boolean canUseSnapshots = useSnapshots && request.canDownloadSnapshotFiles();
recoveryPlannerService.computeRecoveryPlan(shard.shardId(),
shardStateIdentifier,
recoverySourceMetadata,
request.metadataSnapshot(),
startingSeqNo,
translogOps.getAsInt(),
getRequest().targetNode().getVersion(),
useSnapshots,
canUseSnapshots,
ActionListener.wrap(plan ->
recoverFilesFromSourceAndSnapshot(plan, store, stopWatch, listener), listener::onFailure)
);
@@ -67,6 +67,8 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
private final IndexShard indexShard;
private final DiscoveryNode sourceNode;
private final SnapshotFilesProvider snapshotFilesProvider;
@Nullable // if we're not downloading files from snapshots in this recovery
private final Releasable snapshotFileDownloadsPermit;
private final MultiFileWriter multiFileWriter;
private final RecoveryRequestTracker requestTracker = new RecoveryRequestTracker();
private final Store store;
@@ -89,11 +91,15 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
*
* @param indexShard local shard where we want to recover to
* @param sourceNode source node of the recovery where we recover from
* @param snapshotFileDownloadsPermit a permit that allows downloading files from a snapshot,
*                                    limiting the number of concurrent snapshot file downloads per node
*                                    and preventing exhaustion of repository resources.
* @param listener called when recovery is completed/failed
*/
public RecoveryTarget(IndexShard indexShard,
DiscoveryNode sourceNode,
SnapshotFilesProvider snapshotFilesProvider,
@Nullable Releasable snapshotFileDownloadsPermit,
PeerRecoveryTargetService.RecoveryListener listener) {
this.cancellableThreads = new CancellableThreads();
this.recoveryId = idGenerator.incrementAndGet();
@@ -102,6 +108,7 @@ public RecoveryTarget(IndexShard indexShard,
this.indexShard = indexShard;
this.sourceNode = sourceNode;
this.snapshotFilesProvider = snapshotFilesProvider;
this.snapshotFileDownloadsPermit = snapshotFileDownloadsPermit;
this.shardId = indexShard.shardId();
final String tempFilePrefix = RECOVERY_PREFIX + UUIDs.randomBase64UUID() + ".";
this.multiFileWriter = new MultiFileWriter(indexShard.store(), indexShard.recoveryState().getIndex(), tempFilePrefix, logger,
@@ -118,7 +125,7 @@ public RecoveryTarget(IndexShard indexShard,
* @return a copy of this recovery target
*/
public RecoveryTarget retryCopy() {
return new RecoveryTarget(indexShard, sourceNode, snapshotFilesProvider, listener);
return new RecoveryTarget(indexShard, sourceNode, snapshotFilesProvider, snapshotFileDownloadsPermit, listener);
}

@Nullable
@@ -151,6 +158,10 @@ public CancellableThreads cancellableThreads() {
return cancellableThreads;
}

public boolean hasPermitToDownloadSnapshotFiles() {
return snapshotFileDownloadsPermit != null;
}

/** return the last time this RecoveryStatus was used (based on System.nanoTime() */
public long lastAccessTime() {
if (recoveryMonitorEnabled) {
@@ -288,6 +299,13 @@ protected void closeInternal() {
store.decRef();
indexShard.recoveryStats().decCurrentAsTarget();
closedLatch.countDown();
releaseSnapshotFileDownloadsPermit();
}
}

private void releaseSnapshotFileDownloadsPermit() {
if (snapshotFileDownloadsPermit != null) {
snapshotFileDownloadsPermit.close();
}
}

@@ -506,6 +524,8 @@ public void restoreFileFromSnapshot(String repository,
IndexId indexId,
BlobStoreIndexShardSnapshot.FileInfo fileInfo,
ActionListener<Void> listener) {
assert hasPermitToDownloadSnapshotFiles();

try (InputStream inputStream =
snapshotFilesProvider.getInputStreamForSnapshotFile(repository, indexId, shardId, fileInfo, this::registerThrottleTime)) {
StoreFileMetadata metadata = fileInfo.metadata();