diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index 159ae348ac102..7cae6ba75fce7 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -156,8 +156,7 @@ public void testEnforcedCooldownPeriod() throws IOException { .setWaitForCompletion(true).setIndices().get().getSnapshotInfo().snapshotId(); final RepositoriesService repositoriesService = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class); final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repoName); - final RepositoryData repositoryData = - PlainActionFuture.get(f -> repository.threadPool().generic().execute(() -> repository.getRepositoryData(f))); + final RepositoryData repositoryData = getRepositoryData(repository); final RepositoryData modifiedRepositoryData = repositoryData.withVersions(Collections.singletonMap(fakeOldSnapshot, SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.minimumCompatibilityVersion())); final BytesReference serialized = diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index 9bbc1e2eeea5d..3b96bf0d5bd4e 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -23,7 +23,6 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; -import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; @@ -108,7 +107,6 @@ import org.elasticsearch.test.store.MockFSIndexStore; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.test.transport.StubbableTransport; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportChannel; @@ -666,10 +664,8 @@ public void testSnapshotRecovery() throws Exception { logger.info("--> request recoveries"); RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet(); - ThreadPool threadPool = internalCluster().getMasterNodeInstance(ThreadPool.class); Repository repository = internalCluster().getMasterNodeInstance(RepositoriesService.class).repository(REPO_NAME); - final RepositoryData repositoryData = PlainActionFuture.get(f -> - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(f, repository::getRepositoryData))); + final RepositoryData repositoryData = PlainActionFuture.get(repository::getRepositoryData); for (Map.Entry> indexRecoveryStates : response.shardRecoveryStates().entrySet()) { assertThat(indexRecoveryStates.getKey(), equalTo(INDEX_NAME)); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java index 0209d77a8d412..4a751e1ab9931 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java @@ -78,7 +78,7 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA @Override protected String executor() { - return ThreadPool.Names.GENERIC; + return ThreadPool.Names.SAME; } @Inject diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java index 373563f7b7c79..6c7a8a256ccc3 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java @@ -122,18 +122,16 @@ private void getMultipleReposSnapshotInfo(@Nullable SnapshotsInProgress snapshot return new GetSnapshotsResponse(responses); }), repos.size()); - // run concurrently for all repos on GENERIC thread pool for (final RepositoryMetadata repo : repos) { final String repoName = repo.name(); - threadPool.generic().execute(ActionRunnable.wrap( - ActionListener.delegateResponse(groupedActionListener, (groupedListener, e) -> { - if (e instanceof ElasticsearchException) { - groupedListener.onResponse(GetSnapshotsResponse.Response.error(repoName, (ElasticsearchException) e)); - } else { - groupedListener.onFailure(e); - } - }), wrappedListener -> getSingleRepoSnapshotInfo(snapshotsInProgress, repoName, snapshots, ignoreUnavailable, verbose, - ActionListener.map(wrappedListener, snInfos -> GetSnapshotsResponse.Response.snapshots(repoName, snInfos))))); + getSingleRepoSnapshotInfo(snapshotsInProgress, repoName, snapshots, ignoreUnavailable, verbose, ActionListener.map( + ActionListener.delegateResponse(groupedActionListener, (groupedListener, e) -> { + if (e instanceof ElasticsearchException) { + groupedListener.onResponse(GetSnapshotsResponse.Response.error(repoName, (ElasticsearchException) e)); + } else { + groupedListener.onFailure(e); + } + }), snInfos -> GetSnapshotsResponse.Response.snapshots(repoName, snInfos))); } } @@ -154,9 +152,8 @@ private void getSingleRepoSnapshotInfo(@Nullable SnapshotsInProgress snapshotsIn repositoriesService.getRepositoryData(repo, repositoryDataListener); } - repositoryDataListener.whenComplete(repositoryData -> listener.onResponse(loadSnapshotInfos(snapshotsInProgress, repo, snapshots, - ignoreUnavailable, verbose, allSnapshotIds, currentSnapshots, repositoryData)), - listener::onFailure); + repositoryDataListener.whenComplete(repositoryData -> loadSnapshotInfos(snapshotsInProgress, repo, snapshots, + ignoreUnavailable, verbose, allSnapshotIds, currentSnapshots, repositoryData, listener), listener::onFailure); } /** @@ -178,9 +175,10 @@ private static List sortedCurrentSnapshots(@Nullable SnapshotsInPr } - private List loadSnapshotInfos(@Nullable SnapshotsInProgress snapshotsInProgress, String repo, String[] snapshots, - boolean ignoreUnavailable, boolean verbose, Map allSnapshotIds, - List currentSnapshots, @Nullable RepositoryData repositoryData) { + private void loadSnapshotInfos(@Nullable SnapshotsInProgress snapshotsInProgress, String repo, String[] snapshots, + boolean ignoreUnavailable, boolean verbose, Map allSnapshotIds, + List currentSnapshots, @Nullable RepositoryData repositoryData, + ActionListener> listener) { if (repositoryData != null) { for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) { allSnapshotIds.put(snapshotId.getName(), snapshotId); @@ -214,10 +212,11 @@ private List loadSnapshotInfos(@Nullable SnapshotsInProgress snaps } } - final List snapshotInfos; if (verbose) { - snapshotInfos = snapshots(snapshotsInProgress, repo, new ArrayList<>(toResolve), ignoreUnavailable); + threadPool.generic().execute(ActionRunnable.supply( + listener, () -> snapshots(snapshotsInProgress, repo, new ArrayList<>(toResolve), ignoreUnavailable))); } else { + final List snapshotInfos; if (repositoryData != null) { // want non-current snapshots as well, which are found in the repository data snapshotInfos = buildSimpleSnapshotInfos(toResolve, repositoryData, currentSnapshots); @@ -226,9 +225,8 @@ private List loadSnapshotInfos(@Nullable SnapshotsInProgress snaps snapshotInfos = currentSnapshots.stream().map(SnapshotInfo::basic).collect(Collectors.toList()); CollectionUtil.timSort(snapshotInfos); } + listener.onResponse(snapshotInfos); } - - return snapshotInfos; } /** diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index fd54901a207dc..8826a39b9cb4d 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -331,62 +331,52 @@ protected void doClose() { @Override public void executeConsistentStateUpdate(Function createUpdateTask, String source, Consumer onFailure) { - threadPool.generic().execute(new AbstractRunnable() { - @Override - protected void doRun() { - final RepositoryMetadata repositoryMetadataStart = metadata; - getRepositoryData(ActionListener.wrap(repositoryData -> { - final ClusterStateUpdateTask updateTask = createUpdateTask.apply(repositoryData); - clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask(updateTask.priority()) { + final RepositoryMetadata repositoryMetadataStart = metadata; + getRepositoryData(ActionListener.wrap(repositoryData -> { + final ClusterStateUpdateTask updateTask = createUpdateTask.apply(repositoryData); + clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask(updateTask.priority()) { - private boolean executedTask = false; + private boolean executedTask = false; - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - // Comparing the full metadata here on purpose instead of simply comparing the safe generation. - // If the safe generation has changed, then we have to reload repository data and start over. - // If the pending generation has changed we are in the midst of a write operation and might pick up the - // updated repository data and state on the retry. We don't want to wait for the write to finish though - // because it could fail for any number of reasons so we just retry instead of waiting on the cluster state - // to change in any form. - if (repositoryMetadataStart.equals(getRepoMetadata(currentState))) { - executedTask = true; - return updateTask.execute(currentState); - } - return currentState; - } - - @Override - public void onFailure(String source, Exception e) { - if (executedTask) { - updateTask.onFailure(source, e); - } else { - onFailure.accept(e); - } - } + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + // Comparing the full metadata here on purpose instead of simply comparing the safe generation. + // If the safe generation has changed, then we have to reload repository data and start over. + // If the pending generation has changed we are in the midst of a write operation and might pick up the + // updated repository data and state on the retry. We don't want to wait for the write to finish though + // because it could fail for any number of reasons so we just retry instead of waiting on the cluster state + // to change in any form. + if (repositoryMetadataStart.equals(getRepoMetadata(currentState))) { + executedTask = true; + return updateTask.execute(currentState); + } + return currentState; + } - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - if (executedTask) { - updateTask.clusterStateProcessed(source, oldState, newState); - } else { - executeConsistentStateUpdate(createUpdateTask, source, onFailure); - } - } + @Override + public void onFailure(String source, Exception e) { + if (executedTask) { + updateTask.onFailure(source, e); + } else { + onFailure.accept(e); + } + } - @Override - public TimeValue timeout() { - return updateTask.timeout(); - } - }); - }, onFailure)); - } + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + if (executedTask) { + updateTask.clusterStateProcessed(source, oldState, newState); + } else { + executeConsistentStateUpdate(createUpdateTask, source, onFailure); + } + } - @Override - public void onFailure(Exception e) { - onFailure.accept(e); - } - }); + @Override + public TimeValue timeout() { + return updateTask.timeout(); + } + }); + }, onFailure)); } // Inspects all cluster state elements that contain a hint about what the current repository generation is and updates @@ -552,17 +542,23 @@ public void deleteSnapshots(Collection snapshotIds, long repositoryS if (isReadOnly()) { listener.onFailure(new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository")); } else { - try { - final Map rootBlobs = blobContainer().listBlobs(); - final RepositoryData repositoryData = safeRepositoryData(repositoryStateId, rootBlobs); - // Cache the indices that were found before writing out the new index-N blob so that a stuck master will never - // delete an index that was created by another master node after writing this index-N blob. - final Map foundIndices = blobStore().blobContainer(indicesPath()).children(); - doDeleteShardSnapshots(snapshotIds, repositoryStateId, foundIndices, rootBlobs, repositoryData, - SnapshotsService.useShardGenerations(repositoryMetaVersion), listener); - } catch (Exception ex) { - listener.onFailure(new RepositoryException(metadata.name(), "failed to delete snapshots " + snapshotIds, ex)); - } + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() { + @Override + protected void doRun() throws Exception { + final Map rootBlobs = blobContainer().listBlobs(); + final RepositoryData repositoryData = safeRepositoryData(repositoryStateId, rootBlobs); + // Cache the indices that were found before writing out the new index-N blob so that a stuck master will never + // delete an index that was created by another master node after writing this index-N blob. + final Map foundIndices = blobStore().blobContainer(indicesPath()).children(); + doDeleteShardSnapshots(snapshotIds, repositoryStateId, foundIndices, rootBlobs, repositoryData, + SnapshotsService.useShardGenerations(repositoryMetaVersion), listener); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(new RepositoryException(metadata.name(), "failed to delete snapshots " + snapshotIds, e)); + } + }); } } @@ -1168,6 +1164,22 @@ public void getRepositoryData(ActionListener listener) { listener.onFailure(corruptedStateException(null)); return; } + final Tuple cached = latestKnownRepositoryData.get(); + // Fast path loading repository data directly from cache if we're in fully consistent mode and the cache matches up with + // the latest known repository generation + if (bestEffortConsistency == false && cached != null && cached.v1() == latestKnownRepoGen.get()) { + try { + listener.onResponse(repositoryDataFromCachedEntry(cached)); + } catch (Exception e) { + listener.onFailure(e); + } + return; + } + // Slow path if we were not able to safely read the repository data from cache + threadPool.generic().execute(ActionRunnable.wrap(listener, this::doGetRepositoryData)); + } + + private void doGetRepositoryData(ActionListener listener) { // Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository. // Keep track of the most recent generation we failed to load so we can break out of the loop if we fail to load the same // generation repeatedly. diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 14f02e87be1d8..2a3bc28fcecc5 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -27,7 +27,6 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest; import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest; import org.elasticsearch.action.support.ActionFilters; @@ -1166,17 +1165,16 @@ public static boolean useShardGenerations(Version repositoryMetaVersion) { */ private void deleteSnapshotsFromRepository(String repoName, Collection snapshotIds, @Nullable ActionListener listener, long repositoryStateId, Version minNodeVersion) { - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> { - Repository repository = repositoriesService.repository(repoName); - repository.getRepositoryData(ActionListener.wrap(repositoryData -> repository.deleteSnapshots(snapshotIds, + Repository repository = repositoriesService.repository(repoName); + repository.getRepositoryData(ActionListener.wrap(repositoryData -> repository.deleteSnapshots( + snapshotIds, repositoryStateId, minCompatibleVersion(minNodeVersion, repositoryData, snapshotIds), ActionListener.wrap(v -> { - logger.info("snapshots {} deleted", snapshotIds); - removeSnapshotDeletionFromClusterState(snapshotIds, null, l); - }, ex -> removeSnapshotDeletionFromClusterState(snapshotIds, ex, l) - )), ex -> removeSnapshotDeletionFromClusterState(snapshotIds, ex, l))); - })); + logger.info("snapshots {} deleted", snapshotIds); + removeSnapshotDeletionFromClusterState(snapshotIds, null, listener); + }, ex -> removeSnapshotDeletionFromClusterState(snapshotIds, ex, listener) + )), ex -> removeSnapshotDeletionFromClusterState(snapshotIds, ex, listener))); } /** diff --git a/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java index a95fd25ec5f5e..ac989c7a568fb 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -31,7 +31,6 @@ import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.snapshots.mockstore.MockRepository; import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; import java.io.IOException; @@ -94,12 +93,7 @@ protected void disableRepoConsistencyCheck(String reason) { } protected RepositoryData getRepositoryData(Repository repository) { - ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, internalCluster().getMasterName()); - final PlainActionFuture repositoryData = PlainActionFuture.newFuture(); - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { - repository.getRepositoryData(repositoryData); - }); - return repositoryData.actionGet(); + return PlainActionFuture.get(repository::getRepositoryData); } public static long getFailureCount(String repository) {