Skip to content

Commit

Permalink
Ensure Node Shutdown Waits for Running Restores to Complete (elastic#…
Browse files Browse the repository at this point in the history
…76070) (elastic#76095)

We must wait for ongoing restores to complete before shutting down the repositories
service. Otherwise we may leak file descriptors, because the tasks that release the store
are submitted to the `SNAPSHOT` pool or to one of the searchable snapshot pools, and after
shutdown these pools quietly accept tasks without ever rejecting or failing them.

Same as elastic#46178, where we had the same bug in recoveries.

closes elastic#75686
  • Loading branch information
original-brownbear authored Aug 4, 2021
1 parent 5afcf92 commit e484e67
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,11 @@ public void cloneShardSnapshot(
in.cloneShardSnapshot(source, target, shardId, shardGeneration, listener);
}

// Forwards the call unchanged to {@code in}; this override adds no waiting logic of its own.
@Override
public void awaitIdle() {
in.awaitIdle();
}

@Override
public Lifecycle.State lifecycleState() {
return in.lifecycleState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -771,5 +771,8 @@ protected void doClose() throws IOException {
repos.addAll(internalRepositories.values());
repos.addAll(repositories.values());
IOUtils.close(repos);
for (Repository repo : repos) {
repo.awaitIdle();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,15 @@ default Map<String, Object> adaptUserMetadata(Map<String, Object> userMetadata)
return userMetadata;
}

/**
* Block until all in-flight operations for this repository have completed. Must only be called after this instance has been closed
* by a call to {@link #close()}.
* <p>
* Waiting for ongoing operations should be implemented here instead of in the {@link #stop()} or {@link #close()} hooks of this
* interface, as these are expected to be called on the cluster state applier thread (which must not block) if a repository is
* removed from the cluster. This method is intended to be called on node shutdown instead, as a means to ensure no repository
* operations are leaked.
*/
void awaitIdle();

static boolean assertSnapshotMetaThread() {
final String threadName = Thread.currentThread().getName();
assert threadName.contains('[' + ThreadPool.Names.SNAPSHOT_META + ']') || threadName.startsWith("TEST-")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.ListenableActionFuture;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
Expand Down Expand Up @@ -66,6 +67,7 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand Down Expand Up @@ -428,6 +430,30 @@ protected void doClose() {
}
}

// Listeners to invoke when a restore completes and there are no more restores running.
// Created lazily by awaitIdle() and reset to null once the listeners have been handed off for notification, so it is
// null whenever nobody is waiting. In the code visible here it is only touched inside synchronized (ongoingRestores).
@Nullable
private List<ActionListener<Void>> emptyListeners;

// Set of shard ids that this repository is currently restoring.
// Also used as the monitor object that guards both this set and emptyListeners.
private final Set<ShardId> ongoingRestores = new HashSet<>();

@Override
public void awaitIdle() {
    // Only valid once the repository has been stopped or closed.
    assert lifecycle.stoppedOrClosed();

    // Stays null when there is nothing to wait for; otherwise it is completed by the
    // last restore to finish, which drains emptyListeners.
    final PlainActionFuture<Void> idleFuture;
    synchronized (ongoingRestores) {
        if (ongoingRestores.isEmpty() == false) {
            idleFuture = new PlainActionFuture<>();
            if (emptyListeners == null) {
                emptyListeners = new ArrayList<>();
            }
            emptyListeners.add(idleFuture);
        } else {
            idleFuture = null;
        }
    }

    // Block outside the monitor so that finishing restores can acquire it and notify us.
    if (idleFuture != null) {
        FutureUtils.get(idleFuture);
    }
}

@Override
public void executeConsistentStateUpdate(
Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
Expand Down Expand Up @@ -2907,7 +2933,30 @@ public void restoreShard(
);
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
final BlobContainer container = shardContainer(indexId, snapshotShardId);
executor.execute(ActionRunnable.wrap(restoreListener, l -> {
synchronized (ongoingRestores) {
if (store.isClosing()) {
restoreListener.onFailure(new AlreadyClosedException("store is closing"));
return;
}
if (lifecycle.started() == false) {
restoreListener.onFailure(new AlreadyClosedException("repository [" + metadata.name() + "] closed"));
return;
}
final boolean added = ongoingRestores.add(shardId);
assert added : "add restore for [" + shardId + "] that already has an existing restore";
}
executor.execute(ActionRunnable.wrap(ActionListener.runAfter(restoreListener, () -> {
final List<ActionListener<Void>> onEmptyListeners;
synchronized (ongoingRestores) {
if (ongoingRestores.remove(shardId) && ongoingRestores.isEmpty() && emptyListeners != null) {
onEmptyListeners = emptyListeners;
emptyListeners = null;
} else {
return;
}
}
ActionListener.onResponse(onEmptyListeners, null);
}), l -> {
final BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId);
final SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles(), null);
new FileRestoreContext(metadata.name(), shardId, snapshotId, recoveryState) {
Expand Down Expand Up @@ -3013,6 +3062,9 @@ void ensureNotClosing(final Store store) throws AlreadyClosedException {
if (store.isClosing()) {
throw new AlreadyClosedException("store is closing");
}
if (lifecycle.started() == false) {
throw new AlreadyClosedException("repository [" + metadata.name() + "] closed");
}
}

}.restore(snapshotFiles, store, l);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,9 @@ public void cloneShardSnapshot(

}

// No-op in this implementation: the body is empty, so the call returns immediately.
@Override
public void awaitIdle() {}

@Override
public Lifecycle.State lifecycleState() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ public void verify(String verificationToken, DiscoveryNode localNode) {
public void updateState(final ClusterState state) {
}

// No-op in this implementation: the body is empty, so the call returns immediately.
@Override
public void awaitIdle() {
}

@Override
public void executeConsistentStateUpdate(Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask, String source,
Consumer<Exception> onFailure) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,10 @@ public void cloneShardSnapshot(SnapshotId source, SnapshotId target, RepositoryS
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
}

// No-op in this implementation: the body is empty, so the call returns immediately.
@Override
public void awaitIdle() {
}

private void updateMappings(Client leaderClient, Index leaderIndex, long leaderMappingVersion,
Client followerClient, Index followerIndex) {
final PlainActionFuture<IndexMetadata> indexMetadataFuture = new PlainActionFuture<>();
Expand Down

0 comments on commit e484e67

Please sign in to comment.