From fae5448a02817806b5c561390c0693257238ab18 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 7 Dec 2020 09:49:32 +0100 Subject: [PATCH 01/10] Introduce IndexStorePlugin.IndexFoldersDeletionListener --- .../elasticsearch/env/NodeEnvironment.java | 33 ++++++--- .../elasticsearch/indices/IndicesService.java | 21 ++++-- ...CompositeIndexFoldersDeletionListener.java | 70 +++++++++++++++++++ .../java/org/elasticsearch/node/Node.java | 9 ++- .../plugins/IndexStorePlugin.java | 16 +++++ 5 files changed, 134 insertions(+), 15 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/indices/store/CompositeIndexFoldersDeletionListener.java diff --git a/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java b/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java index 348de81ade493..5e222afcc6889 100644 --- a/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java +++ b/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java @@ -87,6 +87,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -553,11 +554,15 @@ private static String toString(Collection items) { * @param shardId the id of the shard to delete to delete * @throws IOException if an IOException occurs */ - public void deleteShardDirectorySafe(ShardId shardId, IndexSettings indexSettings) throws IOException, ShardLockObtainFailedException { + public void deleteShardDirectorySafe( + ShardId shardId, + IndexSettings indexSettings, + Consumer> listener + ) throws IOException, ShardLockObtainFailedException { final Path[] paths = availableShardPaths(shardId); logger.trace("deleting shard {} directory, paths: [{}]", shardId, paths); try (ShardLock lock = shardLock(shardId, "shard deletion under lock")) { - deleteShardDirectoryUnderLock(lock, indexSettings); + deleteShardDirectoryUnderLock(lock, indexSettings, listener); } } @@ -602,18 +607,24 @@ public static void acquireFSLockForPaths(IndexSettings indexSettings, Path... sh * @throws IOException if an IOException occurs * @throws ElasticsearchException if the write.lock is not acquirable */ - public void deleteShardDirectoryUnderLock(ShardLock lock, IndexSettings indexSettings) throws IOException { + public void deleteShardDirectoryUnderLock( + ShardLock lock, + IndexSettings indexSettings, + Consumer> listener + ) throws IOException { final ShardId shardId = lock.getShardId(); assert isShardLocked(shardId) : "shard " + shardId + " is not locked"; final Path[] paths = availableShardPaths(shardId); logger.trace("acquiring locks for {}, paths: [{}]", shardId, paths); acquireFSLockForPaths(indexSettings, paths); + listener.accept(List.of(paths)); IOUtils.rm(paths); if (indexSettings.hasCustomDataPath()) { Path customLocation = resolveCustomLocation(indexSettings.customDataPath(), shardId); logger.trace("acquiring lock for {}, custom path: [{}]", shardId, customLocation); acquireFSLockForPaths(indexSettings, customLocation); logger.trace("deleting custom shard {} directory [{}]", shardId, customLocation); + listener.accept(List.of(customLocation)); IOUtils.rm(customLocation); } logger.trace("deleted shard {} directory, paths: [{}]", shardId, paths); @@ -665,11 +676,15 @@ private boolean isShardLocked(ShardId id) { * @param indexSettings settings for the index being deleted * @throws IOException if any of the shards data directories can't be locked or deleted */ - public void deleteIndexDirectorySafe(Index index, long lockTimeoutMS, IndexSettings indexSettings) - throws IOException, ShardLockObtainFailedException { + public void deleteIndexDirectorySafe( + Index index, + long lockTimeoutMS, + IndexSettings indexSettings, + Consumer> listener + ) throws IOException, ShardLockObtainFailedException { final List locks = lockAllForIndex(index, indexSettings, "deleting index directory", lockTimeoutMS); try { - deleteIndexDirectoryUnderLock(index, indexSettings); + deleteIndexDirectoryUnderLock(index, indexSettings, listener); } finally { IOUtils.closeWhileHandlingException(locks); } @@ -677,18 +692,20 @@ public void deleteIndexDirectorySafe(Index index, long lockTimeoutMS, IndexSetti /** * Deletes an indexes data directory recursively. - * Note: this method assumes that the shard lock is acquired + * Note: this method assumes that the shard lock is aquired * * @param index the index to delete * @param indexSettings settings for the index being deleted */ - public void deleteIndexDirectoryUnderLock(Index index, IndexSettings indexSettings) throws IOException { + public void deleteIndexDirectoryUnderLock(Index index, IndexSettings indexSettings, Consumer> listener) throws IOException { final Path[] indexPaths = indexPaths(index); logger.trace("deleting index {} directory, paths({}): [{}]", index, indexPaths.length, indexPaths); + listener.accept(List.of(indexPaths)); IOUtils.rm(indexPaths); if (indexSettings.hasCustomDataPath()) { Path customLocation = resolveIndexCustomLocation(indexSettings.customDataPath(), index.getUUID()); logger.trace("deleting custom index {} directory [{}]", index, customLocation); + listener.accept(List.of(customLocation)); IOUtils.rm(customLocation); } } diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index d438b2a96959b..174b00e65f5f7 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -124,6 +124,7 @@ import org.elasticsearch.indices.mapper.MapperRegistry; import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.indices.store.CompositeIndexFoldersDeletionListener; import org.elasticsearch.node.Node; import org.elasticsearch.plugins.IndexStorePlugin; import org.elasticsearch.plugins.PluginsService; @@ -223,6 +224,7 @@ public class IndicesService extends AbstractLifecycleComponent private final Collection>> engineFactoryProviders; private final Map directoryFactories; private final Map recoveryStateFactories; + private final IndexStorePlugin.IndexFoldersDeletionListener indexFoldersDeletionListeners; final AbstractRefCounted indicesRefCount; // pkg-private for testing private final CountDownLatch closeLatch = new CountDownLatch(1); private volatile boolean idFieldDataEnabled; @@ -251,7 +253,8 @@ public IndicesService(Settings settings, PluginsService pluginsService, NodeEnvi ScriptService scriptService, ClusterService clusterService, Client client, MetaStateService metaStateService, Collection>> engineFactoryProviders, Map directoryFactories, ValuesSourceRegistry valuesSourceRegistry, - Map recoveryStateFactories) { + Map recoveryStateFactories, + List indexFoldersDeletionListeners) { this.settings = settings; this.threadPool = threadPool; this.pluginsService = pluginsService; @@ -298,6 +301,7 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon this.directoryFactories = directoryFactories; this.recoveryStateFactories = recoveryStateFactories; + this.indexFoldersDeletionListeners = new CompositeIndexFoldersDeletionListener(indexFoldersDeletionListeners); // doClose() is called when shutting down a node, yet there might still be ongoing requests // that we need to wait for before closing some resources such as the caches. In order to // avoid closing these resources while ongoing requests are still being processed, we use a @@ -919,7 +923,8 @@ private void deleteIndexStoreIfDeletionAllowed(final String reason, final Index logger.debug("{} deleting index store reason [{}]", index, reason); if (predicate.apply(index, indexSettings)) { // its safe to delete all index metadata and shard data - nodeEnv.deleteIndexDirectorySafe(index, 0, indexSettings); + nodeEnv.deleteIndexDirectorySafe(index, 0, indexSettings, + paths -> indexFoldersDeletionListeners.beforeIndexFoldersDeleted(index, indexSettings, paths)); } success = true; } catch (ShardLockObtainFailedException ex) { @@ -948,7 +953,8 @@ private void deleteIndexStoreIfDeletionAllowed(final String reason, final Index public void deleteShardStore(String reason, ShardLock lock, IndexSettings indexSettings) throws IOException { ShardId shardId = lock.getShardId(); logger.trace("{} deleting shard reason [{}]", shardId, reason); - nodeEnv.deleteShardDirectoryUnderLock(lock, indexSettings); + nodeEnv.deleteShardDirectoryUnderLock(lock, indexSettings, + paths -> indexFoldersDeletionListeners.beforeShardFoldersDeleted(shardId, indexSettings, paths)); } /** @@ -973,7 +979,8 @@ public void deleteShardStore(String reason, ShardId shardId, ClusterState cluste if (shardDeletionCheckResult != ShardDeletionCheckResult.FOLDER_FOUND_CAN_DELETE) { throw new IllegalStateException("Can't delete shard " + shardId + " (cause: " + shardDeletionCheckResult + ")"); } - nodeEnv.deleteShardDirectorySafe(shardId, indexSettings); + nodeEnv.deleteShardDirectorySafe(shardId, indexSettings, + paths -> indexFoldersDeletionListeners.beforeShardFoldersDeleted(shardId, indexSettings, paths)); logger.debug("{} deleted shard reason [{}]", shardId, reason); if (canDeleteIndexContents(shardId.getIndex(), indexSettings)) { @@ -1211,14 +1218,16 @@ public void processPendingDeletes(Index index, IndexSettings indexSettings, Time assert delete.shardId == -1; logger.debug("{} deleting index store reason [{}]", index, "pending delete"); try { - nodeEnv.deleteIndexDirectoryUnderLock(index, indexSettings); + nodeEnv.deleteIndexDirectoryUnderLock(index, indexSettings, + paths -> indexFoldersDeletionListeners.beforeIndexFoldersDeleted(index, indexSettings, paths)); iterator.remove(); } catch (IOException ex) { logger.debug(() -> new ParameterizedMessage("{} retry pending delete", index), ex); } } else { assert delete.shardId != -1; - ShardLock shardLock = locks.get(new ShardId(delete.index, delete.shardId)); + final ShardId shardId = new ShardId(delete.index, delete.shardId); + final ShardLock shardLock = locks.get(shardId); if (shardLock != null) { try { deleteShardStore("pending delete", shardLock, delete.settings); diff --git a/server/src/main/java/org/elasticsearch/indices/store/CompositeIndexFoldersDeletionListener.java b/server/src/main/java/org/elasticsearch/indices/store/CompositeIndexFoldersDeletionListener.java new file mode 100644 index 0000000000000..5f01448f20d3f --- /dev/null +++ b/server/src/main/java/org/elasticsearch/indices/store/CompositeIndexFoldersDeletionListener.java @@ -0,0 +1,70 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.store; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.plugins.IndexStorePlugin; + +import java.nio.file.Path; +import java.util.List; + +public class CompositeIndexFoldersDeletionListener implements IndexStorePlugin.IndexFoldersDeletionListener { + + private static final Logger logger = LogManager.getLogger(CompositeIndexFoldersDeletionListener.class); + private final List listeners; + + public CompositeIndexFoldersDeletionListener(List listeners) { + for (IndexStorePlugin.IndexFoldersDeletionListener listener : listeners) { + if (listener == null) { + throw new IllegalArgumentException("listeners must be non-null"); + } + } + this.listeners = List.copyOf(listeners); + } + + @Override + public void beforeIndexFoldersDeleted(Index index, IndexSettings indexSettings, List indexPaths) { + for (IndexStorePlugin.IndexFoldersDeletionListener listener : listeners) { + try { + listener.beforeIndexFoldersDeleted(index, indexSettings, indexPaths); + } catch (Exception e) { + logger.warn(() -> + new ParameterizedMessage("[{}] failed to invoke listener [{}] for index folders deletion", index, listener), e); + } + } + } + + @Override + public void beforeShardFoldersDeleted(ShardId shardId, IndexSettings indexSettings, List shardPaths) { + for (IndexStorePlugin.IndexFoldersDeletionListener listener : listeners) { + try { + listener.beforeShardFoldersDeleted(shardId, indexSettings, shardPaths); + } catch (Exception e) { + logger.warn(() -> + new ParameterizedMessage("[{}] failed to invoke listener [{}] for shard folders deletion", shardId, listener), e); + } + } + } +} diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index ee8f0bcb1b92e..827a7785b1270 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -491,6 +491,13 @@ protected Node(final Environment initialEnvironment, .flatMap(m -> m.entrySet().stream()) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + final List indexFoldersDeletionListeners = + pluginsService.filterPlugins(IndexStorePlugin.class) + .stream() + .map(IndexStorePlugin::getIndexFoldersDeletionListeners) + .flatMap(List::stream) + .collect(Collectors.toList()); + final Map> systemIndexDescriptorMap = pluginsService .filterPlugins(SystemIndexPlugin.class) .stream() @@ -512,7 +519,7 @@ protected Node(final Environment initialEnvironment, clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry, threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays, scriptService, clusterService, client, metaStateService, engineFactoryProviders, indexStoreFactories, - searchModule.getValuesSourceRegistry(), recoveryStateFactories); + searchModule.getValuesSourceRegistry(), recoveryStateFactories, indexFoldersDeletionListeners); final AliasValidator aliasValidator = new AliasValidator(); diff --git a/server/src/main/java/org/elasticsearch/plugins/IndexStorePlugin.java b/server/src/main/java/org/elasticsearch/plugins/IndexStorePlugin.java index ae12112eca9fd..7277a353f88a2 100644 --- a/server/src/main/java/org/elasticsearch/plugins/IndexStorePlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/IndexStorePlugin.java @@ -23,12 +23,16 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; +import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.indices.recovery.RecoveryState; import java.io.IOException; +import java.nio.file.Path; import java.util.Collections; +import java.util.List; import java.util.Map; /** @@ -82,4 +86,16 @@ interface RecoveryStateFactory { default Map getRecoveryStateFactories() { return Collections.emptyMap(); } + + /** + * {@link IndexFoldersDeletionListener} are invoked before the folders of a shard or an index are deleted from disk. + */ + interface IndexFoldersDeletionListener { + void beforeIndexFoldersDeleted(Index index, IndexSettings indexSettings, List indexPaths); + void beforeShardFoldersDeleted(ShardId shardId, IndexSettings indexSettings, List shardPaths); + } + + default List getIndexFoldersDeletionListeners() { + return Collections.emptyList(); + } } From fe0fbbdbc57995a6ea44727be570a59e23e35b0d Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 7 Dec 2020 17:16:18 +0100 Subject: [PATCH 02/10] add tests --- .../index/shard/IndexShardIT.java | 17 +- .../IndexFoldersDeletionListenerIT.java | 248 ++++++++++++++++++ .../plugins/IndexStorePlugin.java | 29 +- .../env/NodeEnvironmentTests.java | 43 ++- .../snapshots/SnapshotResiliencyTests.java | 3 +- .../core/LocalStateCompositeXPackPlugin.java | 7 + 6 files changed, 323 insertions(+), 24 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/elasticsearch/plugins/IndexFoldersDeletionListenerIT.java diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index 29804df62fa9e..94cfd6ccf8447 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -150,16 +150,17 @@ public void testLockTryingToDelete() throws Exception { // Test without the regular shard lock to assume we can acquire it // (worst case, meaning that the shard lock could be acquired and // we're green to delete the shard's directory) - ShardLock sLock = new DummyShardLock(new ShardId(index, 0)); - try { - env.deleteShardDirectoryUnderLock(sLock, IndexSettingsModule.newIndexSettings("test", Settings.EMPTY)); - fail("should not have been able to delete the directory"); - } catch (LockObtainFailedException e) { - assertTrue("msg: " + e.getMessage(), e.getMessage().contains("unable to acquire write.lock")); - } + final ShardLock sLock = new DummyShardLock(new ShardId(index, 0)); + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", Settings.EMPTY); + + final AtomicBoolean listener = new AtomicBoolean(); + final LockObtainFailedException exception = expectThrows(LockObtainFailedException.class, () -> + env.deleteShardDirectoryUnderLock(sLock, indexSettings, indexPaths -> listener.set(true))); + assertThat(exception.getMessage(), exception.getMessage(), containsString("unable to acquire write.lock")); + assertFalse("Listener should not have been called", listener.get()); } - public void testDurableFlagHasEffect() throws Exception { + public void testDurableFlagHasEffect() { createIndex("test"); ensureGreen(); client().prepareIndex("test").setId("1").setSource("{}", XContentType.JSON).get(); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/plugins/IndexFoldersDeletionListenerIT.java b/server/src/internalClusterTest/java/org/elasticsearch/plugins/IndexFoldersDeletionListenerIT.java new file mode 100644 index 0000000000000..c25407fef6020 --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/plugins/IndexFoldersDeletionListenerIT.java @@ -0,0 +1,248 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugins; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.InternalTestCluster; + +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.notNullValue; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST) +public class IndexFoldersDeletionListenerIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + final List> plugins = new ArrayList<>(super.nodePlugins()); + plugins.add(IndexFoldersDeletionListenerPlugin.class); + return plugins; + } + + public void testListenersInvokedWhenIndexIsDeleted() throws Exception { + final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createIndex(indexName); + + final NumShards numShards = getNumShards(indexName); + ensureClusterSizeConsistency(); // wait for a stable cluster + ensureGreen(indexName); // wait for no relocation + + final ClusterState clusterState = clusterService().state(); + final Index index = clusterState.metadata().index(indexName).getIndex(); + final Map> shardsByNodes = shardRoutingsByNodes(clusterState, index); + assertThat(shardsByNodes.values().stream().mapToInt(List::size).sum(), equalTo(numShards.totalNumShards)); + + for (Map.Entry> shardsByNode : shardsByNodes.entrySet()) { + final String nodeName = shardsByNode.getKey(); + final IndexFoldersDeletionListenerPlugin plugin = plugin(nodeName); + assertTrue("Expecting no indices deleted on node " + nodeName, plugin.deletedIndices.isEmpty()); + assertTrue("Expecting no shards deleted on node " + nodeName, plugin.deletedShards.isEmpty()); + } + + assertAcked(client().admin().indices().prepareDelete(indexName)); + ensureClusterStateConsistency(); + assertPendingDeletesProcessed(); + + for (Map.Entry> shardsByNode : shardsByNodes.entrySet()) { + final String nodeName = shardsByNode.getKey(); + final IndexFoldersDeletionListenerPlugin plugin = plugin(nodeName); + assertTrue("Listener should have been notified of deletion of index " + index + " on node " + nodeName, + plugin.deletedIndices.contains(index)); + + final List deletedShards = plugin.deletedShards.get(index); + assertThat(deletedShards, notNullValue()); + assertFalse("Listener should have been notified of deletion of one or more shards on node " + nodeName, + deletedShards.isEmpty()); + + for (ShardRouting shardRouting : shardsByNode.getValue()) { + final ShardId shardId = shardRouting.shardId(); + assertTrue("Listener should have been notified of deletion of shard " + shardId + " on node " + nodeName, + deletedShards.contains(shardId)); + } + } + } + + public void testListenersInvokedWhenIndexIsRelocated() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(4); + final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createIndex(indexName, Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(4, 10)) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(0, 1)) + .build()); + + final NumShards numShards = getNumShards(indexName); + ensureGreen(indexName); + + final ClusterState clusterState = clusterService().state(); + final Index index = clusterState.metadata().index(indexName).getIndex(); + final Map> shardsByNodes = shardRoutingsByNodes(clusterState, index); + assertThat(shardsByNodes.values().stream().mapToInt(List::size).sum(), equalTo(numShards.totalNumShards)); + + for (Map.Entry> shardsByNode : shardsByNodes.entrySet()) { + final String nodeName = shardsByNode.getKey(); + final IndexFoldersDeletionListenerPlugin plugin = plugin(nodeName); + assertTrue("Expecting no indices deleted on node " + nodeName, plugin.deletedIndices.isEmpty()); + assertTrue("Expecting no shards deleted on node " + nodeName, plugin.deletedShards.isEmpty()); + } + + final List excludedNodes = randomSubsetOf(2, shardsByNodes.keySet()); + assertAcked(client().admin().indices().prepareUpdateSettings(indexName) + .setSettings(Settings.builder() + .put("index.routing.allocation.exclude._name", String.join(",", excludedNodes)) + .build())); + ensureGreen(indexName); + ensureClusterStateConsistency(); + assertPendingDeletesProcessed(); + + for (Map.Entry> shardsByNode : shardsByNodes.entrySet()) { + final String nodeName = shardsByNode.getKey(); + final IndexFoldersDeletionListenerPlugin plugin = plugin(nodeName); + + if (excludedNodes.contains(nodeName)) { + assertTrue("Listener should have been notified of deletion of index " + index + " on node " + nodeName, + plugin.deletedIndices.contains(index)); + + final List deletedShards = plugin.deletedShards.get(index); + assertThat(deletedShards, notNullValue()); + assertFalse("Listener should have been notified of deletion of one or more shards on node " + nodeName, + deletedShards.isEmpty()); + + for (ShardRouting shardRouting : shardsByNode.getValue()) { + final ShardId shardId = shardRouting.shardId(); + assertTrue("Listener should have been notified of deletion of shard " + shardId + " on node " + nodeName, + deletedShards.contains(shardId)); + } + } else { + assertTrue("Expecting no indices deleted on node " + nodeName, plugin.deletedIndices.isEmpty()); + assertTrue("Expecting no shards deleted on node " + nodeName, plugin.deletedShards.isEmpty()); + } + } + } + + public void testListenersInvokedWhenIndexIsDangling() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(4); + final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createIndex(indexName, Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(4, 10)) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(0, 1)) + .build()); + + final NumShards numShards = getNumShards(indexName); + ensureGreen(indexName); + + final ClusterState clusterState = clusterService().state(); + final Index index = clusterState.metadata().index(indexName).getIndex(); + final Map> shardsByNodes = shardRoutingsByNodes(clusterState, index); + assertThat(shardsByNodes.values().stream().mapToInt(List::size).sum(), equalTo(numShards.totalNumShards)); + + for (Map.Entry> shardsByNode : shardsByNodes.entrySet()) { + final String nodeName = shardsByNode.getKey(); + final IndexFoldersDeletionListenerPlugin plugin = plugin(nodeName); + assertTrue("Expecting no indices deleted on node " + nodeName, plugin.deletedIndices.isEmpty()); + assertTrue("Expecting no shards deleted on node " + nodeName, plugin.deletedShards.isEmpty()); + } + + final String stoppedNode = randomFrom(shardsByNodes.keySet()); + final Settings stoppedNodeDataPathSettings = internalCluster().dataPathSettings(stoppedNode); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(stoppedNode)); + + assertAcked(client().admin().indices().prepareDelete(indexName)); + + final String restartedNode = internalCluster().startNode(stoppedNodeDataPathSettings); + ensureClusterSizeConsistency(); + ensureClusterStateConsistency(); + assertPendingDeletesProcessed(); + + final IndexFoldersDeletionListenerPlugin plugin = plugin(restartedNode); + + assertTrue("Listener should have been notified of deletion of index " + index + " on node " + restartedNode, + plugin.deletedIndices.contains(index)); + } + + private Map> shardRoutingsByNodes(ClusterState clusterState, Index index) { + final Map> map = new HashMap<>(); + for (ShardRouting shardRouting : clusterState.routingTable().index(index).shardsWithState(ShardRoutingState.STARTED)) { + final String nodeName = clusterState.nodes().get(shardRouting.currentNodeId()).getName(); + map.computeIfAbsent(nodeName, name -> new ArrayList<>()).add(shardRouting); + } + return map; + } + + public static class IndexFoldersDeletionListenerPlugin extends Plugin implements IndexStorePlugin { + + final Set deletedIndices = ConcurrentCollections.newConcurrentSet(); + final Map> deletedShards = ConcurrentCollections.newConcurrentMap(); + + @Override + public List getIndexFoldersDeletionListeners() { + return List.of(new IndexFoldersDeletionListener() { + @Override + public void beforeIndexFoldersDeleted(Index index, IndexSettings indexSettings, List indexPaths) { + deletedIndices.add(index); + } + + @Override + public void beforeShardFoldersDeleted(ShardId shardId, IndexSettings indexSettings, List shardPaths) { + deletedShards.computeIfAbsent(shardId.getIndex(), i -> new ArrayList<>()).add(shardId); + } + }); + } + + @Override + public Map getDirectoryFactories() { + return Collections.emptyMap(); + } + } + + private static IndexFoldersDeletionListenerPlugin plugin(String nodeId) { + final PluginsService pluginsService = internalCluster().getInstance(PluginsService.class, nodeId); + final List plugins = pluginsService.filterPlugins(IndexFoldersDeletionListenerPlugin.class); + assertThat(plugins, hasSize(1)); + return plugins.get(0); + } + + private static void assertPendingDeletesProcessed() throws Exception { + assertBusy(() -> { + final Iterable services = internalCluster().getDataNodeInstances(IndicesService.class); + services.forEach(indicesService -> assertFalse(indicesService.hasUncompletedPendingDeletes())); + }); + } +} diff --git a/server/src/main/java/org/elasticsearch/plugins/IndexStorePlugin.java b/server/src/main/java/org/elasticsearch/plugins/IndexStorePlugin.java index 7277a353f88a2..d52c84d508bc7 100644 --- a/server/src/main/java/org/elasticsearch/plugins/IndexStorePlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/IndexStorePlugin.java @@ -91,10 +91,35 @@ default Map getRecoveryStateFactories() { * {@link IndexFoldersDeletionListener} are invoked before the folders of a shard or an index are deleted from disk. */ interface IndexFoldersDeletionListener { - void beforeIndexFoldersDeleted(Index index, IndexSettings indexSettings, List indexPaths); - void beforeShardFoldersDeleted(ShardId shardId, IndexSettings indexSettings, List shardPaths); + /** + * Invoked before the folders of an index are deleted from disk. The list of folders contains {@link Path}s that may or may not + * exist on disk. Shard locks are expected to be acquired at the time this method is invoked. + * + * @param index the {@link Index} of the index whose folders are going to be deleted + * @param indexSettings settings for the index whose folders are going to be deleted + * @param indexPaths the paths of the folders that are going to be deleted + */ + default void beforeIndexFoldersDeleted(Index index, IndexSettings indexSettings, List indexPaths) { + } + + /** + * Invoked before the folders of a shard are deleted from disk. The list of folders contains {@link Path}s that may or may not + * exist on disk. Shard locks are expected to be acquired at the time this method is invoked. + * + * @param shardId the {@link ShardId} of the shard whose folders are going to be deleted + * @param indexSettings index settings of the shard whose folders are going to be deleted + * @param shardPaths the paths of the folders that are going to be deleted + */ + default void beforeShardFoldersDeleted(ShardId shardId, IndexSettings indexSettings, List shardPaths) { + } } + /** + * The {@link IndexFoldersDeletionListener} listeners for this plugin. When the folders of an index or a shard are deleted from disk, + * these listeners are invoked before the deletion happens in order to allow plugin to clean up any resources if needed. + * + * @return a list of {@link IndexFoldersDeletionListener} listeners + */ default List getIndexFoldersDeletionListeners() { return Collections.emptyList(); } diff --git a/server/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java b/server/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java index 223ad11739b51..ad7e47a52c6fb 100644 --- a/server/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java +++ b/server/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java @@ -20,6 +20,7 @@ import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.util.LuceneTestCase; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.io.PathUtils; @@ -211,7 +212,7 @@ public void testResolveIndexFolders() throws Exception { public void testDeleteSafe() throws Exception { final NodeEnvironment env = newNodeEnvironment(); final Index index = new Index("foo", "fooUUID"); - ShardLock fooLock = env.shardLock(new ShardId(index, 0), "1"); + final ShardLock fooLock = env.shardLock(new ShardId(index, 0), "1"); assertEquals(new ShardId(index, 0), fooLock.getShardId()); for (Path path : env.indexPaths(index)) { @@ -219,11 +220,11 @@ public void testDeleteSafe() throws Exception { Files.createDirectories(path.resolve("1")); } - try { - env.deleteShardDirectorySafe(new ShardId(index, 0), idxSettings); - fail("shard is locked"); - } catch (ShardLockObtainFailedException ex) { - // expected + { + SetOnce> listener = new SetOnce<>(); + ShardLockObtainFailedException ex = expectThrows(ShardLockObtainFailedException.class, + () -> env.deleteShardDirectorySafe(new ShardId(index, 0), idxSettings, listener::set)); + assertNull(listener.get()); } for (Path path : env.indexPaths(index)) { @@ -231,19 +232,28 @@ public void testDeleteSafe() throws Exception { assertTrue(Files.exists(path.resolve("1"))); } - env.deleteShardDirectorySafe(new ShardId(index, 1), idxSettings); + { + SetOnce> listener = new SetOnce<>(); + env.deleteShardDirectorySafe(new ShardId(index, 1), idxSettings, listener::set); + List deletedPaths = listener.get(); + assertNotNull(deletedPaths); + for (Path path : env.indexPaths(index)) { + assertTrue(deletedPaths.contains(path.resolve("1"))); + } + } for (Path path : env.indexPaths(index)) { assertTrue(Files.exists(path.resolve("0"))); assertFalse(Files.exists(path.resolve("1"))); } - try { - env.deleteIndexDirectorySafe(index, randomIntBetween(0, 10), idxSettings); - fail("shard is locked"); - } catch (ShardLockObtainFailedException ex) { - // expected + { + SetOnce> listener = new SetOnce<>(); + ShardLockObtainFailedException ex = expectThrows(ShardLockObtainFailedException.class, + () -> env.deleteIndexDirectorySafe(index, randomIntBetween(0, 10), idxSettings, listener::set)); + assertNull(listener.get()); } + fooLock.close(); for (Path path : env.indexPaths(index)) { @@ -282,7 +292,14 @@ protected void doRun() throws Exception { start.countDown(); blockLatch.await(); - env.deleteIndexDirectorySafe(index, 5000, idxSettings); + final SetOnce> listener = new SetOnce<>(); + env.deleteIndexDirectorySafe(index, 5000, idxSettings, listener::set); + + final List deletedPaths = listener.get(); + assertNotNull(deletedPaths); + for (Path path : env.indexPaths(index)) { + assertTrue(deletedPaths.contains(path)); + } assertNull(threadException.get()); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 555245343bd12..7f639c1f0ff13 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -1496,7 +1496,8 @@ protected NamedWriteableRegistry writeableRegistry() { Collections.emptyList(), emptyMap(), null, - emptyMap() + emptyMap(), + List.of() ); final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); snapshotShardsService = diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java index 4030222d3bab3..b49f6d1761afa 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java @@ -528,6 +528,13 @@ public Map getRecoveryStateFactories() { return factories; } + @Override + public List getIndexFoldersDeletionListeners() { + final List listeners = new ArrayList<>(); + filterPlugins(IndexStorePlugin.class).forEach(p -> listeners.addAll(p.getIndexFoldersDeletionListeners())); + return Collections.unmodifiableList(listeners); + } + private List filterPlugins(Class type) { return plugins.stream().filter(x -> type.isAssignableFrom(x.getClass())).map(p -> ((T)p)) .collect(Collectors.toList()); From f32967b35ceaac7f946cae378c7dd0c1da2f2a34 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 8 Dec 2020 09:20:42 +0100 Subject: [PATCH 03/10] typo --- server/src/main/java/org/elasticsearch/env/NodeEnvironment.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java b/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java index 5e222afcc6889..5e202616f01a5 100644 --- a/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java +++ b/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java @@ -692,7 +692,7 @@ public void deleteIndexDirectorySafe( /** * Deletes an indexes data directory recursively. - * Note: this method assumes that the shard lock is aquired + * Note: this method assumes that the shard lock is acquired * * @param index the index to delete * @param indexSettings settings for the index being deleted From bd7aca56e003bdb18ac6c4a8fae65ce60b88e4d0 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 9 Dec 2020 09:17:09 +0100 Subject: [PATCH 04/10] assert false --- .../java/org/elasticsearch/index/shard/IndexShardIT.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index 94cfd6ccf8447..a91fdc07e2d44 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -153,11 +153,11 @@ public void testLockTryingToDelete() throws Exception { final ShardLock sLock = new DummyShardLock(new ShardId(index, 0)); final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", Settings.EMPTY); - final AtomicBoolean listener = new AtomicBoolean(); final LockObtainFailedException exception = expectThrows(LockObtainFailedException.class, () -> - env.deleteShardDirectoryUnderLock(sLock, indexSettings, indexPaths -> listener.set(true))); + env.deleteShardDirectoryUnderLock(sLock, indexSettings, indexPaths -> { + assert false : "should not be called " + indexPaths; + })); assertThat(exception.getMessage(), exception.getMessage(), containsString("unable to acquire write.lock")); - assertFalse("Listener should not have been called", listener.get()); } public void testDurableFlagHasEffect() { From 5925e5afa90c278be9bb2388d1ccfe2670609747 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 9 Dec 2020 09:18:50 +0100 Subject: [PATCH 05/10] no default --- .../java/org/elasticsearch/plugins/IndexStorePlugin.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/plugins/IndexStorePlugin.java b/server/src/main/java/org/elasticsearch/plugins/IndexStorePlugin.java index d52c84d508bc7..f996b0ef85844 100644 --- a/server/src/main/java/org/elasticsearch/plugins/IndexStorePlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/IndexStorePlugin.java @@ -99,8 +99,7 @@ interface IndexFoldersDeletionListener { * @param indexSettings settings for the index whose folders are going to be deleted * @param indexPaths the paths of the folders that are going to be deleted */ - default void beforeIndexFoldersDeleted(Index index, IndexSettings indexSettings, List indexPaths) { - } + void beforeIndexFoldersDeleted(Index index, IndexSettings indexSettings, List indexPaths); /** * Invoked before the folders of a shard are deleted from disk. The list of folders contains {@link Path}s that may or may not @@ -110,8 +109,7 @@ default void beforeIndexFoldersDeleted(Index index, IndexSettings indexSettings, * @param indexSettings index settings of the shard whose folders are going to be deleted * @param shardPaths the paths of the folders that are going to be deleted */ - default void beforeShardFoldersDeleted(ShardId shardId, IndexSettings indexSettings, List shardPaths) { - } + void beforeShardFoldersDeleted(ShardId shardId, IndexSettings indexSettings, List shardPaths); } /** From 850d1a389665e4d644e6c730691e0f2debd26e41 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 9 Dec 2020 09:55:26 +0100 Subject: [PATCH 06/10] Path[] --- .../IndexFoldersDeletionListenerIT.java | 4 +- .../elasticsearch/env/NodeEnvironment.java | 16 ++--- ...CompositeIndexFoldersDeletionListener.java | 4 +- .../plugins/IndexStorePlugin.java | 4 +- .../env/NodeEnvironmentTests.java | 69 +++++++++---------- 5 files changed, 46 insertions(+), 51 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/plugins/IndexFoldersDeletionListenerIT.java b/server/src/internalClusterTest/java/org/elasticsearch/plugins/IndexFoldersDeletionListenerIT.java index c25407fef6020..90d4e5ff41333 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/plugins/IndexFoldersDeletionListenerIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/plugins/IndexFoldersDeletionListenerIT.java @@ -215,12 +215,12 @@ public static class IndexFoldersDeletionListenerPlugin extends Plugin implements public List getIndexFoldersDeletionListeners() { return List.of(new IndexFoldersDeletionListener() { @Override - public void beforeIndexFoldersDeleted(Index index, IndexSettings indexSettings, List indexPaths) { + public void beforeIndexFoldersDeleted(Index index, IndexSettings indexSettings, Path[] indexPaths) { deletedIndices.add(index); } @Override - public void beforeShardFoldersDeleted(ShardId shardId, IndexSettings indexSettings, List shardPaths) { + public void beforeShardFoldersDeleted(ShardId shardId, IndexSettings indexSettings, Path[] shardPaths) { deletedShards.computeIfAbsent(shardId.getIndex(), i -> new ArrayList<>()).add(shardId); } }); diff --git a/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java b/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java index 5e202616f01a5..656ac31d08d8a 100644 --- a/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java +++ b/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java @@ -557,7 +557,7 @@ private static String toString(Collection items) { public void deleteShardDirectorySafe( ShardId shardId, IndexSettings indexSettings, - Consumer> listener + Consumer listener ) throws IOException, ShardLockObtainFailedException { final Path[] paths = availableShardPaths(shardId); logger.trace("deleting shard {} directory, paths: [{}]", shardId, paths); @@ -610,21 +610,21 @@ public static void acquireFSLockForPaths(IndexSettings indexSettings, Path... sh public void deleteShardDirectoryUnderLock( ShardLock lock, IndexSettings indexSettings, - Consumer> listener + Consumer listener ) throws IOException { final ShardId shardId = lock.getShardId(); assert isShardLocked(shardId) : "shard " + shardId + " is not locked"; final Path[] paths = availableShardPaths(shardId); logger.trace("acquiring locks for {}, paths: [{}]", shardId, paths); acquireFSLockForPaths(indexSettings, paths); - listener.accept(List.of(paths)); + listener.accept(paths); IOUtils.rm(paths); if (indexSettings.hasCustomDataPath()) { Path customLocation = resolveCustomLocation(indexSettings.customDataPath(), shardId); logger.trace("acquiring lock for {}, custom path: [{}]", shardId, customLocation); acquireFSLockForPaths(indexSettings, customLocation); logger.trace("deleting custom shard {} directory [{}]", shardId, customLocation); - listener.accept(List.of(customLocation)); + listener.accept(new Path[]{customLocation}); IOUtils.rm(customLocation); } logger.trace("deleted shard {} directory, paths: [{}]", shardId, paths); @@ -680,7 +680,7 @@ public void deleteIndexDirectorySafe( Index index, long lockTimeoutMS, IndexSettings indexSettings, - Consumer> listener + Consumer listener ) throws IOException, ShardLockObtainFailedException { final List locks = lockAllForIndex(index, indexSettings, "deleting index directory", lockTimeoutMS); try { @@ -697,15 +697,15 @@ public void deleteIndexDirectorySafe( * @param index the index to delete * @param indexSettings settings for the index being deleted */ - public void deleteIndexDirectoryUnderLock(Index index, IndexSettings indexSettings, Consumer> listener) throws IOException { + public void deleteIndexDirectoryUnderLock(Index index, IndexSettings indexSettings, Consumer listener) throws IOException { final Path[] indexPaths = indexPaths(index); logger.trace("deleting index {} directory, paths({}): [{}]", index, indexPaths.length, indexPaths); - listener.accept(List.of(indexPaths)); + listener.accept(indexPaths); IOUtils.rm(indexPaths); if (indexSettings.hasCustomDataPath()) { Path customLocation = resolveIndexCustomLocation(indexSettings.customDataPath(), index.getUUID()); logger.trace("deleting custom index {} directory [{}]", index, customLocation); - listener.accept(List.of(customLocation)); + listener.accept(new Path[]{customLocation}); IOUtils.rm(customLocation); } } diff --git a/server/src/main/java/org/elasticsearch/indices/store/CompositeIndexFoldersDeletionListener.java b/server/src/main/java/org/elasticsearch/indices/store/CompositeIndexFoldersDeletionListener.java index 5f01448f20d3f..9ccf88201610f 100644 --- a/server/src/main/java/org/elasticsearch/indices/store/CompositeIndexFoldersDeletionListener.java +++ b/server/src/main/java/org/elasticsearch/indices/store/CompositeIndexFoldersDeletionListener.java @@ -45,7 +45,7 @@ public CompositeIndexFoldersDeletionListener(List indexPaths) { + public void beforeIndexFoldersDeleted(Index index, IndexSettings indexSettings, Path[] indexPaths) { for (IndexStorePlugin.IndexFoldersDeletionListener listener : listeners) { try { listener.beforeIndexFoldersDeleted(index, indexSettings, indexPaths); @@ -57,7 +57,7 @@ public void beforeIndexFoldersDeleted(Index index, IndexSettings indexSettings, } @Override - public void beforeShardFoldersDeleted(ShardId shardId, IndexSettings indexSettings, List shardPaths) { + public void beforeShardFoldersDeleted(ShardId shardId, IndexSettings indexSettings, Path[] shardPaths) { for (IndexStorePlugin.IndexFoldersDeletionListener listener : listeners) { try { listener.beforeShardFoldersDeleted(shardId, indexSettings, shardPaths); diff --git a/server/src/main/java/org/elasticsearch/plugins/IndexStorePlugin.java b/server/src/main/java/org/elasticsearch/plugins/IndexStorePlugin.java index f996b0ef85844..37341accede0a 100644 --- a/server/src/main/java/org/elasticsearch/plugins/IndexStorePlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/IndexStorePlugin.java @@ -99,7 +99,7 @@ interface IndexFoldersDeletionListener { * @param indexSettings settings for the index whose folders are going to be deleted * @param indexPaths the paths of the folders that are going to be deleted */ - void beforeIndexFoldersDeleted(Index index, IndexSettings indexSettings, List indexPaths); + void beforeIndexFoldersDeleted(Index index, IndexSettings indexSettings, Path[] indexPaths); /** * Invoked before the folders of a shard are deleted from disk. The list of folders contains {@link Path}s that may or may not @@ -109,7 +109,7 @@ interface IndexFoldersDeletionListener { * @param indexSettings index settings of the shard whose folders are going to be deleted * @param shardPaths the paths of the folders that are going to be deleted */ - void beforeShardFoldersDeleted(ShardId shardId, IndexSettings indexSettings, List shardPaths); + void beforeShardFoldersDeleted(ShardId shardId, IndexSettings indexSettings, Path[] shardPaths); } /** diff --git a/server/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java b/server/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java index ad7e47a52c6fb..0ebc985a469fc 100644 --- a/server/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java +++ b/server/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java @@ -68,7 +68,7 @@ public void testNodeLock() throws IOException { // Reuse the same location and attempt to lock again IllegalStateException ex = expectThrows(IllegalStateException.class, () -> - new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings))); + new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings))); assertThat(ex.getMessage(), containsString("failed to obtain node lock")); // Close the environment that holds the lock and make sure we can get the lock after release @@ -91,12 +91,12 @@ public void testSegmentInfosTracing() { try { // False means don't hook up std out NodeEnvironment.applySegmentInfosTrace( - Settings.builder().put(NodeEnvironment.ENABLE_LUCENE_SEGMENT_INFOS_TRACE_SETTING.getKey(), false).build()); + Settings.builder().put(NodeEnvironment.ENABLE_LUCENE_SEGMENT_INFOS_TRACE_SETTING.getKey(), false).build()); assertNull(SegmentInfos.getInfoStream()); // But true means hook std out up statically NodeEnvironment.applySegmentInfosTrace( - Settings.builder().put(NodeEnvironment.ENABLE_LUCENE_SEGMENT_INFOS_TRACE_SETTING.getKey(), true).build()); + Settings.builder().put(NodeEnvironment.ENABLE_LUCENE_SEGMENT_INFOS_TRACE_SETTING.getKey(), true).build()); assertEquals(System.out, SegmentInfos.getInfoStream()); } finally { // Clean up after ourselves @@ -221,7 +221,7 @@ public void testDeleteSafe() throws Exception { } { - SetOnce> listener = new SetOnce<>(); + SetOnce listener = new SetOnce<>(); ShardLockObtainFailedException ex = expectThrows(ShardLockObtainFailedException.class, () -> env.deleteShardDirectorySafe(new ShardId(index, 0), idxSettings, listener::set)); assertNull(listener.get()); @@ -233,12 +233,11 @@ public void testDeleteSafe() throws Exception { } { - SetOnce> listener = new SetOnce<>(); + SetOnce listener = new SetOnce<>(); env.deleteShardDirectorySafe(new ShardId(index, 1), idxSettings, listener::set); - List deletedPaths = listener.get(); - assertNotNull(deletedPaths); - for (Path path : env.indexPaths(index)) { - assertTrue(deletedPaths.contains(path.resolve("1"))); + Path[] deletedPaths = listener.get(); + for (int i = 0; i < env.nodePaths().length; i++) { + assertThat(deletedPaths[i], equalTo(env.nodePaths()[i].resolve(index).resolve("1"))); } } @@ -248,7 +247,7 @@ public void testDeleteSafe() throws Exception { } { - SetOnce> listener = new SetOnce<>(); + SetOnce listener = new SetOnce<>(); ShardLockObtainFailedException ex = expectThrows(ShardLockObtainFailedException.class, () -> env.deleteIndexDirectorySafe(index, randomIntBetween(0, 10), idxSettings, listener::set)); assertNull(listener.get()); @@ -292,15 +291,9 @@ protected void doRun() throws Exception { start.countDown(); blockLatch.await(); - final SetOnce> listener = new SetOnce<>(); + final SetOnce listener = new SetOnce<>(); env.deleteIndexDirectorySafe(index, 5000, idxSettings, listener::set); - - final List deletedPaths = listener.get(); - assertNotNull(deletedPaths); - for (Path path : env.indexPaths(index)) { - assertTrue(deletedPaths.contains(path)); - } - + assertArrayEquals(env.indexPaths(index), listener.get()); assertNull(threadException.get()); for (Path path : env.indexPaths(index)) { @@ -343,7 +336,7 @@ public void run() { int shard = randomIntBetween(0, counts.length - 1); try { try (ShardLock autoCloses = env.shardLock(new ShardId("foo", "fooUUID", shard), "1", - scaledRandomIntBetween(0, 10))) { + scaledRandomIntBetween(0, 10))) { counts[shard].value++; countsAtomic[shard].incrementAndGet(); assertEquals(flipFlop[shard].incrementAndGet(), 1); @@ -383,22 +376,22 @@ public void testCustomDataPaths() throws Exception { equalTo(PathUtils.get("/tmp/foo/0/" + index.getUUID() + "/0").toAbsolutePath())); assertThat("shard paths with a custom data_path should contain only regular paths", - env.availableShardPaths(sid), - equalTo(stringsToPaths(dataPaths, "indices/" + index.getUUID() + "/0"))); + env.availableShardPaths(sid), + equalTo(stringsToPaths(dataPaths, "indices/" + index.getUUID() + "/0"))); assertThat("index paths uses the regular template", - env.indexPaths(index), equalTo(stringsToPaths(dataPaths, "indices/" + index.getUUID()))); + env.indexPaths(index), equalTo(stringsToPaths(dataPaths, "indices/" + index.getUUID()))); assertThat(env.availableShardPaths(sid), equalTo(env.availableShardPaths(sid))); assertThat(env.resolveCustomLocation("/tmp/foo", sid).toAbsolutePath(), equalTo(PathUtils.get("/tmp/foo/0/" + index.getUUID() + "/0").toAbsolutePath())); assertThat("shard paths with a custom data_path should contain only regular paths", - env.availableShardPaths(sid), - equalTo(stringsToPaths(dataPaths, "indices/" + index.getUUID() + "/0"))); + env.availableShardPaths(sid), + equalTo(stringsToPaths(dataPaths, "indices/" + index.getUUID() + "/0"))); assertThat("index paths uses the regular template", - env.indexPaths(index), equalTo(stringsToPaths(dataPaths, "indices/" + index.getUUID()))); + env.indexPaths(index), equalTo(stringsToPaths(dataPaths, "indices/" + index.getUUID()))); env.close(); } @@ -421,7 +414,7 @@ public void testExistingTempFiles() throws IOException { env.close(); // check we clean up - for (String path: paths) { + for (String path : paths) { final Path nodePath = PathUtils.get(path); final Path tempFile = nodePath.resolve(NodeEnvironment.TEMP_FILE_NAME); assertFalse(tempFile + " should have been cleaned", Files.exists(tempFile)); @@ -510,7 +503,9 @@ private void verifyFailsOnMetadata(Settings settings, Path indexPath) { assertThat(ex.getMessage(), startsWith("node does not have the data and master roles but has index metadata")); } - /** Converts an array of Strings to an array of Paths, adding an additional child if specified */ + /** + * Converts an array of Strings to an array of Paths, adding an additional child if specified + */ private Path[] stringsToPaths(String[] strings, String additional) { Path[] locations = new Path[strings.length]; for (int i = 0; i < strings.length; i++) { @@ -542,25 +537,25 @@ public NodeEnvironment newNodeEnvironment(Settings settings) throws IOException public Settings buildEnvSettings(Settings settings) { return Settings.builder() - .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toAbsolutePath().toString()) - .putList(Environment.PATH_DATA_SETTING.getKey(), tmpPaths()) - .put(settings).build(); + .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toAbsolutePath().toString()) + .putList(Environment.PATH_DATA_SETTING.getKey(), tmpPaths()) + .put(settings).build(); } public NodeEnvironment newNodeEnvironment(String[] dataPaths, Settings settings) throws IOException { Settings build = Settings.builder() - .put(settings) - .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toAbsolutePath().toString()) - .putList(Environment.PATH_DATA_SETTING.getKey(), dataPaths).build(); + .put(settings) + .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toAbsolutePath().toString()) + .putList(Environment.PATH_DATA_SETTING.getKey(), dataPaths).build(); return new NodeEnvironment(build, TestEnvironment.newEnvironment(build)); } public NodeEnvironment newNodeEnvironment(String[] dataPaths, String sharedDataPath, Settings settings) throws IOException { Settings build = Settings.builder() - .put(settings) - .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toAbsolutePath().toString()) - .put(Environment.PATH_SHARED_DATA_SETTING.getKey(), sharedDataPath) - .putList(Environment.PATH_DATA_SETTING.getKey(), dataPaths).build(); + .put(settings) + .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toAbsolutePath().toString()) + .put(Environment.PATH_SHARED_DATA_SETTING.getKey(), sharedDataPath) + .putList(Environment.PATH_DATA_SETTING.getKey(), dataPaths).build(); return new NodeEnvironment(build, TestEnvironment.newEnvironment(build)); } } From 81da61f7a20c1165b811fe4c7c5465acac3d574d Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 9 Dec 2020 10:29:58 +0100 Subject: [PATCH 07/10] deleteLeftoverShardDirectory plumbing --- .../java/org/elasticsearch/index/IndexModule.java | 5 +++-- .../java/org/elasticsearch/index/IndexService.java | 9 +++++++-- .../org/elasticsearch/index/shard/ShardPath.java | 13 ++++++++++--- .../org/elasticsearch/indices/IndicesService.java | 3 ++- .../org/elasticsearch/index/IndexModuleTests.java | 13 ++++++++++++- 5 files changed, 34 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/IndexModule.java b/server/src/main/java/org/elasticsearch/index/IndexModule.java index 030ec8745d6a1..ad6fb2205e4ae 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexModule.java +++ b/server/src/main/java/org/elasticsearch/index/IndexModule.java @@ -413,7 +413,8 @@ public IndexService newIndexService(IndexService.IndexCreationContext indexCreat IndicesFieldDataCache indicesFieldDataCache, NamedWriteableRegistry namedWriteableRegistry, BooleanSupplier idFieldDataEnabled, - ValuesSourceRegistry valuesSourceRegistry) throws IOException { + ValuesSourceRegistry valuesSourceRegistry, + IndexStorePlugin.IndexFoldersDeletionListener indexFoldersDeletionListener) throws IOException { final IndexEventListener eventListener = freeze(); Function> readerWrapperFactory = indexReaderWrapper.get() == null ? (shard) -> null : indexReaderWrapper.get(); @@ -442,7 +443,7 @@ public IndexService newIndexService(IndexService.IndexCreationContext indexCreat engineFactory, circuitBreakerService, bigArrays, threadPool, scriptService, clusterService, client, queryCache, directoryFactory, eventListener, readerWrapperFactory, mapperRegistry, indicesFieldDataCache, searchOperationListeners, indexOperationListeners, namedWriteableRegistry, idFieldDataEnabled, allowExpensiveQueries, expressionResolver, - valuesSourceRegistry, recoveryStateFactory); + valuesSourceRegistry, recoveryStateFactory, indexFoldersDeletionListener); success = true; return indexService; } finally { diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index bec23def8b1c3..785c760b0e5c2 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -115,6 +115,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private final BitsetFilterCache bitsetFilterCache; private final NodeEnvironment nodeEnv; private final ShardStoreDeleter shardStoreDeleter; + private final IndexStorePlugin.IndexFoldersDeletionListener indexFoldersDeletionListener; private final IndexStorePlugin.DirectoryFactory directoryFactory; private final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory; private final CheckedFunction readerWrapper; @@ -178,7 +179,9 @@ public IndexService( BooleanSupplier allowExpensiveQueries, IndexNameExpressionResolver expressionResolver, ValuesSourceRegistry valuesSourceRegistry, - IndexStorePlugin.RecoveryStateFactory recoveryStateFactory) { + IndexStorePlugin.RecoveryStateFactory recoveryStateFactory, + IndexStorePlugin.IndexFoldersDeletionListener indexFoldersDeletionListener + ) { super(indexSettings); this.allowExpensiveQueries = allowExpensiveQueries; this.indexSettings = indexSettings; @@ -219,6 +222,7 @@ public IndexService( } this.shardStoreDeleter = shardStoreDeleter; + this.indexFoldersDeletionListener = indexFoldersDeletionListener; this.bigArrays = bigArrays; this.threadPool = threadPool; this.scriptService = scriptService; @@ -414,7 +418,8 @@ public synchronized IndexShard createShard( } catch (IllegalStateException ex) { logger.warn("{} failed to load shard path, trying to remove leftover", shardId); try { - ShardPath.deleteLeftoverShardDirectory(logger, nodeEnv, lock, this.indexSettings); + ShardPath.deleteLeftoverShardDirectory(logger, nodeEnv, lock, this.indexSettings, shardPaths -> + indexFoldersDeletionListener.beforeShardFoldersDeleted(shardId, this.indexSettings, shardPaths)); path = ShardPath.loadShardPath(logger, nodeEnv, shardId, this.indexSettings.customDataPath()); } catch (Exception inner) { ex.addSuppressed(inner); diff --git a/server/src/main/java/org/elasticsearch/index/shard/ShardPath.java b/server/src/main/java/org/elasticsearch/index/shard/ShardPath.java index 383fe57943f89..6f311210b4fed 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/ShardPath.java +++ b/server/src/main/java/org/elasticsearch/index/shard/ShardPath.java @@ -36,6 +36,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.function.Consumer; public final class ShardPath { public static final String INDEX_FOLDER_NAME = "index"; @@ -172,8 +173,13 @@ public static ShardPath loadShardPath(Logger logger, ShardId shardId, String cus * This method tries to delete left-over shards where the index name has been reused but the UUID is different * to allow the new shard to be allocated. */ - public static void deleteLeftoverShardDirectory(Logger logger, NodeEnvironment env, - ShardLock lock, IndexSettings indexSettings) throws IOException { + public static void deleteLeftoverShardDirectory( + final Logger logger, + final NodeEnvironment env, + final ShardLock lock, + final IndexSettings indexSettings, + final Consumer listener + ) throws IOException { final String indexUUID = indexSettings.getUUID(); final Path[] paths = env.availableShardPaths(lock.getShardId()); for (Path path : paths) { @@ -183,7 +189,8 @@ public static void deleteLeftoverShardDirectory(Logger logger, NodeEnvironment e if (load.indexUUID.equals(indexUUID) == false && IndexMetadata.INDEX_UUID_NA_VALUE.equals(load.indexUUID) == false) { logger.warn("{} deleting leftover shard on path: [{}] with a different index UUID", lock.getShardId(), path); assert Files.isDirectory(path) : path + " is not a directory"; - NodeEnvironment.acquireFSLockForPaths(indexSettings, paths); + NodeEnvironment.acquireFSLockForPaths(indexSettings, path); + listener.accept(new Path[]{path}); IOUtils.rm(path); } } diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index 174b00e65f5f7..afa211729c3ba 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -679,7 +679,8 @@ private synchronized IndexService createIndexService(IndexService.IndexCreationC indicesFieldDataCache, namedWriteableRegistry, this::isIdFieldDataEnabled, - valuesSourceRegistry + valuesSourceRegistry, + indexFoldersDeletionListeners ); } diff --git a/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java b/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java index 19b526b30a182..d18dd73cdd47a 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java @@ -95,6 +95,7 @@ import org.hamcrest.Matchers; import java.io.IOException; +import java.nio.file.Path; import java.util.Collections; import java.util.HashSet; import java.util.Map; @@ -130,6 +131,16 @@ public void addPendingDelete(ShardId shardId, IndexSettings indexSettings) { } }; + private IndexStorePlugin.IndexFoldersDeletionListener indexDeletionListener = new IndexStorePlugin.IndexFoldersDeletionListener() { + @Override + public void beforeIndexFoldersDeleted(Index index, IndexSettings indexSettings, Path[] indexPaths) { + } + + @Override + public void beforeShardFoldersDeleted(ShardId shardId, IndexSettings indexSettings, Path[] shardPaths) { + } + }; + private final IndexFieldDataCache.Listener listener = new IndexFieldDataCache.Listener() {}; private MapperRegistry mapperRegistry; private ThreadPool threadPool; @@ -169,7 +180,7 @@ public void tearDown() throws Exception { private IndexService newIndexService(IndexModule module) throws IOException { return module.newIndexService(CREATE_INDEX, nodeEnvironment, xContentRegistry(), deleter, circuitBreakerService, bigArrays, threadPool, scriptService, clusterService, null, indicesQueryCache, mapperRegistry, - new IndicesFieldDataCache(settings, listener), writableRegistry(), () -> false, null); + new IndicesFieldDataCache(settings, listener), writableRegistry(), () -> false, null, indexDeletionListener); } public void testWrapperIsBound() throws IOException { From d6a6e2dfc48c43287137501afbf8dfc8502b7b18 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 9 Dec 2020 10:36:47 +0100 Subject: [PATCH 08/10] no swallowing --- .../store/CompositeIndexFoldersDeletionListener.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/store/CompositeIndexFoldersDeletionListener.java b/server/src/main/java/org/elasticsearch/indices/store/CompositeIndexFoldersDeletionListener.java index 9ccf88201610f..ee433a90de372 100644 --- a/server/src/main/java/org/elasticsearch/indices/store/CompositeIndexFoldersDeletionListener.java +++ b/server/src/main/java/org/elasticsearch/indices/store/CompositeIndexFoldersDeletionListener.java @@ -21,7 +21,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.ShardId; @@ -50,8 +49,8 @@ public void beforeIndexFoldersDeleted(Index index, IndexSettings indexSettings, try { listener.beforeIndexFoldersDeleted(index, indexSettings, indexPaths); } catch (Exception e) { - logger.warn(() -> - new ParameterizedMessage("[{}] failed to invoke listener [{}] for index folders deletion", index, listener), e); + assert false : new AssertionError(e); + throw e; } } } @@ -62,8 +61,8 @@ public void beforeShardFoldersDeleted(ShardId shardId, IndexSettings indexSettin try { listener.beforeShardFoldersDeleted(shardId, indexSettings, shardPaths); } catch (Exception e) { - logger.warn(() -> - new ParameterizedMessage("[{}] failed to invoke listener [{}] for shard folders deletion", shardId, listener), e); + assert false : new AssertionError(e); + throw e; } } } From 797cce2942661d4a95a0cf791eee0905b1dbd3e3 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 9 Dec 2020 13:17:46 +0100 Subject: [PATCH 09/10] Add test for left-over shards + assertBusy --- .../IndexFoldersDeletionListenerIT.java | 173 +++++++++++++----- 1 file changed, 128 insertions(+), 45 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/plugins/IndexFoldersDeletionListenerIT.java b/server/src/internalClusterTest/java/org/elasticsearch/plugins/IndexFoldersDeletionListenerIT.java index 90d4e5ff41333..2829bd17a0c1d 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/plugins/IndexFoldersDeletionListenerIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/plugins/IndexFoldersDeletionListenerIT.java @@ -19,12 +19,15 @@ package org.elasticsearch.plugins; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.env.Environment; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.ShardId; @@ -32,6 +35,7 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; +import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; @@ -41,7 +45,11 @@ import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; +import static org.elasticsearch.env.NodeEnvironment.INDICES_FOLDER; +import static org.elasticsearch.gateway.MetadataStateFormat.STATE_DIR_NAME; +import static org.elasticsearch.index.shard.ShardPath.INDEX_FOLDER_NAME; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; @@ -78,26 +86,26 @@ public void testListenersInvokedWhenIndexIsDeleted() throws Exception { } assertAcked(client().admin().indices().prepareDelete(indexName)); - ensureClusterStateConsistency(); - assertPendingDeletesProcessed(); - for (Map.Entry> shardsByNode : shardsByNodes.entrySet()) { - final String nodeName = shardsByNode.getKey(); - final IndexFoldersDeletionListenerPlugin plugin = plugin(nodeName); - assertTrue("Listener should have been notified of deletion of index " + index + " on node " + nodeName, - plugin.deletedIndices.contains(index)); + assertBusy(() -> { + for (Map.Entry> shardsByNode : shardsByNodes.entrySet()) { + final String nodeName = shardsByNode.getKey(); + final IndexFoldersDeletionListenerPlugin plugin = plugin(nodeName); + assertTrue("Listener should have been notified of deletion of index " + index + " on node " + nodeName, + plugin.deletedIndices.contains(index)); - final List deletedShards = plugin.deletedShards.get(index); - assertThat(deletedShards, notNullValue()); - assertFalse("Listener should have been notified of deletion of one or more shards on node " + nodeName, - deletedShards.isEmpty()); + final List deletedShards = plugin.deletedShards.get(index); + assertThat(deletedShards, notNullValue()); + assertFalse("Listener should have been notified of deletion of one or more shards on node " + nodeName, + deletedShards.isEmpty()); - for (ShardRouting shardRouting : shardsByNode.getValue()) { - final ShardId shardId = shardRouting.shardId(); - assertTrue("Listener should have been notified of deletion of shard " + shardId + " on node " + nodeName, - deletedShards.contains(shardId)); + for (ShardRouting shardRouting : shardsByNode.getValue()) { + final ShardId shardId = shardRouting.shardId(); + assertTrue("Listener should have been notified of deletion of shard " + shardId + " on node " + nodeName, + deletedShards.contains(shardId)); + } } - } + }); } public void testListenersInvokedWhenIndexIsRelocated() throws Exception { @@ -129,32 +137,32 @@ public void testListenersInvokedWhenIndexIsRelocated() throws Exception { .put("index.routing.allocation.exclude._name", String.join(",", excludedNodes)) .build())); ensureGreen(indexName); - ensureClusterStateConsistency(); - assertPendingDeletesProcessed(); - - for (Map.Entry> shardsByNode : shardsByNodes.entrySet()) { - final String nodeName = shardsByNode.getKey(); - final IndexFoldersDeletionListenerPlugin plugin = plugin(nodeName); - - if (excludedNodes.contains(nodeName)) { - assertTrue("Listener should have been notified of deletion of index " + index + " on node " + nodeName, - plugin.deletedIndices.contains(index)); - final List deletedShards = plugin.deletedShards.get(index); - assertThat(deletedShards, notNullValue()); - assertFalse("Listener should have been notified of deletion of one or more shards on node " + nodeName, - deletedShards.isEmpty()); - - for (ShardRouting shardRouting : shardsByNode.getValue()) { - final ShardId shardId = shardRouting.shardId(); - assertTrue("Listener should have been notified of deletion of shard " + shardId + " on node " + nodeName, - deletedShards.contains(shardId)); + assertBusy(() -> { + for (Map.Entry> shardsByNode : shardsByNodes.entrySet()) { + final String nodeName = shardsByNode.getKey(); + final IndexFoldersDeletionListenerPlugin plugin = plugin(nodeName); + + if (excludedNodes.contains(nodeName)) { + assertTrue("Listener should have been notified of deletion of index " + index + " on node " + nodeName, + plugin.deletedIndices.contains(index)); + + final List deletedShards = plugin.deletedShards.get(index); + assertThat(deletedShards, notNullValue()); + assertFalse("Listener should have been notified of deletion of one or more shards on node " + nodeName, + deletedShards.isEmpty()); + + for (ShardRouting shardRouting : shardsByNode.getValue()) { + final ShardId shardId = shardRouting.shardId(); + assertTrue("Listener should have been notified of deletion of shard " + shardId + " on node " + nodeName, + deletedShards.contains(shardId)); + } + } else { + assertTrue("Expecting no indices deleted on node " + nodeName, plugin.deletedIndices.isEmpty()); + assertTrue("Expecting no shards deleted on node " + nodeName, plugin.deletedShards.isEmpty()); } - } else { - assertTrue("Expecting no indices deleted on node " + nodeName, plugin.deletedIndices.isEmpty()); - assertTrue("Expecting no shards deleted on node " + nodeName, plugin.deletedShards.isEmpty()); } - } + }); } public void testListenersInvokedWhenIndexIsDangling() throws Exception { @@ -187,14 +195,89 @@ public void testListenersInvokedWhenIndexIsDangling() throws Exception { assertAcked(client().admin().indices().prepareDelete(indexName)); final String restartedNode = internalCluster().startNode(stoppedNodeDataPathSettings); - ensureClusterSizeConsistency(); - ensureClusterStateConsistency(); - assertPendingDeletesProcessed(); + assertBusy(() -> { + final IndexFoldersDeletionListenerPlugin plugin = plugin(restartedNode); + assertTrue("Listener should have been notified of deletion of index " + index + " on node " + restartedNode, + plugin.deletedIndices.contains(index)); + }); + } + + public void testListenersInvokedWhenIndexHasLeftOverShard() throws Exception { + final String masterNode = internalCluster().startMasterOnlyNode(); + + final Path dataDirWithLeftOverShards = createTempDir(); + String dataNode = internalCluster().startDataOnlyNode( + Settings.builder() + .putList(Environment.PATH_DATA_SETTING.getKey(), List.of(dataDirWithLeftOverShards.toAbsolutePath().toString())) + .putNull(Environment.PATH_SHARED_DATA_SETTING.getKey()) + .build() + ); + + final Index[] leftovers = new Index[between(1, 3)]; + logger.debug("--> creating [{}] leftover indices on data node [{}]", leftovers.length, dataNode); + for (int i = 0; i < leftovers.length; i++) { + final String indexName = "index-" + i; + createIndex(indexName, Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("index.routing.allocation.include._name", dataNode) + .build()); + ensureGreen(indexName); + leftovers[i] = internalCluster().clusterService(masterNode).state().metadata().index(indexName).getIndex(); + } + + logger.debug("--> stopping data node [{}], the data left on disk will be injected as left-overs in a newer data node", dataNode); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataNode)); - final IndexFoldersDeletionListenerPlugin plugin = plugin(restartedNode); + logger.debug("--> deleting leftover indices"); + assertAcked(client().admin().indices().prepareDelete("index-*")); + + final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + + logger.debug("--> creating a new index [{}]", indexName); + assertAcked(client().admin().indices().prepareCreate(indexName).setSettings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("index.routing.allocation.enable", EnableAllocationDecider.Allocation.NONE) + .build()) + .setWaitForActiveShards(ActiveShardCount.NONE)); + + final Index index = internalCluster().clusterService(masterNode).state().metadata().index(indexName).getIndex(); + logger.debug("--> index [{}] created", index); + + final List dataPaths = new ArrayList<>(); + for (int i = 0; i < leftovers.length; i++) { + final Path dataPath = createTempDir(); + dataPaths.add(dataPath); + final Path shardPath = dataPath.resolve(INDICES_FOLDER).resolve(index.getUUID()).resolve("0"); + Files.createDirectories(shardPath); + final Path leftoverPath = dataDirWithLeftOverShards.resolve(INDICES_FOLDER).resolve(leftovers[i].getUUID()).resolve("0"); + Files.move(leftoverPath.resolve(STATE_DIR_NAME), shardPath.resolve(STATE_DIR_NAME)); + Files.move(leftoverPath.resolve(INDEX_FOLDER_NAME), shardPath.resolve(INDEX_FOLDER_NAME)); + } + + logger.debug("--> starting another data node with data paths [{}]", dataPaths); + dataNode = internalCluster().startDataOnlyNode( + Settings.builder() + .putList(Environment.PATH_DATA_SETTING.getKey(), + dataPaths.stream().map(p -> p.toAbsolutePath().toString()).collect(Collectors.toList())) + .putNull(Environment.PATH_SHARED_DATA_SETTING.getKey()) + .build()); + + final IndexFoldersDeletionListenerPlugin plugin = plugin(dataNode); + assertTrue("Expecting no shards deleted on node " + dataNode, plugin.deletedShards.isEmpty()); + + assertAcked(client().admin().indices().prepareUpdateSettings(indexName).setSettings(Settings.builder() + .put("index.routing.allocation.enable", EnableAllocationDecider.Allocation.ALL) + .put("index.routing.allocation.require._name", dataNode) + )); + ensureGreen(indexName); - assertTrue("Listener should have been notified of deletion of index " + index + " on node " + restartedNode, - plugin.deletedIndices.contains(index)); + assertTrue("Listener should have been notified of deletion of left-over shards for index " + index + " on node " + dataNode, + plugin.deletedShards.containsKey(index)); + assertThat("Listener should have been notified of [" + leftovers.length + "] deletions of left-over shard [0] on node " + dataNode, + plugin.deletedShards.get(index).size(), equalTo(leftovers.length)); } private Map> shardRoutingsByNodes(ClusterState clusterState, Index index) { From aea4e6db93d580c453eb81694fca2ea42bbaf7b7 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 10 Dec 2020 11:33:15 +0100 Subject: [PATCH 10/10] feedback --- .../env/NodeEnvironmentTests.java | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java b/server/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java index 0ebc985a469fc..5de5346432656 100644 --- a/server/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java +++ b/server/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java @@ -220,12 +220,10 @@ public void testDeleteSafe() throws Exception { Files.createDirectories(path.resolve("1")); } - { - SetOnce listener = new SetOnce<>(); - ShardLockObtainFailedException ex = expectThrows(ShardLockObtainFailedException.class, - () -> env.deleteShardDirectorySafe(new ShardId(index, 0), idxSettings, listener::set)); - assertNull(listener.get()); - } + expectThrows(ShardLockObtainFailedException.class, + () -> env.deleteShardDirectorySafe(new ShardId(index, 0), idxSettings, shardPaths -> { + assert false : "should not be called " + shardPaths; + })); for (Path path : env.indexPaths(index)) { assertTrue(Files.exists(path.resolve("0"))); @@ -246,12 +244,10 @@ public void testDeleteSafe() throws Exception { assertFalse(Files.exists(path.resolve("1"))); } - { - SetOnce listener = new SetOnce<>(); - ShardLockObtainFailedException ex = expectThrows(ShardLockObtainFailedException.class, - () -> env.deleteIndexDirectorySafe(index, randomIntBetween(0, 10), idxSettings, listener::set)); - assertNull(listener.get()); - } + expectThrows(ShardLockObtainFailedException.class, + () -> env.deleteIndexDirectorySafe(index, randomIntBetween(0, 10), idxSettings, indexPaths -> { + assert false : "should not be called " + indexPaths; + })); fooLock.close();