Skip to content

Commit

Permalink
Ensure MockRepository is Unblocked on Node Close (#62711) (#62748)
Browse files Browse the repository at this point in the history
`RepositoriesService#doClose` was never called, which led to
mock repositories not unblocking until the `ThreadPool` interrupts
all threads. Thus stopping a node that is blocked on a mock repository operation wastes `10s`
in each test that does it (which is quite a few as it turns out).
  • Loading branch information
original-brownbear authored Sep 22, 2020
1 parent 4bdbc39 commit aa0dc56
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,12 @@ public class BlobStoreRepositoryCleanupIT extends AbstractSnapshotIntegTestCase
public void testMasterFailoverDuringCleanup() throws Exception {
startBlockedCleanup("test-repo");

final int nodeCount = internalCluster().numDataAndMasterNodes();
logger.info("--> stopping master node");
internalCluster().stopCurrentMasterNode();

ensureStableCluster(nodeCount - 1);

logger.info("--> wait for cleanup to finish and disappear from cluster state");
assertBusy(() -> {
RepositoryCleanupInProgress cleanupInProgress =
Expand Down Expand Up @@ -102,6 +105,8 @@ private String startBlockedCleanup(String repoName) throws Exception {

logger.info("--> waiting for block to kick in on " + masterNode);
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(60));
awaitClusterState(state ->
state.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY).hasCleanupInProgress());
return masterNode;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
Expand Down Expand Up @@ -1283,10 +1282,6 @@ private ActionFuture<CreateSnapshotResponse> startFullSnapshot(String repoName,
.setPartial(partial).execute();
}

/**
 * Waits until the cluster state observed on the current master node satisfies
 * the given predicate. Convenience overload that resolves the master node name
 * and delegates to {@link #awaitClusterState(String, Predicate)}.
 *
 * @param statePredicate condition the master's cluster state must satisfy
 * @throws Exception if waiting for the matching cluster state fails or times out
 */
private void awaitClusterState(Predicate<ClusterState> statePredicate) throws Exception {
    final String currentMaster = internalCluster().getMasterName();
    awaitClusterState(currentMaster, statePredicate);
}

// Large snapshot pool settings to set up nodes for tests involving multiple repositories that need to have enough
// threads so that blocking some threads on one repository doesn't block other repositories from doing work
private static final Settings LARGE_SNAPSHOT_POOL_SETTINGS = Settings.builder()
Expand Down
1 change: 1 addition & 0 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -947,6 +947,7 @@ public synchronized void close() throws IOException {
toClose.add(() -> stopWatch.stop().start("snapshot_service"));
toClose.add(injector.getInstance(SnapshotsService.class));
toClose.add(injector.getInstance(SnapshotShardsService.class));
toClose.add(injector.getInstance(RepositoriesService.class));
toClose.add(() -> stopWatch.stop().start("client"));
Releasables.close(injector.getInstance(Client.class));
toClose.add(() -> stopWatch.stop().start("indices_cluster"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,10 @@ protected void awaitNoMoreRunningOperations(String viaNode) throws Exception {
state.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY).hasDeletionsInProgress() == false);
}

/**
 * Waits until the cluster state as seen by the current master node matches the
 * given predicate. Looks up the master node name and delegates to
 * {@link #awaitClusterState(String, Predicate)}.
 *
 * @param statePredicate condition the master's cluster state must satisfy
 * @throws Exception if waiting for the matching cluster state fails or times out
 */
protected void awaitClusterState(Predicate<ClusterState> statePredicate) throws Exception {
    final String masterNodeName = internalCluster().getMasterName();
    awaitClusterState(masterNodeName, statePredicate);
}

protected void awaitClusterState(String viaNode, Predicate<ClusterState> statePredicate) throws Exception {
final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, viaNode);
final ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, viaNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,13 @@ private void maybeIOExceptionOrBlock(String blobName) throws IOException {
}
}

private void blockExecutionAndMaybeWait(final String blobName) {
private void blockExecutionAndMaybeWait(final String blobName) throws IOException {
logger.info("[{}] blocking I/O operation for file [{}] at path [{}]", metadata.name(), blobName, path());
if (blockExecution() && waitAfterUnblock > 0) {
final boolean wasBlocked = blockExecution();
if (wasBlocked && lifecycle.stoppedOrClosed()) {
throw new IOException("already closed");
}
if (wasBlocked && waitAfterUnblock > 0) {
try {
// Delay operation after unblocking
// So, we can start node shutdown while this operation is still running.
Expand Down

0 comments on commit aa0dc56

Please sign in to comment.