diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index 624ed577798ea..fc73cb7c0fb1d 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -181,6 +181,11 @@ public void cloneShardSnapshot( in.cloneShardSnapshot(source, target, shardId, shardGeneration, listener); } + @Override + public void awaitIdle() { + in.awaitIdle(); + } + @Override public Lifecycle.State lifecycleState() { return in.lifecycleState(); diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java index 9cf39dac1876d..da874835513f0 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java @@ -771,5 +771,8 @@ protected void doClose() throws IOException { repos.addAll(internalRepositories.values()); repos.addAll(repositories.values()); IOUtils.close(repos); + for (Repository repo : repos) { + repo.awaitIdle(); + } } } diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index 4870377fe1d2f..2ae2ee7ed5e95 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -321,6 +321,15 @@ default Map adaptUserMetadata(Map userMetadata) return userMetadata; } + /** + * Block until all in-flight operations for this repository have completed. Must only be called after this instance has been closed + * by a call to stop {@link #close()}. + * Waiting for ongoing operations should be implemented here instead of in {@link #stop()} or {@link #close()} hooks of this interface + * as these are expected to be called on the cluster state applier thread (which must not block) if a repository is removed from the + * cluster. This method is intended to be called on node shutdown instead as a means to ensure no repository operations are leaked. + */ + void awaitIdle(); + static boolean assertSnapshotMetaThread() { final String threadName = Thread.currentThread().getName(); assert threadName.contains('[' + ThreadPool.Names.SNAPSHOT_META + ']') || threadName.startsWith("TEST-") diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 6290cbffd4c94..472fe3cbd5c4e 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -30,6 +30,7 @@ import org.elasticsearch.action.StepListener; import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.ListenableActionFuture; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; @@ -66,6 +67,7 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -423,6 +425,30 @@ protected void doClose() { } } + // listeners to invoke when a restore completes and there are no more restores running + @Nullable + private List> emptyListeners; + + // Set of shard ids that this repository is currently restoring + private final Set ongoingRestores = new HashSet<>(); + + @Override + public void awaitIdle() { + assert lifecycle.stoppedOrClosed(); + final PlainActionFuture future; + synchronized (ongoingRestores) { + if (ongoingRestores.isEmpty()) { + return; + } + future = new PlainActionFuture<>(); + if (emptyListeners == null) { + emptyListeners = new ArrayList<>(); + } + emptyListeners.add(future); + } + FutureUtils.get(future); + } + @Override public void executeConsistentStateUpdate( Function createUpdateTask, @@ -2885,7 +2911,30 @@ public void restoreShard( ); final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); final BlobContainer container = shardContainer(indexId, snapshotShardId); - executor.execute(ActionRunnable.wrap(restoreListener, l -> { + synchronized (ongoingRestores) { + if (store.isClosing()) { + restoreListener.onFailure(new AlreadyClosedException("store is closing")); + return; + } + if (lifecycle.started() == false) { + restoreListener.onFailure(new AlreadyClosedException("repository [" + metadata.name() + "] closed")); + return; + } + final boolean added = ongoingRestores.add(shardId); + assert added : "add restore for [" + shardId + "] that already has an existing restore"; + } + executor.execute(ActionRunnable.wrap(ActionListener.runAfter(restoreListener, () -> { + final List> onEmptyListeners; + synchronized (ongoingRestores) { + if (ongoingRestores.remove(shardId) && ongoingRestores.isEmpty() && emptyListeners != null) { + onEmptyListeners = emptyListeners; + emptyListeners = null; + } else { + return; + } + } + ActionListener.onResponse(onEmptyListeners, null); + }), l -> { final BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId); final SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles(), null); new FileRestoreContext(metadata.name(), shardId, snapshotId, recoveryState) { @@ -2991,6 +3040,9 @@ void ensureNotClosing(final Store store) throws AlreadyClosedException { if (store.isClosing()) { throw new AlreadyClosedException("store is closing"); } + if (lifecycle.started() == false) { + throw new AlreadyClosedException("repository [" + metadata.name() + "] closed"); + } } }.restore(snapshotFiles, store, l); diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index 35915d6c0b2ca..e42554d4266a4 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -317,6 +317,9 @@ public void cloneShardSnapshot( } + @Override + public void awaitIdle() {} + @Override public Lifecycle.State lifecycleState() { return null; diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java index 1d87e3456c057..da8e8dde753fd 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java @@ -148,6 +148,10 @@ public void verify(String verificationToken, DiscoveryNode localNode) { public void updateState(final ClusterState state) { } + @Override + public void awaitIdle() { + } + @Override public void executeConsistentStateUpdate(Function createUpdateTask, String source, Consumer onFailure) { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 02537e1f81b85..c117e6738f88a 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -494,6 +494,10 @@ public void cloneShardSnapshot(SnapshotId source, SnapshotId target, RepositoryS throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } + @Override + public void awaitIdle() { + } + private void updateMappings(Client leaderClient, Index leaderIndex, long leaderMappingVersion, Client followerClient, Index followerIndex) { final PlainActionFuture indexMetadataFuture = new PlainActionFuture<>();