From 24cf149f94e6b68eac231b79cca4f5eda3f52fe4 Mon Sep 17 00:00:00 2001 From: Armin Braun <me@obrown.io> Date: Tue, 18 May 2021 13:23:07 +0200 Subject: [PATCH] don't pass executor around --- .../snapshots/get/TransportGetSnapshotsAction.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java index d82613d3ebc17..e2e89b3d5fd42 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java @@ -54,7 +54,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.stream.Collectors; @@ -279,7 +278,6 @@ private void snapshots(SnapshotsInProgress snapshotsInProgress, // put snapshot info downloads into a task queue instead of pushing them all into the queue to not completely monopolize the // snapshot meta pool for a single request final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT_META).getMax(), snapshotIdsToIterate.size()); - final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT_META); final BlockingQueue<SnapshotId> queue = new LinkedBlockingQueue<>(snapshotIdsToIterate); final ActionListener<Void> workerDoneListener = new GroupedActionListener<>(allDoneListener, workers).delegateResponse((l, e) -> { queue.clear(); // Stop fetching the remaining snapshots once we've failed fetching one since the response is an error response @@ -300,7 +298,6 @@ private void snapshots(SnapshotsInProgress snapshotsInProgress, queue, snapshotInfos, task, - executor, workerDoneListener ); } @@ -308,7 +305,7 @@ private void snapshots(SnapshotsInProgress snapshotsInProgress, /** * Tries to poll a {@link SnapshotId} to load {@link SnapshotInfo} for from the given {@code queue}. If it finds one in the queue, - * loads the snapshot info from the repository on the given {@code executor} and adds it to the given {@code snapshotInfos} collect, + * loads the snapshot info from the repository and adds it to the given {@code snapshotInfos} collect, * then invokes itself again to try and poll another task from the queue. * If the queue is empty resolves {@code} listener. */ @@ -317,14 +314,13 @@ private void getOneSnapshotInfo(boolean ignoreUnavailable, BlockingQueue<SnapshotId> queue, Collection<SnapshotInfo> snapshotInfos, CancellableTask task, - Executor executor, ActionListener<Void> listener) { final SnapshotId snapshotId = queue.poll(); if (snapshotId == null) { listener.onResponse(null); return; } - executor.execute(() -> { + threadPool.executor(ThreadPool.Names.SNAPSHOT_META).execute(() -> { if (task.isCancelled()) { listener.onFailure(new TaskCancelledException("task cancelled")); return; @@ -342,7 +338,7 @@ private void getOneSnapshotInfo(boolean ignoreUnavailable, ); } } - getOneSnapshotInfo(ignoreUnavailable, repository, queue, snapshotInfos, task, executor, listener); + getOneSnapshotInfo(ignoreUnavailable, repository, queue, snapshotInfos, task, listener); }); }