Skip to content

Commit

Permalink
Collapse nested listeners in get-snapshots action (elastic#111028)
Browse files Browse the repository at this point in the history
There's no need for two layers of `RefCountingListener` any more, we
propagate all failures to the top level in any case, so we can run
everything under a single listener.
  • Loading branch information
DaveCTurner authored Jul 18, 2024
1 parent 43cdda4 commit 764894f
Showing 1 changed file with 64 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ protected void masterOperation(
SnapshotsInProgress.get(state),
request.verbose(),
request.includeIndexNames()
).getMultipleReposSnapshotInfo(listener);
).runOperation(listener);
}

/**
Expand Down Expand Up @@ -256,36 +256,11 @@ private class GetSnapshotsOperation {
}
}

void getMultipleReposSnapshotInfo(ActionListener<GetSnapshotsResponse> listener) {
SubscribableListener

.<Void>newForked(repositoriesDoneListener -> {
try (var listeners = new RefCountingListener(repositoriesDoneListener)) {
for (final RepositoryMetadata repository : repositories) {
final String repoName = repository.name();
if (skipRepository(repoName)) {
continue;
}

if (listeners.isFailing()) {
return;
}

SubscribableListener.<RepositoryData>newForked(l -> maybeGetRepositoryData(repoName, l))
.<Void>andThen((repositoryListener, repositoryData) -> {
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.MANAGEMENT);
cancellableTask.ensureNotCancelled();
ensureRequiredNamesPresent(repoName, repositoryData);
loadSnapshotInfos(
getAsyncSnapshotInfoIterator(repositoriesService.repository(repoName), repositoryData),
repositoryListener
);
})
.addListener(listeners.acquire());
}
}
})

/**
* Run the get-snapshots operation and compute the response.
*/
void runOperation(ActionListener<GetSnapshotsResponse> listener) {
SubscribableListener.newForked(this::populateResults)
.addListener(
listener.map(ignored -> buildResponse()),
// If we didn't load any SnapshotInfo blobs from the repo (e.g. verbose=false or current-snapshots-only) then this
Expand All @@ -296,6 +271,64 @@ void getMultipleReposSnapshotInfo(ActionListener<GetSnapshotsResponse> listener)
);
}

/**
* Populate the results fields ({@link #allSnapshotInfos} and {@link #totalCount}).
*/
private void populateResults(ActionListener<Void> listener) {
try (var listeners = new RefCountingListener(listener)) {
for (final RepositoryMetadata repository : repositories) {
final String repositoryName = repository.name();
if (skipRepository(repositoryName)) {
continue;
}

if (listeners.isFailing()) {
return;
}

maybeGetRepositoryData(repositoryName, listeners.acquire(repositoryData -> {
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.MANAGEMENT);
cancellableTask.ensureNotCancelled();
ensureRequiredNamesPresent(repositoryName, repositoryData);
ThrottledIterator.run(
Iterators.failFast(
getAsyncSnapshotInfoIterator(repositoriesService.repository(repositoryName), repositoryData),
() -> cancellableTask.isCancelled() || listeners.isFailing()
),
(ref, asyncSnapshotInfo) -> ActionListener.run(
ActionListener.runBefore(listeners.acquire(), ref::close),
refListener -> asyncSnapshotInfo.getSnapshotInfo(new ActionListener<>() {
@Override
public void onResponse(SnapshotInfo snapshotInfo) {
if (matchesPredicates(snapshotInfo)) {
totalCount.incrementAndGet();
if (afterPredicate.test(snapshotInfo)) {
allSnapshotInfos.add(snapshotInfo.maybeWithoutIndices(indices));
}
}
refListener.onResponse(null);
}

@Override
public void onFailure(Exception e) {
if (ignoreUnavailable) {
logger.warn(Strings.format("failed to fetch snapshot info for [%s]", asyncSnapshotInfo), e);
refListener.onResponse(null);
} else {
refListener.onFailure(e);
}
}
})
),
getSnapshotInfoExecutor.getMaxRunningTasks(),
() -> {},
() -> {}
);
}));
}
}
}

private void maybeGetRepositoryData(String repositoryName, ActionListener<RepositoryData> listener) {
if (snapshotNamePredicate == SnapshotNamePredicate.MATCH_CURRENT_ONLY) {
listener.onResponse(null);
Expand Down Expand Up @@ -464,45 +497,6 @@ private Map<SnapshotId, List<String>> getIndicesLookup(RepositoryData repository
return snapshotsToIndices;
}

private void loadSnapshotInfos(Iterator<AsyncSnapshotInfo> asyncSnapshotInfoIterator, ActionListener<Void> listener) {
if (cancellableTask.notifyIfCancelled(listener)) {
return;
}
try (var listeners = new RefCountingListener(listener)) {
ThrottledIterator.run(
Iterators.failFast(asyncSnapshotInfoIterator, () -> cancellableTask.isCancelled() || listeners.isFailing()),
(ref, asyncSnapshotInfo) -> {
final var refListener = ActionListener.runBefore(listeners.acquire(), ref::close);
asyncSnapshotInfo.getSnapshotInfo(new ActionListener<>() {
@Override
public void onResponse(SnapshotInfo snapshotInfo) {
if (matchesPredicates(snapshotInfo)) {
totalCount.incrementAndGet();
if (afterPredicate.test(snapshotInfo)) {
allSnapshotInfos.add(snapshotInfo.maybeWithoutIndices(indices));
}
}
refListener.onResponse(null);
}

@Override
public void onFailure(Exception e) {
if (ignoreUnavailable) {
logger.warn(Strings.format("failed to fetch snapshot info for [%s]", asyncSnapshotInfo), e);
refListener.onResponse(null);
} else {
refListener.onFailure(e);
}
}
});
},
getSnapshotInfoExecutor.getMaxRunningTasks(),
() -> {},
() -> {}
);
}
}

private GetSnapshotsResponse buildResponse() {
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.MANAGEMENT); // see [NOTE ON THREADING]
cancellableTask.ensureNotCancelled();
Expand Down

0 comments on commit 764894f

Please sign in to comment.