Skip to content

Commit

Permalink
Safely Close Repositories on Node Shutdown (elastic#48020)
Browse files Browse the repository at this point in the history
We were not closing repositories on Node shutdown.
In production, this has little effect, but in tests,
a node that uses `MockRepository` and is stuck in a
simulated blocked-IO situation while it is being shut
down will only unblock when the node's threadpool is
interrupted. In some edge cases (many snapshot threads
and some CI slowness), this might result in the
execution taking longer than 5s to release all the
shard stores, and thus we fail the assertion about
unreleased shard stores in the internal test cluster.

Regardless of tests, I think we should close repositories
and release the resources associated with them when closing
a node, and not just when removing a repository from the
cluster state (CS) while nodes are running, as the current
behavior is really unexpected.

Fixes elastic#47689
  • Loading branch information
original-brownbear authored Oct 16, 2019
1 parent 37193c3 commit 27c24e2
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 2 deletions.
2 changes: 2 additions & 0 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,7 @@ public Node start() throws NodeValidationException {
injector.getInstance(IndicesClusterStateService.class).start();
injector.getInstance(SnapshotsService.class).start();
injector.getInstance(SnapshotShardsService.class).start();
injector.getInstance(RepositoriesService.class).start();
injector.getInstance(SearchService.class).start();
nodeService.getMonitorService().start();

Expand Down Expand Up @@ -773,6 +774,7 @@ private Node stop() {

injector.getInstance(SnapshotsService.class).stop();
injector.getInstance(SnapshotShardsService.class).stop();
injector.getInstance(RepositoriesService.class).stop();
// stop any changes happening as a result of cluster state changes
injector.getInstance(IndicesClusterStateService.class).stop();
// close discovery early to not react to pings anymore.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,19 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -54,7 +58,7 @@
/**
* Service responsible for maintaining and providing access to snapshot repositories on nodes.
*/
public class RepositoriesService implements ClusterStateApplier {
public class RepositoriesService extends AbstractLifecycleComponent implements ClusterStateApplier {

private static final Logger logger = LogManager.getLogger(RepositoriesService.class);

Expand Down Expand Up @@ -95,6 +99,8 @@ public RepositoriesService(Settings settings, ClusterService clusterService, Tra
* @param listener register repository listener
*/
public void registerRepository(final PutRepositoryRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
assert lifecycle.started() : "Trying to register new repository but service is in state [" + lifecycle.state() + "]";

final RepositoryMetaData newRepositoryMetaData = new RepositoryMetaData(request.name(), request.type(), request.settings());
validate(request.name());

Expand Down Expand Up @@ -435,4 +441,23 @@ private void ensureRepositoryNotInUse(ClusterState clusterState, String reposito
throw new IllegalStateException("trying to modify or unregister repository that is currently used ");
}
}

/**
 * Lifecycle start hook. No-op: this service does no work when it is started.
 */
@Override
protected void doStart() {
// nothing to do on start
}

/**
 * Lifecycle stop hook. No-op: this service does no work when it is stopped;
 * repositories are closed in {@link #doClose()}.
 */
@Override
protected void doStop() {
// nothing to do on stop
}

/**
 * Lifecycle close hook. Deregisters this service as a cluster state applier and then
 * closes every repository it currently holds, internal repositories first, followed
 * by the regular ones.
 *
 * @throws IOException if closing the collected repositories fails
 */
@Override
protected void doClose() throws IOException {
    clusterService.removeApplier(this);
    // Snapshot both repository maps into one list so they can be closed in a single call.
    final Collection<Repository> toClose = new ArrayList<>(internalRepositories.values());
    toClose.addAll(repositories.values());
    IOUtils.close(toClose);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public void setUp() throws Exception {
Collections.emptySet());
repositoriesService = new RepositoriesService(Settings.EMPTY, mock(ClusterService.class),
transportService, Collections.emptyMap(), Collections.singletonMap(TestRepository.TYPE, TestRepository::new), threadPool);
repositoriesService.start();
}

public void testRegisterInternalRepository() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1172,6 +1172,7 @@ public void start(ClusterState initialState) {
transportService.acceptIncomingRequests();
snapshotsService.start();
snapshotShardsService.start();
repositoriesService.start();
final CoordinationState.PersistedState persistedState =
new InMemoryPersistedState(initialState.term(), stateForNode(initialState, node));
coordinator = new Coordinator(node.getName(), clusterService.getSettings(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockRepository.Plugin.class, LocalStateCompositeXPackPlugin.class, IndexLifecycle.class);
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/47689")
public void testSnapshotInProgress() throws Exception {
final String indexName = "test";
final String policyName = "test-policy";
Expand Down

0 comments on commit 27c24e2

Please sign in to comment.