Skip to content

Commit

Permalink
Remove Needless Context Switches on Loading RepositoryData (#56935)
Browse files Browse the repository at this point in the history
We don't need to switch to the generic or snapshot pool for loading
cached repository data (i.e. most of the time in normal operation).

This makes `executeConsistentStateUpdate` less heavy if it has to retry
and lowers the chance of having to retry in the first place.
Also, this change allowed simplifying a few other spots in the codebase
where we would fork off to another pool just to load repository data.
  • Loading branch information
original-brownbear authored May 25, 2020
1 parent 6f60f1e commit 444e1e1
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,7 @@ public void testEnforcedCooldownPeriod() throws IOException {
.setWaitForCompletion(true).setIndices().get().getSnapshotInfo().snapshotId();
final RepositoriesService repositoriesService = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class);
final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repoName);
final RepositoryData repositoryData =
PlainActionFuture.get(f -> repository.threadPool().generic().execute(() -> repository.getRepositoryData(f)));
final RepositoryData repositoryData = getRepositoryData(repository);
final RepositoryData modifiedRepositoryData = repositoryData.withVersions(Collections.singletonMap(fakeOldSnapshot,
SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.minimumCompatibilityVersion()));
final BytesReference serialized =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
Expand Down Expand Up @@ -108,7 +107,6 @@
import org.elasticsearch.test.store.MockFSIndexStore;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.test.transport.StubbableTransport;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportChannel;
Expand Down Expand Up @@ -666,10 +664,8 @@ public void testSnapshotRecovery() throws Exception {
logger.info("--> request recoveries");
RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet();

ThreadPool threadPool = internalCluster().getMasterNodeInstance(ThreadPool.class);
Repository repository = internalCluster().getMasterNodeInstance(RepositoriesService.class).repository(REPO_NAME);
final RepositoryData repositoryData = PlainActionFuture.get(f ->
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(f, repository::getRepositoryData)));
final RepositoryData repositoryData = PlainActionFuture.get(repository::getRepositoryData);
for (Map.Entry<String, List<RecoveryState>> indexRecoveryStates : response.shardRecoveryStates().entrySet()) {

assertThat(indexRecoveryStates.getKey(), equalTo(INDEX_NAME));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA

@Override
protected String executor() {
return ThreadPool.Names.GENERIC;
return ThreadPool.Names.SAME;
}

@Inject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,18 +122,16 @@ private void getMultipleReposSnapshotInfo(@Nullable SnapshotsInProgress snapshot
return new GetSnapshotsResponse(responses);
}), repos.size());

// run concurrently for all repos on GENERIC thread pool
for (final RepositoryMetadata repo : repos) {
final String repoName = repo.name();
threadPool.generic().execute(ActionRunnable.wrap(
ActionListener.delegateResponse(groupedActionListener, (groupedListener, e) -> {
if (e instanceof ElasticsearchException) {
groupedListener.onResponse(GetSnapshotsResponse.Response.error(repoName, (ElasticsearchException) e));
} else {
groupedListener.onFailure(e);
}
}), wrappedListener -> getSingleRepoSnapshotInfo(snapshotsInProgress, repoName, snapshots, ignoreUnavailable, verbose,
ActionListener.map(wrappedListener, snInfos -> GetSnapshotsResponse.Response.snapshots(repoName, snInfos)))));
getSingleRepoSnapshotInfo(snapshotsInProgress, repoName, snapshots, ignoreUnavailable, verbose, ActionListener.map(
ActionListener.delegateResponse(groupedActionListener, (groupedListener, e) -> {
if (e instanceof ElasticsearchException) {
groupedListener.onResponse(GetSnapshotsResponse.Response.error(repoName, (ElasticsearchException) e));
} else {
groupedListener.onFailure(e);
}
}), snInfos -> GetSnapshotsResponse.Response.snapshots(repoName, snInfos)));
}
}

Expand All @@ -154,9 +152,8 @@ private void getSingleRepoSnapshotInfo(@Nullable SnapshotsInProgress snapshotsIn
repositoriesService.getRepositoryData(repo, repositoryDataListener);
}

repositoryDataListener.whenComplete(repositoryData -> listener.onResponse(loadSnapshotInfos(snapshotsInProgress, repo, snapshots,
ignoreUnavailable, verbose, allSnapshotIds, currentSnapshots, repositoryData)),
listener::onFailure);
repositoryDataListener.whenComplete(repositoryData -> loadSnapshotInfos(snapshotsInProgress, repo, snapshots,
ignoreUnavailable, verbose, allSnapshotIds, currentSnapshots, repositoryData, listener), listener::onFailure);
}

