Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce a mechanism to notify plugin before an index/shard folder is going to be deleted from disk #65926

Merged
merged 13 commits into from
Dec 10, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -150,16 +150,17 @@ public void testLockTryingToDelete() throws Exception {
// Test without the regular shard lock to assume we can acquire it
// (worst case, meaning that the shard lock could be acquired and
// we're green to delete the shard's directory)
ShardLock sLock = new DummyShardLock(new ShardId(index, 0));
try {
env.deleteShardDirectoryUnderLock(sLock, IndexSettingsModule.newIndexSettings("test", Settings.EMPTY));
fail("should not have been able to delete the directory");
} catch (LockObtainFailedException e) {
assertTrue("msg: " + e.getMessage(), e.getMessage().contains("unable to acquire write.lock"));
}
final ShardLock sLock = new DummyShardLock(new ShardId(index, 0));
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", Settings.EMPTY);

final LockObtainFailedException exception = expectThrows(LockObtainFailedException.class, () ->
env.deleteShardDirectoryUnderLock(sLock, indexSettings, indexPaths -> {
assert false : "should not be called " + indexPaths;
}));
assertThat(exception.getMessage(), exception.getMessage(), containsString("unable to acquire write.lock"));
}

public void testDurableFlagHasEffect() throws Exception {
public void testDurableFlagHasEffect() {
createIndex("test");
ensureGreen();
client().prepareIndex("test").setId("1").setSource("{}", XContentType.JSON).get();
Expand Down

Large diffs are not rendered by default.

31 changes: 24 additions & 7 deletions server/src/main/java/org/elasticsearch/env/NodeEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -553,11 +554,15 @@ private static String toString(Collection<String> items) {
* @param shardId the id of the shard to delete to delete
* @throws IOException if an IOException occurs
*/
public void deleteShardDirectorySafe(ShardId shardId, IndexSettings indexSettings) throws IOException, ShardLockObtainFailedException {
public void deleteShardDirectorySafe(
ShardId shardId,
IndexSettings indexSettings,
Consumer<Path[]> listener
) throws IOException, ShardLockObtainFailedException {
final Path[] paths = availableShardPaths(shardId);
logger.trace("deleting shard {} directory, paths: [{}]", shardId, paths);
try (ShardLock lock = shardLock(shardId, "shard deletion under lock")) {
deleteShardDirectoryUnderLock(lock, indexSettings);
deleteShardDirectoryUnderLock(lock, indexSettings, listener);
}
}

Expand Down Expand Up @@ -602,18 +607,24 @@ public static void acquireFSLockForPaths(IndexSettings indexSettings, Path... sh
* @throws IOException if an IOException occurs
* @throws ElasticsearchException if the write.lock is not acquirable
*/
public void deleteShardDirectoryUnderLock(ShardLock lock, IndexSettings indexSettings) throws IOException {
public void deleteShardDirectoryUnderLock(
ShardLock lock,
IndexSettings indexSettings,
Consumer<Path[]> listener
) throws IOException {
final ShardId shardId = lock.getShardId();
assert isShardLocked(shardId) : "shard " + shardId + " is not locked";
final Path[] paths = availableShardPaths(shardId);
logger.trace("acquiring locks for {}, paths: [{}]", shardId, paths);
acquireFSLockForPaths(indexSettings, paths);
listener.accept(paths);
IOUtils.rm(paths);
if (indexSettings.hasCustomDataPath()) {
Path customLocation = resolveCustomLocation(indexSettings.customDataPath(), shardId);
logger.trace("acquiring lock for {}, custom path: [{}]", shardId, customLocation);
acquireFSLockForPaths(indexSettings, customLocation);
logger.trace("deleting custom shard {} directory [{}]", shardId, customLocation);
listener.accept(new Path[]{customLocation});
IOUtils.rm(customLocation);
}
logger.trace("deleted shard {} directory, paths: [{}]", shardId, paths);
Expand Down Expand Up @@ -665,11 +676,15 @@ private boolean isShardLocked(ShardId id) {
* @param indexSettings settings for the index being deleted
* @throws IOException if any of the shards data directories can't be locked or deleted
*/
public void deleteIndexDirectorySafe(Index index, long lockTimeoutMS, IndexSettings indexSettings)
throws IOException, ShardLockObtainFailedException {
public void deleteIndexDirectorySafe(
Index index,
long lockTimeoutMS,
IndexSettings indexSettings,
Consumer<Path[]> listener
) throws IOException, ShardLockObtainFailedException {
final List<ShardLock> locks = lockAllForIndex(index, indexSettings, "deleting index directory", lockTimeoutMS);
try {
deleteIndexDirectoryUnderLock(index, indexSettings);
deleteIndexDirectoryUnderLock(index, indexSettings, listener);
} finally {
IOUtils.closeWhileHandlingException(locks);
}
Expand All @@ -682,13 +697,15 @@ public void deleteIndexDirectorySafe(Index index, long lockTimeoutMS, IndexSetti
* @param index the index to delete
* @param indexSettings settings for the index being deleted
*/
public void deleteIndexDirectoryUnderLock(Index index, IndexSettings indexSettings) throws IOException {
public void deleteIndexDirectoryUnderLock(Index index, IndexSettings indexSettings, Consumer<Path[]> listener) throws IOException {
final Path[] indexPaths = indexPaths(index);
logger.trace("deleting index {} directory, paths({}): [{}]", index, indexPaths.length, indexPaths);
listener.accept(indexPaths);
IOUtils.rm(indexPaths);
if (indexSettings.hasCustomDataPath()) {
Path customLocation = resolveIndexCustomLocation(indexSettings.customDataPath(), index.getUUID());
logger.trace("deleting custom index {} directory [{}]", index, customLocation);
listener.accept(new Path[]{customLocation});
IOUtils.rm(customLocation);
}
}
Expand Down
5 changes: 3 additions & 2 deletions server/src/main/java/org/elasticsearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,8 @@ public IndexService newIndexService(IndexService.IndexCreationContext indexCreat
IndicesFieldDataCache indicesFieldDataCache,
NamedWriteableRegistry namedWriteableRegistry,
BooleanSupplier idFieldDataEnabled,
ValuesSourceRegistry valuesSourceRegistry) throws IOException {
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.IndexFoldersDeletionListener indexFoldersDeletionListener) throws IOException {
final IndexEventListener eventListener = freeze();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory =
indexReaderWrapper.get() == null ? (shard) -> null : indexReaderWrapper.get();
Expand Down Expand Up @@ -442,7 +443,7 @@ public IndexService newIndexService(IndexService.IndexCreationContext indexCreat
engineFactory, circuitBreakerService, bigArrays, threadPool, scriptService, clusterService, client, queryCache,
directoryFactory, eventListener, readerWrapperFactory, mapperRegistry, indicesFieldDataCache, searchOperationListeners,
indexOperationListeners, namedWriteableRegistry, idFieldDataEnabled, allowExpensiveQueries, expressionResolver,
valuesSourceRegistry, recoveryStateFactory);
valuesSourceRegistry, recoveryStateFactory, indexFoldersDeletionListener);
success = true;
return indexService;
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final BitsetFilterCache bitsetFilterCache;
private final NodeEnvironment nodeEnv;
private final ShardStoreDeleter shardStoreDeleter;
private final IndexStorePlugin.IndexFoldersDeletionListener indexFoldersDeletionListener;
private final IndexStorePlugin.DirectoryFactory directoryFactory;
private final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory;
private final CheckedFunction<DirectoryReader, DirectoryReader, IOException> readerWrapper;
Expand Down Expand Up @@ -178,7 +179,9 @@ public IndexService(
BooleanSupplier allowExpensiveQueries,
IndexNameExpressionResolver expressionResolver,
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory) {
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory,
IndexStorePlugin.IndexFoldersDeletionListener indexFoldersDeletionListener
) {
super(indexSettings);
this.allowExpensiveQueries = allowExpensiveQueries;
this.indexSettings = indexSettings;
Expand Down Expand Up @@ -219,6 +222,7 @@ public IndexService(
}

this.shardStoreDeleter = shardStoreDeleter;
this.indexFoldersDeletionListener = indexFoldersDeletionListener;
this.bigArrays = bigArrays;
this.threadPool = threadPool;
this.scriptService = scriptService;
Expand Down Expand Up @@ -414,7 +418,8 @@ public synchronized IndexShard createShard(
} catch (IllegalStateException ex) {
logger.warn("{} failed to load shard path, trying to remove leftover", shardId);
try {
ShardPath.deleteLeftoverShardDirectory(logger, nodeEnv, lock, this.indexSettings);
ShardPath.deleteLeftoverShardDirectory(logger, nodeEnv, lock, this.indexSettings, shardPaths ->
indexFoldersDeletionListener.beforeShardFoldersDeleted(shardId, this.indexSettings, shardPaths));
path = ShardPath.loadShardPath(logger, nodeEnv, shardId, this.indexSettings.customDataPath());
} catch (Exception inner) {
ex.addSuppressed(inner);
Expand Down
13 changes: 10 additions & 3 deletions server/src/main/java/org/elasticsearch/index/shard/ShardPath.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

public final class ShardPath {
public static final String INDEX_FOLDER_NAME = "index";
Expand Down Expand Up @@ -172,8 +173,13 @@ public static ShardPath loadShardPath(Logger logger, ShardId shardId, String cus
* This method tries to delete left-over shards where the index name has been reused but the UUID is different
* to allow the new shard to be allocated.
*/
public static void deleteLeftoverShardDirectory(Logger logger, NodeEnvironment env,
ShardLock lock, IndexSettings indexSettings) throws IOException {
public static void deleteLeftoverShardDirectory(
final Logger logger,
final NodeEnvironment env,
final ShardLock lock,
final IndexSettings indexSettings,
final Consumer<Path[]> listener
) throws IOException {
final String indexUUID = indexSettings.getUUID();
final Path[] paths = env.availableShardPaths(lock.getShardId());
for (Path path : paths) {
Expand All @@ -183,7 +189,8 @@ public static void deleteLeftoverShardDirectory(Logger logger, NodeEnvironment e
if (load.indexUUID.equals(indexUUID) == false && IndexMetadata.INDEX_UUID_NA_VALUE.equals(load.indexUUID) == false) {
logger.warn("{} deleting leftover shard on path: [{}] with a different index UUID", lock.getShardId(), path);
assert Files.isDirectory(path) : path + " is not a directory";
NodeEnvironment.acquireFSLockForPaths(indexSettings, paths);
NodeEnvironment.acquireFSLockForPaths(indexSettings, path);
listener.accept(new Path[]{path});
IOUtils.rm(path);
}
}
Expand Down
24 changes: 17 additions & 7 deletions server/src/main/java/org/elasticsearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.store.CompositeIndexFoldersDeletionListener;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.plugins.PluginsService;
Expand Down Expand Up @@ -223,6 +224,7 @@ public class IndicesService extends AbstractLifecycleComponent
private final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders;
private final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories;
private final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories;
private final IndexStorePlugin.IndexFoldersDeletionListener indexFoldersDeletionListeners;
final AbstractRefCounted indicesRefCount; // pkg-private for testing
private final CountDownLatch closeLatch = new CountDownLatch(1);
private volatile boolean idFieldDataEnabled;
Expand Down Expand Up @@ -251,7 +253,8 @@ public IndicesService(Settings settings, PluginsService pluginsService, NodeEnvi
ScriptService scriptService, ClusterService clusterService, Client client, MetaStateService metaStateService,
Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders,
Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories, ValuesSourceRegistry valuesSourceRegistry,
Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories) {
Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
List<IndexStorePlugin.IndexFoldersDeletionListener> indexFoldersDeletionListeners) {
this.settings = settings;
this.threadPool = threadPool;
this.pluginsService = pluginsService;
Expand Down Expand Up @@ -298,6 +301,7 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon

this.directoryFactories = directoryFactories;
this.recoveryStateFactories = recoveryStateFactories;
this.indexFoldersDeletionListeners = new CompositeIndexFoldersDeletionListener(indexFoldersDeletionListeners);
// doClose() is called when shutting down a node, yet there might still be ongoing requests
// that we need to wait for before closing some resources such as the caches. In order to
// avoid closing these resources while ongoing requests are still being processed, we use a
Expand Down Expand Up @@ -675,7 +679,8 @@ private synchronized IndexService createIndexService(IndexService.IndexCreationC
indicesFieldDataCache,
namedWriteableRegistry,
this::isIdFieldDataEnabled,
valuesSourceRegistry
valuesSourceRegistry,
indexFoldersDeletionListeners
);
}

Expand Down Expand Up @@ -919,7 +924,8 @@ private void deleteIndexStoreIfDeletionAllowed(final String reason, final Index
logger.debug("{} deleting index store reason [{}]", index, reason);
if (predicate.apply(index, indexSettings)) {
// its safe to delete all index metadata and shard data
nodeEnv.deleteIndexDirectorySafe(index, 0, indexSettings);
nodeEnv.deleteIndexDirectorySafe(index, 0, indexSettings,
paths -> indexFoldersDeletionListeners.beforeIndexFoldersDeleted(index, indexSettings, paths));
}
success = true;
} catch (ShardLockObtainFailedException ex) {
Expand Down Expand Up @@ -948,7 +954,8 @@ private void deleteIndexStoreIfDeletionAllowed(final String reason, final Index
public void deleteShardStore(String reason, ShardLock lock, IndexSettings indexSettings) throws IOException {
ShardId shardId = lock.getShardId();
logger.trace("{} deleting shard reason [{}]", shardId, reason);
nodeEnv.deleteShardDirectoryUnderLock(lock, indexSettings);
nodeEnv.deleteShardDirectoryUnderLock(lock, indexSettings,
paths -> indexFoldersDeletionListeners.beforeShardFoldersDeleted(shardId, indexSettings, paths));
}

/**
Expand All @@ -973,7 +980,8 @@ public void deleteShardStore(String reason, ShardId shardId, ClusterState cluste
if (shardDeletionCheckResult != ShardDeletionCheckResult.FOLDER_FOUND_CAN_DELETE) {
throw new IllegalStateException("Can't delete shard " + shardId + " (cause: " + shardDeletionCheckResult + ")");
}
nodeEnv.deleteShardDirectorySafe(shardId, indexSettings);
nodeEnv.deleteShardDirectorySafe(shardId, indexSettings,
paths -> indexFoldersDeletionListeners.beforeShardFoldersDeleted(shardId, indexSettings, paths));
logger.debug("{} deleted shard reason [{}]", shardId, reason);

if (canDeleteIndexContents(shardId.getIndex(), indexSettings)) {
Expand Down Expand Up @@ -1211,14 +1219,16 @@ public void processPendingDeletes(Index index, IndexSettings indexSettings, Time
assert delete.shardId == -1;
logger.debug("{} deleting index store reason [{}]", index, "pending delete");
try {
nodeEnv.deleteIndexDirectoryUnderLock(index, indexSettings);
nodeEnv.deleteIndexDirectoryUnderLock(index, indexSettings,
paths -> indexFoldersDeletionListeners.beforeIndexFoldersDeleted(index, indexSettings, paths));
iterator.remove();
} catch (IOException ex) {
logger.debug(() -> new ParameterizedMessage("{} retry pending delete", index), ex);
}
} else {
assert delete.shardId != -1;
ShardLock shardLock = locks.get(new ShardId(delete.index, delete.shardId));
final ShardId shardId = new ShardId(delete.index, delete.shardId);
final ShardLock shardLock = locks.get(shardId);
if (shardLock != null) {
try {
deleteShardStore("pending delete", shardLock, delete.settings);
Expand Down
Loading