Introduce SNAPSHOT_META Threadpool for Fetching Repository Metadata #73172

5 changes: 5 additions & 0 deletions docs/reference/modules/threadpool.asciidoc
@@ -41,6 +41,11 @@ There are several thread pools, but the important ones include:
keep-alive of `5m` and a max of `min(5, (`<<node.processors,
`# of allocated processors`>>`) / 2)`.

`snapshot_meta`::
For snapshot repository metadata read operations. Thread pool type is `scaling` with a
keep-alive of `30s` and a max of `min(50, (`<<node.processors,
`# of allocated processors`>>` pass:[ * ]3))`.

`warmer`::
For segment warm-up operations. Thread pool type is `scaling` with a
keep-alive of `5m` and a max of `min(5, (`<<node.processors,
TransportGetSnapshotsAction.java
@@ -14,7 +14,6 @@
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.admin.cluster.repositories.get.TransportGetRepositoriesAction;
import org.elasticsearch.action.support.ActionFilters;
@@ -34,6 +33,7 @@
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryMissingException;
import org.elasticsearch.snapshots.SnapshotException;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
@@ -46,12 +46,15 @@
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;

import static java.util.Collections.unmodifiableList;
@@ -211,8 +214,7 @@ private void loadSnapshotInfos(SnapshotsInProgress snapshotsInProgress, String r
}

if (verbose) {
- threadPool.generic().execute(ActionRunnable.supply(
-     listener, () -> snapshots(snapshotsInProgress, repo, new ArrayList<>(toResolve), ignoreUnavailable, task)));
snapshots(snapshotsInProgress, repo, toResolve, ignoreUnavailable, task, listener);
} else {
final List<SnapshotInfo> snapshotInfos;
if (repositoryData != null) {
@@ -235,12 +237,16 @@ private void loadSnapshotInfos(SnapshotsInProgress snapshotsInProgress, String r
* @param snapshotIds snapshots for which to fetch snapshot information
* @param ignoreUnavailable if true, snapshots that could not be read will only be logged with a warning,
* if false, they will throw an error
- * @return list of snapshots
*/
- private List<SnapshotInfo> snapshots(SnapshotsInProgress snapshotsInProgress, String repositoryName,
-     List<SnapshotId> snapshotIds, boolean ignoreUnavailable, CancellableTask task) {
private void snapshots(SnapshotsInProgress snapshotsInProgress,
String repositoryName,
Collection<SnapshotId> snapshotIds,
boolean ignoreUnavailable,
CancellableTask task,
ActionListener<List<SnapshotInfo>> listener) {
if (task.isCancelled()) {
- throw new TaskCancelledException("task cancelled");
listener.onFailure(new TaskCancelledException("task cancelled"));
return;
}
final Set<SnapshotInfo> snapshotSet = new HashSet<>();
final Set<SnapshotId> snapshotIdsToIterate = new HashSet<>(snapshotIds);
@@ -252,28 +258,88 @@ private List<SnapshotInfo> snapshots(SnapshotsInProgress snapshotsInProgress, St
snapshotSet.add(new SnapshotInfo(entry));
}
}
- // then, look in the repository
- final Repository repository = repositoriesService.repository(repositoryName);
- for (SnapshotId snapshotId : snapshotIdsToIterate) {
// then, look in the repository if there's any matching snapshots left
final List<SnapshotInfo> snapshotInfos;
if (snapshotIdsToIterate.isEmpty()) {
snapshotInfos = Collections.emptyList();
} else {
snapshotInfos = Collections.synchronizedList(new ArrayList<>());
}
final ActionListener<Collection<Void>> allDoneListener = listener.delegateFailure((l, v) -> {
final ArrayList<SnapshotInfo> snapshotList = new ArrayList<>(snapshotInfos);
snapshotList.addAll(snapshotSet);
CollectionUtil.timSort(snapshotList);
listener.onResponse(unmodifiableList(snapshotList));
});
if (snapshotIdsToIterate.isEmpty()) {
allDoneListener.onResponse(Collections.emptyList());
return;
}
// put snapshot info downloads into a task queue instead of pushing them all into the queue to not completely monopolize the
// snapshot meta pool for a single request
final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT_META).getMax(), snapshotIdsToIterate.size());
final BlockingQueue<SnapshotId> queue = new LinkedBlockingQueue<>(snapshotIdsToIterate);
final ActionListener<Void> workerDoneListener = new GroupedActionListener<>(allDoneListener, workers).delegateResponse((l, e) -> {
queue.clear(); // Stop fetching the remaining snapshots once we've failed fetching one since the response is an error response
// anyway in this case
l.onFailure(e);
});
final Repository repository;
try {
repository = repositoriesService.repository(repositoryName);
} catch (RepositoryMissingException e) {
listener.onFailure(e);
return;
}
for (int i = 0; i < workers; i++) {
getOneSnapshotInfo(
ignoreUnavailable,
repository,
queue,
snapshotInfos,
task,
workerDoneListener
);
}
}

/**
* Tries to poll a {@link SnapshotId} to load {@link SnapshotInfo} for from the given {@code queue}. If it finds one in the queue,
* loads the snapshot info from the repository and adds it to the given {@code snapshotInfos} collection, then invokes itself again to
* try and poll another task from the queue.
* If the queue is empty, resolves {@code listener}.
*/
private void getOneSnapshotInfo(boolean ignoreUnavailable,

[Review comment, Member] nit: can we add a bit of Javadoc?

Repository repository,

[Review comment, Member] I wonder if we should retrieve the Repository from the RepositoriesService for each SnapshotInfo to load, so that if the repository is gone the RepositoryMissingException is easier to propagate through the listeners (and the grouped listener, which clears the queue, etc.). Otherwise a RepositoryMissingException might be thrown, I think, and caught at a higher level, but we would keep fetching snapshot info here.

[Review comment, Member Author] We could, I guess, but it's not going to be a big cleanup/win since this situation is somewhat broken to begin with. The behavior of the repository after close isn't well defined currently. Depending on the repo implementation, the requests can either start failing or, in the case of FsRepository, just keep going because close is a no-op there. Might be worth just fixing that in general at some point?

[Review comment, Member] I agree, but I still think it is worth not mixing thrown exceptions and listeners here.

[Review comment, Member Author] Oh 🤦 now I get your comment. Sorry, I completely misread it for no good reason :( => Fix coming right up.

[Review comment, Member Author (@original-brownbear), May 18, 2021] I pushed fb55daa to address this (and some random formatting noise) now :) I went with this instead of looking up the repo in the loop, because the latter would be caught and suppressed by ignoreUnavailable, which I found weird (albeit practically irrelevant).

[Review comment, Member] fb55daa looks good, thanks! And sorry if I wasn't clear at first :)

BlockingQueue<SnapshotId> queue,
Collection<SnapshotInfo> snapshotInfos,
CancellableTask task,
ActionListener<Void> listener) {
final SnapshotId snapshotId = queue.poll();
if (snapshotId == null) {
listener.onResponse(null);
return;
}
threadPool.executor(ThreadPool.Names.SNAPSHOT_META).execute(() -> {
if (task.isCancelled()) {
- throw new TaskCancelledException("task cancelled");
listener.onFailure(new TaskCancelledException("task cancelled"));
return;
}
try {
- snapshotSet.add(repository.getSnapshotInfo(snapshotId));
snapshotInfos.add(repository.getSnapshotInfo(snapshotId));
} catch (Exception ex) {
if (ignoreUnavailable) {
logger.warn(() -> new ParameterizedMessage("failed to get snapshot [{}]", snapshotId), ex);
} else {
- if (ex instanceof SnapshotException) {
- throw ex;
- }
- throw new SnapshotException(repositoryName, snapshotId, "Snapshot could not be read", ex);
listener.onFailure(
ex instanceof SnapshotException
? ex
: new SnapshotException(repository.getMetadata().name(), snapshotId, "Snapshot could not be read", ex)
);
}
}
- }
- final ArrayList<SnapshotInfo> snapshotList = new ArrayList<>(snapshotSet);
- CollectionUtil.timSort(snapshotList);
- return unmodifiableList(snapshotList);
getOneSnapshotInfo(ignoreUnavailable, repository, queue, snapshotInfos, task, listener);
});
}
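Taken together, snapshots(...) and getOneSnapshotInfo(...) implement a bounded fan-out: at most "workers" tasks are submitted to the snapshot_meta pool, and each task keeps polling the shared queue until it is drained, so a single get-snapshots request cannot monopolize the pool. Below is a minimal, framework-free sketch of that pattern; the names (BoundedFanOut, fetchAll, loadOne) are illustrative and not from the PR, and error handling and cancellation are omitted.

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.function.Consumer;
    import java.util.function.Function;

    class BoundedFanOut {
        /** Load every id with at most {@code maxWorkers} concurrent tasks on {@code pool}. */
        static <T, R> void fetchAll(ExecutorService pool, int maxWorkers, Collection<T> ids,
                                    Function<T, R> loadOne, Consumer<List<R>> onDone) {
            BlockingQueue<T> queue = new LinkedBlockingQueue<>(ids);
            List<R> results = Collections.synchronizedList(new ArrayList<>());
            int workers = Math.min(maxWorkers, ids.size());
            if (workers == 0) {
                onDone.accept(Collections.emptyList());      // nothing to load
                return;
            }
            AtomicInteger running = new AtomicInteger(workers);
            for (int i = 0; i < workers; i++) {
                pool.execute(new Runnable() {
                    @Override
                    public void run() {
                        T id = queue.poll();
                        if (id == null) {                     // queue drained: this worker is done
                            if (running.decrementAndGet() == 0) {
                                onDone.accept(new ArrayList<>(results));  // last worker reports
                            }
                            return;
                        }
                        results.add(loadOne.apply(id));       // do one unit of work ...
                        pool.execute(this);                   // ... then go back for the next id
                    }
                });
            }
        }
    }

The real change goes further: when one load fails and ignoreUnavailable is false, it clears the queue so the remaining workers stop early, since the response is an error anyway.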

private boolean isAllSnapshots(String[] snapshots) {
BlobStoreRepository.java
@@ -1315,6 +1315,7 @@ public long getRestoreThrottleTimeInNanos() {

protected void assertSnapshotOrGenericThread() {
assert Thread.currentThread().getName().contains('[' + ThreadPool.Names.SNAPSHOT + ']')
|| Thread.currentThread().getName().contains('[' + ThreadPool.Names.SNAPSHOT_META + ']')
|| Thread.currentThread().getName().contains('[' + ThreadPool.Names.GENERIC + ']') :
"Expected current thread [" + Thread.currentThread() + "] to be the snapshot or generic thread.";
}
@@ -1428,11 +1429,12 @@ public void getRepositoryData(ActionListener<RepositoryData> listener) {
// Don't deduplicate repo data loading if we don't have strong consistency guarantees between the repo and the cluster state
// Also, if we are not caching repository data (for tests) we assume that the contents of the repository data at a given
// generation may change
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT_META);
if (bestEffortConsistency || cacheRepositoryData == false) {
- threadPool.generic().execute(ActionRunnable.wrap(listener, this::doGetRepositoryData));
executor.execute(ActionRunnable.wrap(listener, this::doGetRepositoryData));
} else {
repoDataDeduplicator.executeOnce(metadata, listener, (metadata, l) ->
- threadPool.generic().execute(ActionRunnable.wrap(l, this::doGetRepositoryData)));
executor.execute(ActionRunnable.wrap(l, this::doGetRepositoryData)));
}
}
}
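The repoDataDeduplicator.executeOnce call above means concurrent callers asking for the same repository metadata share one in-flight doGetRepositoryData run on the snapshot_meta executor instead of each queueing their own. A simplified, hedged sketch of that single-flight idea (this is not the actual Elasticsearch deduplicator API, just the shape of it):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.Executor;
    import java.util.function.Function;

    /** Callers that arrive while a load for the same key is in flight share its result. */
    class SingleFlight<K, V> {
        private final ConcurrentMap<K, CompletableFuture<V>> inFlight = new ConcurrentHashMap<>();

        CompletableFuture<V> executeOnce(K key, Executor executor, Function<K, V> load) {
            CompletableFuture<V> created = new CompletableFuture<>();
            CompletableFuture<V> existing = inFlight.putIfAbsent(key, created);
            if (existing != null) {
                return existing;                      // join the load that is already running
            }
            executor.execute(() -> {
                try {
                    created.complete(load.apply(key));
                } catch (Exception e) {
                    created.completeExceptionally(e);
                } finally {
                    inFlight.remove(key, created);    // the next caller triggers a fresh load
                }
            });
            return created;
        }
    }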
ThreadPool.java
@@ -69,6 +69,7 @@ public static class Names {
public static final String REFRESH = "refresh";
public static final String WARMER = "warmer";
public static final String SNAPSHOT = "snapshot";
public static final String SNAPSHOT_META = "snapshot_meta";
public static final String FORCE_MERGE = "force_merge";
public static final String FETCH_SHARD_STARTED = "fetch_shard_started";
public static final String FETCH_SHARD_STORE = "fetch_shard_store";
@@ -116,6 +117,7 @@ public static ThreadPoolType fromType(String type) {
entry(Names.REFRESH, ThreadPoolType.SCALING),
entry(Names.WARMER, ThreadPoolType.SCALING),
entry(Names.SNAPSHOT, ThreadPoolType.SCALING),
entry(Names.SNAPSHOT_META, ThreadPoolType.SCALING),
entry(Names.FORCE_MERGE, ThreadPoolType.FIXED),
entry(Names.FETCH_SHARD_STARTED, ThreadPoolType.SCALING),
entry(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING),
@@ -189,6 +191,8 @@ public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBui
builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)));
builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
builders.put(Names.SNAPSHOT, new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
builders.put(Names.SNAPSHOT_META, new ScalingExecutorBuilder(Names.SNAPSHOT_META, 1, Math.min(allocatedProcessors * 3, 50),
TimeValue.timeValueSeconds(30L)));
builders.put(Names.FETCH_SHARD_STARTED,
new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * allocatedProcessors, TimeValue.timeValueMinutes(5)));
builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1, false));
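Because SNAPSHOT_META is registered via a ScalingExecutorBuilder in the constructor above, its size and keep-alive should be tunable through the usual thread_pool.<name>.core / .max / .keep_alive node settings, like the other scaling pools. A hedged sketch, assuming those standard setting keys (they are not shown in this PR):

    import org.elasticsearch.common.settings.Settings;

    class SnapshotMetaPoolSettingsExample {
        /** Node settings that would shrink the snapshot_meta pool while keeping the 30s keep-alive. */
        static Settings customSnapshotMetaPool() {
            return Settings.builder()
                .put("thread_pool.snapshot_meta.core", 1)
                .put("thread_pool.snapshot_meta.max", 10)           // cap concurrent metadata reads
                .put("thread_pool.snapshot_meta.keep_alive", "30s")
                .build();
        }
    }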
ScalingThreadPoolTests.java
@@ -62,7 +62,8 @@ public void testScalingThreadPoolConfiguration() throws InterruptedException {
keepAlive = randomIntBetween(1, 300);
builder.put("thread_pool." + threadPoolName + ".keep_alive", keepAlive + "s");
} else {
keepAlive = "generic".equals(threadPoolName) ? 30 : 300; // the defaults
keepAlive = "generic".equals(threadPoolName) || ThreadPool.Names.SNAPSHOT_META.equals(threadPoolName)
? 30 : 300; // the defaults
}

runScalingThreadPoolTest(builder.build(), (clusterSettings, threadPool) -> {
@@ -96,6 +97,7 @@ private int expectedSize(final String threadPoolName, final int numberOfProcesso
sizes.put(ThreadPool.Names.REFRESH, ThreadPool::halfAllocatedProcessorsMaxTen);
sizes.put(ThreadPool.Names.WARMER, ThreadPool::halfAllocatedProcessorsMaxFive);
sizes.put(ThreadPool.Names.SNAPSHOT, ThreadPool::halfAllocatedProcessorsMaxFive);
sizes.put(ThreadPool.Names.SNAPSHOT_META, n -> Math.min(n * 3, 50));
sizes.put(ThreadPool.Names.FETCH_SHARD_STARTED, ThreadPool::twiceAllocatedProcessors);
sizes.put(ThreadPool.Names.FETCH_SHARD_STORE, ThreadPool::twiceAllocatedProcessors);
return sizes.get(threadPoolName).apply(numberOfProcessors);