/**
Expand All @@ -178,9 +175,10 @@ private static List<SnapshotInfo> sortedCurrentSnapshots(@Nullable SnapshotsInPr
}


private List<SnapshotInfo> loadSnapshotInfos(@Nullable SnapshotsInProgress snapshotsInProgress, String repo, String[] snapshots,
boolean ignoreUnavailable, boolean verbose, Map<String, SnapshotId> allSnapshotIds,
List<SnapshotInfo> currentSnapshots, @Nullable RepositoryData repositoryData) {
private void loadSnapshotInfos(@Nullable SnapshotsInProgress snapshotsInProgress, String repo, String[] snapshots,
boolean ignoreUnavailable, boolean verbose, Map<String, SnapshotId> allSnapshotIds,
List<SnapshotInfo> currentSnapshots, @Nullable RepositoryData repositoryData,
ActionListener<List<SnapshotInfo>> listener) {
if (repositoryData != null) {
for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) {
allSnapshotIds.put(snapshotId.getName(), snapshotId);
Expand Down Expand Up @@ -214,10 +212,11 @@ private List<SnapshotInfo> loadSnapshotInfos(@Nullable SnapshotsInProgress snaps
}
}

final List<SnapshotInfo> snapshotInfos;
if (verbose) {
snapshotInfos = snapshots(snapshotsInProgress, repo, new ArrayList<>(toResolve), ignoreUnavailable);
threadPool.generic().execute(ActionRunnable.supply(
listener, () -> snapshots(snapshotsInProgress, repo, new ArrayList<>(toResolve), ignoreUnavailable)));
} else {
final List<SnapshotInfo> snapshotInfos;
if (repositoryData != null) {
// want non-current snapshots as well, which are found in the repository data
snapshotInfos = buildSimpleSnapshotInfos(toResolve, repositoryData, currentSnapshots);
Expand All @@ -226,9 +225,8 @@ private List<SnapshotInfo> loadSnapshotInfos(@Nullable SnapshotsInProgress snaps
snapshotInfos = currentSnapshots.stream().map(SnapshotInfo::basic).collect(Collectors.toList());
CollectionUtil.timSort(snapshotInfos);
}
listener.onResponse(snapshotInfos);
}

return snapshotInfos;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,62 +331,52 @@ protected void doClose() {
@Override
public void executeConsistentStateUpdate(Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask, String source,
Consumer<Exception> onFailure) {
threadPool.generic().execute(new AbstractRunnable() {
@Override
protected void doRun() {
final RepositoryMetadata repositoryMetadataStart = metadata;
getRepositoryData(ActionListener.wrap(repositoryData -> {
final ClusterStateUpdateTask updateTask = createUpdateTask.apply(repositoryData);
clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask(updateTask.priority()) {
final RepositoryMetadata repositoryMetadataStart = metadata;
getRepositoryData(ActionListener.wrap(repositoryData -> {
final ClusterStateUpdateTask updateTask = createUpdateTask.apply(repositoryData);
clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask(updateTask.priority()) {

private boolean executedTask = false;
private boolean executedTask = false;

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
// Comparing the full metadata here on purpose instead of simply comparing the safe generation.
// If the safe generation has changed, then we have to reload repository data and start over.
// If the pending generation has changed we are in the midst of a write operation and might pick up the
// updated repository data and state on the retry. We don't want to wait for the write to finish though
// because it could fail for any number of reasons so we just retry instead of waiting on the cluster state
// to change in any form.
if (repositoryMetadataStart.equals(getRepoMetadata(currentState))) {
executedTask = true;
return updateTask.execute(currentState);
}
return currentState;
}

@Override
public void onFailure(String source, Exception e) {
if (executedTask) {
updateTask.onFailure(source, e);
} else {
onFailure.accept(e);
}
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
// Comparing the full metadata here on purpose instead of simply comparing the safe generation.
// If the safe generation has changed, then we have to reload repository data and start over.
// If the pending generation has changed we are in the midst of a write operation and might pick up the
// updated repository data and state on the retry. We don't want to wait for the write to finish though
// because it could fail for any number of reasons so we just retry instead of waiting on the cluster state
// to change in any form.
if (repositoryMetadataStart.equals(getRepoMetadata(currentState))) {
executedTask = true;
return updateTask.execute(currentState);
}
return currentState;
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (executedTask) {
updateTask.clusterStateProcessed(source, oldState, newState);
} else {
executeConsistentStateUpdate(createUpdateTask, source, onFailure);
}
}
@Override
public void onFailure(String source, Exception e) {
if (executedTask) {
updateTask.onFailure(source, e);
} else {
onFailure.accept(e);
}
}

@Override
public TimeValue timeout() {
return updateTask.timeout();
}
});
}, onFailure));
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (executedTask) {
updateTask.clusterStateProcessed(source, oldState, newState);
} else {
executeConsistentStateUpdate(createUpdateTask, source, onFailure);
}
}

@Override
public void onFailure(Exception e) {
onFailure.accept(e);
}
});
@Override
public TimeValue timeout() {
return updateTask.timeout();
}
});
}, onFailure));
}

// Inspects all cluster state elements that contain a hint about what the current repository generation is and updates
Expand Down Expand Up @@ -552,17 +542,23 @@ public void deleteSnapshots(Collection<SnapshotId> snapshotIds, long repositoryS
if (isReadOnly()) {
listener.onFailure(new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository"));
} else {
try {
final Map<String, BlobMetadata> rootBlobs = blobContainer().listBlobs();
final RepositoryData repositoryData = safeRepositoryData(repositoryStateId, rootBlobs);
// Cache the indices that were found before writing out the new index-N blob so that a stuck master will never
// delete an index that was created by another master node after writing this index-N blob.
final Map<String, BlobContainer> foundIndices = blobStore().blobContainer(indicesPath()).children();
doDeleteShardSnapshots(snapshotIds, repositoryStateId, foundIndices, rootBlobs, repositoryData,
SnapshotsService.useShardGenerations(repositoryMetaVersion), listener);
} catch (Exception ex) {
listener.onFailure(new RepositoryException(metadata.name(), "failed to delete snapshots " + snapshotIds, ex));
}
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
final Map<String, BlobMetadata> rootBlobs = blobContainer().listBlobs();
final RepositoryData repositoryData = safeRepositoryData(repositoryStateId, rootBlobs);
// Cache the indices that were found before writing out the new index-N blob so that a stuck master will never
// delete an index that was created by another master node after writing this index-N blob.
final Map<String, BlobContainer> foundIndices = blobStore().blobContainer(indicesPath()).children();
doDeleteShardSnapshots(snapshotIds, repositoryStateId, foundIndices, rootBlobs, repositoryData,
SnapshotsService.useShardGenerations(repositoryMetaVersion), listener);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(new RepositoryException(metadata.name(), "failed to delete snapshots " + snapshotIds, e));
}
});
}
}

Expand Down Expand Up @@ -1168,6 +1164,22 @@ public void getRepositoryData(ActionListener<RepositoryData> listener) {
listener.onFailure(corruptedStateException(null));
return;
}
final Tuple<Long, BytesReference> cached = latestKnownRepositoryData.get();
// Fast path loading repository data directly from cache if we're in fully consistent mode and the cache matches up with
// the latest known repository generation
if (bestEffortConsistency == false && cached != null && cached.v1() == latestKnownRepoGen.get()) {
try {
listener.onResponse(repositoryDataFromCachedEntry(cached));
} catch (Exception e) {
listener.onFailure(e);
}
return;
}
// Slow path if we were not able to safely read the repository data from cache
threadPool.generic().execute(ActionRunnable.wrap(listener, this::doGetRepositoryData));
}

private void doGetRepositoryData(ActionListener<RepositoryData> listener) {
// Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository.
// Keep track of the most recent generation we failed to load so we can break out of the loop if we fail to load the same
// generation repeatedly.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
import org.elasticsearch.action.support.ActionFilters;
Expand Down Expand Up @@ -1166,17 +1165,16 @@ public static boolean useShardGenerations(Version repositoryMetaVersion) {
*/
private void deleteSnapshotsFromRepository(String repoName, Collection<SnapshotId> snapshotIds, @Nullable ActionListener<Void> listener,
long repositoryStateId, Version minNodeVersion) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
Repository repository = repositoriesService.repository(repoName);
repository.getRepositoryData(ActionListener.wrap(repositoryData -> repository.deleteSnapshots(snapshotIds,
Repository repository = repositoriesService.repository(repoName);
repository.getRepositoryData(ActionListener.wrap(repositoryData -> repository.deleteSnapshots(
snapshotIds,
repositoryStateId,
minCompatibleVersion(minNodeVersion, repositoryData, snapshotIds),
ActionListener.wrap(v -> {
logger.info("snapshots {} deleted", snapshotIds);
removeSnapshotDeletionFromClusterState(snapshotIds, null, l);
}, ex -> removeSnapshotDeletionFromClusterState(snapshotIds, ex, l)
)), ex -> removeSnapshotDeletionFromClusterState(snapshotIds, ex, l)));
}));
logger.info("snapshots {} deleted", snapshotIds);
removeSnapshotDeletionFromClusterState(snapshotIds, null, listener);
}, ex -> removeSnapshotDeletionFromClusterState(snapshotIds, ex, listener)
)), ex -> removeSnapshotDeletionFromClusterState(snapshotIds, ex, listener)));
}

/**
Expand Down
Loading

0 comments on commit 444e1e1

Please sign in to comment.