[7.x] Limit concurrent snapshot file restores in recovery per node #79379

Merged
7 changes: 7 additions & 0 deletions docs/reference/modules/indices/recovery.asciidoc
@@ -102,3 +102,10 @@ sent in parallel to the target node for each recovery. Defaults to `5`.
+
Do not increase this setting without carefully verifying that your cluster has
the resources available to handle the extra load that will result.

`indices.recovery.max_concurrent_snapshot_file_downloads_per_node`::
(<<cluster-update-settings,Dynamic>>, Expert) Number of snapshot file download requests
executed in parallel on the target node across all recoveries. Defaults to `25`.
+
Do not increase this setting without carefully verifying that your cluster has
the resources available to handle the extra load that will result.
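
As an illustrative sketch (not part of this diff): because each recovery reserves its full `indices.recovery.max_concurrent_snapshot_file_downloads` budget from the per-node limit up front, the default values allow at most 25 / 5 = 5 concurrent recoveries on a node to use snapshot files; additional recoveries fall back to copying files from the source node.

// Illustrative arithmetic only, using the default values introduced in this PR.
int perRecovery = 5;   // indices.recovery.max_concurrent_snapshot_file_downloads
int perNode = 25;      // indices.recovery.max_concurrent_snapshot_file_downloads_per_node
int recoveriesThatMayUseSnapshots = perNode / perRecovery; // 5; the rest recover from the source node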

Large diffs are not rendered by default.

@@ -22,6 +22,7 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AdjustableSemaphore;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException;
import org.elasticsearch.common.util.concurrent.RunOnce;
@@ -30,8 +31,6 @@
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;

import java.util.concurrent.Semaphore;

/**
* Called by shards in the cluster when their mapping was dynamically updated and it needs to be updated
* in the cluster state meta data (and broadcast to all members).
@@ -136,30 +135,4 @@ private static RuntimeException unwrapEsException(ElasticsearchException esEx) {
}
return new UncategorizedExecutionException("Failed execution", root);
}

static class AdjustableSemaphore extends Semaphore {

private final Object maxPermitsMutex = new Object();
private int maxPermits;

AdjustableSemaphore(int maxPermits, boolean fair) {
super(maxPermits, fair);
this.maxPermits = maxPermits;
}

void setMaxPermits(int permits) {
synchronized (maxPermitsMutex) {
final int diff = Math.subtractExact(permits, maxPermits);
if (diff > 0) {
// add permits
release(diff);
} else if (diff < 0) {
// remove permits
reducePermits(Math.negateExact(diff));
}

maxPermits = permits;
}
}
}
}
@@ -233,6 +233,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING,
RecoverySettings.INDICES_RECOVERY_USE_SNAPSHOTS_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING,
@@ -0,0 +1,37 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.common.util.concurrent;

import java.util.concurrent.Semaphore;

public class AdjustableSemaphore extends Semaphore {

private final Object maxPermitsMutex = new Object();
private int maxPermits;

public AdjustableSemaphore(int maxPermits, boolean fair) {
super(maxPermits, fair);
this.maxPermits = maxPermits;
}

public void setMaxPermits(int permits) {
synchronized (maxPermitsMutex) {
final int diff = Math.subtractExact(permits, maxPermits);
if (diff > 0) {
// add permits
release(diff);
} else if (diff < 0) {
// remove permits
reducePermits(Math.negateExact(diff));
}

maxPermits = permits;
}
}
}
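
A minimal usage sketch of the relocated AdjustableSemaphore (not part of this diff), showing how resizing the maximum number of permits adds or removes available permits at runtime, which is what makes the per-node download limit dynamically updatable:

// Usage sketch only; values are arbitrary.
AdjustableSemaphore semaphore = new AdjustableSemaphore(25, true); // fair semaphore with 25 permits
boolean acquired = semaphore.tryAcquire(5); // reserve one recovery's worth of permits
semaphore.setMaxPermits(10);                // shrinking the maximum removes 15 permits from the pool
semaphore.setMaxPermits(30);                // growing the maximum releases 20 additional permits
if (acquired) {
    semaphore.release(5);                   // return what was taken once the recovery finishes
}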
@@ -139,9 +139,16 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh
}

public void startRecovery(final IndexShard indexShard, final DiscoveryNode sourceNode, final RecoveryListener listener) {
final Releasable snapshotFileDownloadsPermit = recoverySettings.tryAcquireSnapshotDownloadPermits();
// create a new recovery status, and process...
final long recoveryId =
onGoingRecoveries.startRecovery(indexShard, sourceNode, snapshotFilesProvider, listener, recoverySettings.activityTimeout());
final long recoveryId = onGoingRecoveries.startRecovery(
indexShard,
sourceNode,
snapshotFilesProvider,
listener,
recoverySettings.activityTimeout(),
snapshotFileDownloadsPermit
);
// we fork off quickly here and go async but this is called from the cluster state applier thread too and that can cause
// assertions to trip if we executed it on the same thread hence we fork off to the generic threadpool.
threadPool.generic().execute(new RecoveryRunner(recoveryId));
@@ -268,7 +275,9 @@ public static StartRecoveryRequest getStartRecoveryRequest(Logger logger, Discov
metadataSnapshot,
recoveryTarget.state().getPrimary(),
recoveryTarget.recoveryId(),
startingSeqNo);
startingSeqNo,
recoveryTarget.hasPermitToDownloadSnapshotFiles()
);
return request;
}

@@ -14,6 +14,8 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
@@ -54,8 +56,10 @@ public long startRecovery(IndexShard indexShard,
DiscoveryNode sourceNode,
SnapshotFilesProvider snapshotFilesProvider,
PeerRecoveryTargetService.RecoveryListener listener,
TimeValue activityTimeout) {
RecoveryTarget recoveryTarget = new RecoveryTarget(indexShard, sourceNode, snapshotFilesProvider, listener);
TimeValue activityTimeout,
@Nullable Releasable snapshotFileDownloadsPermit) {
RecoveryTarget recoveryTarget =
new RecoveryTarget(indexShard, sourceNode, snapshotFilesProvider, snapshotFileDownloadsPermit, listener);
startRecoveryInternal(recoveryTarget, activityTimeout);
return recoveryTarget.recoveryId();
}
@@ -20,17 +20,29 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.AdjustableSemaphore;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.jdk.JavaVersion;
import org.elasticsearch.monitor.os.OsProbe;
import org.elasticsearch.node.NodeRoleSettings;

import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

import static org.elasticsearch.common.settings.Setting.parseInt;

public class RecoverySettings {
public static final Version SNAPSHOT_RECOVERIES_SUPPORTED_VERSION = Version.V_7_15_0;
public static final Version SEQ_NO_SNAPSHOT_RECOVERIES_SUPPORTED_VERSION = Version.V_7_16_0;
public static final Version SNAPSHOT_FILE_DOWNLOAD_THROTTLING_SUPPORTED_VERSION = Version.V_7_16_0;

private static final Logger logger = LogManager.getLogger(RecoverySettings.class);

@@ -134,7 +146,7 @@ public class RecoverySettings {

/**
* recoveries would try to use files from available snapshots instead of sending them from the source node.
* defaults to `false`
* defaults to `true`
*/
public static final Setting<Boolean> INDICES_RECOVERY_USE_SNAPSHOTS_SETTING =
Setting.boolSetting("indices.recovery.use_snapshots", true, Property.Dynamic, Property.NodeScope);
@@ -148,6 +160,43 @@ public class RecoverySettings {
Property.NodeScope
);

public static final Setting<Integer> INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE = new Setting<>(
"indices.recovery.max_concurrent_snapshot_file_downloads_per_node",
"25",
(s) -> parseInt(s, 1, 25, "indices.recovery.max_concurrent_snapshot_file_downloads_per_node", false),
new Setting.Validator<Integer>() {
private final Collection<Setting<?>> dependencies =
Collections.singletonList(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS);
@Override
public void validate(Integer value) {
// ignore
}

@Override
public void validate(Integer maxConcurrentSnapshotFileDownloadsPerNode, Map<Setting<?>, Object> settings) {
int maxConcurrentSnapshotFileDownloads = (int) settings.get(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS);
if (maxConcurrentSnapshotFileDownloadsPerNode < maxConcurrentSnapshotFileDownloads) {
throw new IllegalArgumentException(
String.format(Locale.ROOT,
"[%s]=%d is less than [%s]=%d",
INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE.getKey(),
maxConcurrentSnapshotFileDownloadsPerNode,
INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.getKey(),
maxConcurrentSnapshotFileDownloads
)
);
}
}

@Override
public Iterator<Setting<?>> settings() {
return dependencies.iterator();
}
},
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
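
A hypothetical illustration (not part of this diff) of the cross-setting validation above: a per-node limit lower than the per-recovery limit is rejected when the settings are applied, for example through the cluster settings update path.

// Hypothetical combination, for illustration only; the validator above rejects it.
Settings invalid = Settings.builder()
    .put("indices.recovery.max_concurrent_snapshot_file_downloads", 10)
    .put("indices.recovery.max_concurrent_snapshot_file_downloads_per_node", 5)
    .build();
// Expected failure message (per the format string above):
// "[indices.recovery.max_concurrent_snapshot_file_downloads_per_node]=5 is less than
//  [indices.recovery.max_concurrent_snapshot_file_downloads]=10"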

public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB);

private volatile ByteSizeValue maxBytesPerSec;
@@ -162,6 +211,9 @@ public class RecoverySettings {
private volatile TimeValue internalActionLongTimeout;
private volatile boolean useSnapshotsDuringRecovery;
private volatile int maxConcurrentSnapshotFileDownloads;
private volatile int maxConcurrentSnapshotFileDownloadsPerNode;

private final AdjustableSemaphore maxSnapshotFileDownloadsPerNodeSemaphore;

private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE;

@@ -186,6 +238,8 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
}
this.useSnapshotsDuringRecovery = INDICES_RECOVERY_USE_SNAPSHOTS_SETTING.get(settings);
this.maxConcurrentSnapshotFileDownloads = INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.get(settings);
this.maxConcurrentSnapshotFileDownloadsPerNode = INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE.get(settings);
this.maxSnapshotFileDownloadsPerNodeSemaphore = new AdjustableSemaphore(this.maxConcurrentSnapshotFileDownloadsPerNode, true);

logger.debug("using max_bytes_per_sec[{}]", maxBytesPerSec);

@@ -202,6 +256,8 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_USE_SNAPSHOTS_SETTING, this::setUseSnapshotsDuringRecovery);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS,
this::setMaxConcurrentSnapshotFileDownloads);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE,
this::setMaxConcurrentSnapshotFileDownloadsPerNode);
}

public RateLimiter rateLimiter() {
@@ -303,4 +359,30 @@ public int getMaxConcurrentSnapshotFileDownloads() {
public void setMaxConcurrentSnapshotFileDownloads(int maxConcurrentSnapshotFileDownloads) {
this.maxConcurrentSnapshotFileDownloads = maxConcurrentSnapshotFileDownloads;
}

private void setMaxConcurrentSnapshotFileDownloadsPerNode(int maxConcurrentSnapshotFileDownloadsPerNode) {
this.maxConcurrentSnapshotFileDownloadsPerNode = maxConcurrentSnapshotFileDownloadsPerNode;
this.maxSnapshotFileDownloadsPerNodeSemaphore.setMaxPermits(maxConcurrentSnapshotFileDownloadsPerNode);
}

@Nullable
Releasable tryAcquireSnapshotDownloadPermits() {
final int maxConcurrentSnapshotFileDownloads = getMaxConcurrentSnapshotFileDownloads();
final boolean permitAcquired = maxSnapshotFileDownloadsPerNodeSemaphore.tryAcquire(maxConcurrentSnapshotFileDownloads);
if (getUseSnapshotsDuringRecovery() == false || permitAcquired == false) {
if (permitAcquired == false) {
logger.warn(String.format(Locale.ROOT,
"Unable to acquire permit to use snapshot files during recovery, " +
"this recovery will recover index files from the source node. " +
"Ensure snapshot files can be used during recovery by setting [%s] to be no greater than [%d]",
INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.getKey(),
this.maxConcurrentSnapshotFileDownloadsPerNode
)
);
}
return null;
}

return Releasables.releaseOnce(() -> maxSnapshotFileDownloadsPerNodeSemaphore.release(maxConcurrentSnapshotFileDownloads));
}
}
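
Taken together, a rough sketch (not part of this diff; wiring simplified) of how the permit flows on the recovery target: each recovery reserves its per-recovery budget from the node-wide semaphore before starting, and the returned Releasable gives those permits back when the recovery target closes. If the budget is exhausted, the recovery still proceeds, just without downloading snapshot files.

// Simplified flow, names as in this PR; error handling and threading omitted.
Releasable permit = recoverySettings.tryAcquireSnapshotDownloadPermits();
if (permit == null) {
    // snapshots are disabled for recovery, or the per-node budget is exhausted:
    // the StartRecoveryRequest is sent with canDownloadSnapshotFiles=false and
    // files are copied from the source node instead
} else {
    try {
        // recover, downloading at most max_concurrent_snapshot_file_downloads
        // snapshot files in parallel for this recovery
    } finally {
        permit.close(); // returns the reserved permits to the node-wide semaphore
    }
}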
@@ -493,14 +493,15 @@ void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, A
}
if (canSkipPhase1(recoverySourceMetadata, request.metadataSnapshot()) == false) {
cancellableThreads.checkForCancel();
final boolean canUseSnapshots = useSnapshots && request.canDownloadSnapshotFiles();
recoveryPlannerService.computeRecoveryPlan(shard.shardId(),
shardStateIdentifier,
recoverySourceMetadata,
request.metadataSnapshot(),
startingSeqNo,
translogOps.getAsInt(),
getRequest().targetNode().getVersion(),
useSnapshots,
canUseSnapshots,
ActionListener.wrap(plan ->
recoverFilesFromSourceAndSnapshot(plan, store, stopWatch, listener), listener::onFailure)
);
@@ -68,6 +68,8 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
private final IndexShard indexShard;
private final DiscoveryNode sourceNode;
private final SnapshotFilesProvider snapshotFilesProvider;
@Nullable // if we're not downloading files from snapshots in this recovery
private final Releasable snapshotFileDownloadsPermit;
private final MultiFileWriter multiFileWriter;
private final RecoveryRequestTracker requestTracker = new RecoveryRequestTracker();
private final Store store;
@@ -90,11 +92,15 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
*
* @param indexShard local shard where we want to recover to
* @param sourceNode source node of the recovery where we recover from
* @param snapshotFileDownloadsPermit a permit that allows downloading files from a snapshot,
* limiting the concurrent snapshot file downloads per node and
* preventing the exhaustion of repository resources.
* @param listener called when recovery is completed/failed
*/
public RecoveryTarget(IndexShard indexShard,
DiscoveryNode sourceNode,
SnapshotFilesProvider snapshotFilesProvider,
@Nullable Releasable snapshotFileDownloadsPermit,
PeerRecoveryTargetService.RecoveryListener listener) {
this.cancellableThreads = new CancellableThreads();
this.recoveryId = idGenerator.incrementAndGet();
@@ -103,6 +109,7 @@ public RecoveryTarget(IndexShard indexShard,
this.indexShard = indexShard;
this.sourceNode = sourceNode;
this.snapshotFilesProvider = snapshotFilesProvider;
this.snapshotFileDownloadsPermit = snapshotFileDownloadsPermit;
this.shardId = indexShard.shardId();
final String tempFilePrefix = RECOVERY_PREFIX + UUIDs.randomBase64UUID() + ".";
this.multiFileWriter = new MultiFileWriter(indexShard.store(), indexShard.recoveryState().getIndex(), tempFilePrefix, logger,
@@ -119,7 +126,7 @@ public RecoveryTarget(IndexShard indexShard,
* @return a copy of this recovery target
*/
public RecoveryTarget retryCopy() {
return new RecoveryTarget(indexShard, sourceNode, snapshotFilesProvider, listener);
return new RecoveryTarget(indexShard, sourceNode, snapshotFilesProvider, snapshotFileDownloadsPermit, listener);
}

@Nullable
@@ -152,6 +159,10 @@ public CancellableThreads cancellableThreads() {
return cancellableThreads;
}

public boolean hasPermitToDownloadSnapshotFiles() {
return snapshotFileDownloadsPermit != null;
}

/** return the last time this RecoveryStatus was used (based on System.nanoTime() */
public long lastAccessTime() {
if (recoveryMonitorEnabled) {
@@ -289,6 +300,13 @@ protected void closeInternal() {
store.decRef();
indexShard.recoveryStats().decCurrentAsTarget();
closedLatch.countDown();
releaseSnapshotFileDownloadsPermit();
}
}

private void releaseSnapshotFileDownloadsPermit() {
if (snapshotFileDownloadsPermit != null) {
snapshotFileDownloadsPermit.close();
}
}

@@ -512,6 +530,8 @@ public void restoreFileFromSnapshot(String repository,
IndexId indexId,
BlobStoreIndexShardSnapshot.FileInfo fileInfo,
ActionListener<Void> listener) {
assert hasPermitToDownloadSnapshotFiles();

try (InputStream inputStream =
snapshotFilesProvider.getInputStreamForSnapshotFile(repository, indexId, shardId, fileInfo, this::registerThrottleTime)) {
StoreFileMetadata metadata = fileInfo.metadata();
Expand Down