Skip to content

Commit

Permalink
don't pass executor around
Browse files Browse the repository at this point in the history
  • Loading branch information
original-brownbear committed May 18, 2021
1 parent fb55daa commit 24cf149
Showing 1 changed file with 3 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -279,7 +278,6 @@ private void snapshots(SnapshotsInProgress snapshotsInProgress,
// put snapshot info downloads into a task queue instead of pushing them all into the queue to not completely monopolize the
// snapshot meta pool for a single request
final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT_META).getMax(), snapshotIdsToIterate.size());
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT_META);
final BlockingQueue<SnapshotId> queue = new LinkedBlockingQueue<>(snapshotIdsToIterate);
final ActionListener<Void> workerDoneListener = new GroupedActionListener<>(allDoneListener, workers).delegateResponse((l, e) -> {
queue.clear(); // Stop fetching the remaining snapshots once we've failed fetching one since the response is an error response
Expand All @@ -300,15 +298,14 @@ private void snapshots(SnapshotsInProgress snapshotsInProgress,
queue,
snapshotInfos,
task,
executor,
workerDoneListener
);
}
}

/**
* Tries to poll a {@link SnapshotId} to load {@link SnapshotInfo} for from the given {@code queue}. If it finds one in the queue,
* loads the snapshot info from the repository on the given {@code executor} and adds it to the given {@code snapshotInfos} collect,
* loads the snapshot info from the repository and adds it to the given {@code snapshotInfos} collect,
* then invokes itself again to try and poll another task from the queue.
* If the queue is empty resolves {@code} listener.
*/
Expand All @@ -317,14 +314,13 @@ private void getOneSnapshotInfo(boolean ignoreUnavailable,
BlockingQueue<SnapshotId> queue,
Collection<SnapshotInfo> snapshotInfos,
CancellableTask task,
Executor executor,
ActionListener<Void> listener) {
final SnapshotId snapshotId = queue.poll();
if (snapshotId == null) {
listener.onResponse(null);
return;
}
executor.execute(() -> {
threadPool.executor(ThreadPool.Names.SNAPSHOT_META).execute(() -> {
if (task.isCancelled()) {
listener.onFailure(new TaskCancelledException("task cancelled"));
return;
Expand All @@ -342,7 +338,7 @@ private void getOneSnapshotInfo(boolean ignoreUnavailable,
);
}
}
getOneSnapshotInfo(ignoreUnavailable, repository, queue, snapshotInfos, task, executor, listener);
getOneSnapshotInfo(ignoreUnavailable, repository, queue, snapshotInfos, task, listener);
});
}

Expand Down

0 comments on commit 24cf149

Please sign in to comment.