From b3eb52d4e231d565deff78ce23a28e0beef0e2a1 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 22 Jul 2022 14:39:21 +0100 Subject: [PATCH] Avoid capturing SnapshotsInProgress$Entry in queue (#88726) Today, each time there are shards to snapshot, we enqueue a lambda which captures the current `SnapshotsInProgress$Entry`. This is a pretty heavyweight object, possibly several MB in size, most of which is not necessary to capture, and with concurrent snapshots across thousands of shards we may enqueue many hundreds of slightly different such objects. With this commit we compute a more efficient representation of the work to be done by each task in the queue instead. Relates #77466 Backport of #88707 --- docs/changelog/88707.yaml | 5 + .../snapshots/SnapshotShardsService.java | 122 ++++++++++-------- 2 files changed, 72 insertions(+), 55 deletions(-) create mode 100644 docs/changelog/88707.yaml diff --git a/docs/changelog/88707.yaml b/docs/changelog/88707.yaml new file mode 100644 index 0000000000000..b40fc0d878471 --- /dev/null +++ b/docs/changelog/88707.yaml @@ -0,0 +1,5 @@ +pr: 88707 +summary: Avoid capturing `SnapshotsInProgress$Entry` in queue +area: Snapshot/Restore +type: bug +issues: [] diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index ddba386067602..2cb2d231f3597 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -56,6 +56,7 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -228,7 +229,24 @@ private void startNewSnapshots(List snapshotsInProgre } if (startedShards != null && startedShards.isEmpty() == false) { shardSnapshots.computeIfAbsent(snapshot, s -> new 
HashMap<>()).putAll(startedShards); - startNewShards(entry, startedShards); + + final List shardSnapshotTasks = new ArrayList<>(startedShards.size()); + for (final Map.Entry shardEntry : startedShards.entrySet()) { + final ShardId shardId = shardEntry.getKey(); + final IndexShardSnapshotStatus snapshotStatus = shardEntry.getValue(); + final IndexId indexId = entry.indices().get(shardId.getIndexName()); + assert indexId != null; + assert SnapshotsService.useShardGenerations(entry.version()) + || ShardGenerations.fixShardGeneration(snapshotStatus.generation()) == null + : "Found non-null, non-numeric shard generation [" + + snapshotStatus.generation() + + "] for snapshot with old-format compatibility"; + shardSnapshotTasks.add( + newShardSnapshotTask(shardId, snapshot, indexId, entry.userMetadata(), snapshotStatus, entry.version()) + ); + } + + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> shardSnapshotTasks.forEach(Runnable::run)); } } else if (entryState == State.ABORTED) { // Abort all running shards for this snapshot @@ -251,63 +269,57 @@ private void startNewSnapshots(List snapshotsInProgre } } - private void startNewShards(SnapshotsInProgress.Entry entry, Map startedShards) { - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { - final Snapshot snapshot = entry.snapshot(); - for (final Map.Entry shardEntry : startedShards.entrySet()) { - final ShardId shardId = shardEntry.getKey(); - final IndexShardSnapshotStatus snapshotStatus = shardEntry.getValue(); - final IndexId indexId = entry.indices().get(shardId.getIndexName()); - assert indexId != null; - assert SnapshotsService.useShardGenerations(entry.version()) - || ShardGenerations.fixShardGeneration(snapshotStatus.generation()) == null - : "Found non-null, non-numeric shard generation [" - + snapshotStatus.generation() - + "] for snapshot with old-format compatibility"; - snapshot( - shardId, - snapshot, - indexId, - entry.userMetadata(), - snapshotStatus, - entry.version(), - new 
ActionListener() { - @Override - public void onResponse(ShardSnapshotResult shardSnapshotResult) { - final ShardGeneration newGeneration = shardSnapshotResult.getGeneration(); - assert newGeneration != null; - assert newGeneration.equals(snapshotStatus.generation()); - if (logger.isDebugEnabled()) { - final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy(); - logger.debug( - "[{}][{}] completed snapshot to [{}] with status [{}] at generation [{}]", - shardId, - snapshot, - snapshot.getRepository(), - lastSnapshotStatus, - snapshotStatus.generation() - ); - } - notifySuccessfulSnapshotShard(snapshot, shardId, shardSnapshotResult); - } + private Runnable newShardSnapshotTask( + final ShardId shardId, + final Snapshot snapshot, + final IndexId indexId, + final Map userMetadata, + final IndexShardSnapshotStatus snapshotStatus, + final Version entryVersion + ) { + // separate method to make sure this lambda doesn't capture any heavy local objects like a SnapshotsInProgress.Entry + return () -> snapshot( + shardId, + snapshot, + indexId, + userMetadata, + snapshotStatus, + entryVersion, + new ActionListener() { + @Override + public void onResponse(ShardSnapshotResult shardSnapshotResult) { + final ShardGeneration newGeneration = shardSnapshotResult.getGeneration(); + assert newGeneration != null; + assert newGeneration.equals(snapshotStatus.generation()); + if (logger.isDebugEnabled()) { + final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy(); + logger.debug( + "[{}][{}] completed snapshot to [{}] with status [{}] at generation [{}]", + shardId, + snapshot, + snapshot.getRepository(), + lastSnapshotStatus, + snapshotStatus.generation() + ); + } + notifySuccessfulSnapshotShard(snapshot, shardId, shardSnapshotResult); + } - @Override - public void onFailure(Exception e) { - final String failure; - if (e instanceof AbortedSnapshotException) { - failure = "aborted"; - logger.debug(() -> new ParameterizedMessage("[{}][{}] 
aborted shard snapshot", shardId, snapshot), e); - } else { - failure = summarizeFailure(e); - logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e); - } - snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), failure); - notifyFailedSnapshotShard(snapshot, shardId, failure, snapshotStatus.generation()); - } + @Override + public void onFailure(Exception e) { + final String failure; + if (e instanceof AbortedSnapshotException) { + failure = "aborted"; + logger.debug(() -> new ParameterizedMessage("[{}][{}] aborted shard snapshot", shardId, snapshot), e); + } else { + failure = summarizeFailure(e); + logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e); } - ); + snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), failure); + notifyFailedSnapshotShard(snapshot, shardId, failure, snapshotStatus.generation()); + } } - }); + ); } // package private for testing