Skip to content

Commit

Permalink
Use listeners to manage cache files associated with removed shards
Browse files Browse the repository at this point in the history
  • Loading branch information
tlrx committed Dec 10, 2020
1 parent 68b5465 commit aeeabc7
Show file tree
Hide file tree
Showing 7 changed files with 316 additions and 20 deletions.
26 changes: 26 additions & 0 deletions server/src/main/java/org/elasticsearch/common/cache/Cache.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Predicate;
Expand Down Expand Up @@ -650,6 +651,31 @@ public void remove() {
};
}

/**
* Performs an action for each cache entry in the cache. While iterating over the cache entries this method is protected from mutations
* that occurs within the same cache segment by acquiring the segment's writing lock during all the iteration. As such, the specified
* consumer should not try to modify the cache. Modifications that occur in already traveled segments won't been seen by the consumer
* but modification that occur in non yet traveled segments should be.
*
* @param consumer the {@link Consumer}
*/
public void forEach(BiConsumer<K, V> consumer) {
for (CacheSegment<K, V> segment : segments) {
try (ReleasableLock ignored = segment.writeLock.acquire()) {
for (CompletableFuture<Entry<K, V>> future : segment.map.values()) {
try {
if (future != null && future.isDone()) {
final Entry<K, V> entry = future.get();
consumer.accept(entry.key, entry.value);
}
} catch (ExecutionException | InterruptedException e) {
throw new IllegalStateException(e);
}
}
}
}
}

private class CacheIterator implements Iterator<Entry<K, V>> {
private Entry<K, V> current;
private Entry<K, V> next;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

package org.elasticsearch.indices.store;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
Expand All @@ -31,7 +29,6 @@

public class CompositeIndexFoldersDeletionListener implements IndexStorePlugin.IndexFoldersDeletionListener {

private static final Logger logger = LogManager.getLogger(CompositeIndexFoldersDeletionListener.class);
private final List<IndexStorePlugin.IndexFoldersDeletionListener> listeners;

public CompositeIndexFoldersDeletionListener(List<IndexStorePlugin.IndexFoldersDeletionListener> listeners) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
Expand Down Expand Up @@ -213,6 +212,7 @@ public boolean loadSnapshot(RecoveryState recoveryState, ActionListener<Void> pr
this.snapshot = snapshotSupplier.get();
this.loaded = true;
cleanExistingRegularShardFiles();
cleanExistingCacheFiles();
this.recoveryState = (SearchableSnapshotRecoveryState) recoveryState;
prewarmCache(preWarmListener);
}
Expand Down Expand Up @@ -330,9 +330,6 @@ private static UnsupportedOperationException unsupportedException() {
public final void close() {
if (closed.compareAndSet(false, true)) {
isOpen = false;
// Ideally we could let the cache evict/remove cached files by itself after the
// directory has been closed.
clearCache();
}
}

Expand Down Expand Up @@ -416,6 +413,15 @@ private void cleanExistingRegularShardFiles() {
}
}

/**
* Evicts all cache files associated to the current searchable snapshot shard in case a
* previous instance of that same shard has been marked as evicted on this node.
*/
private void cleanExistingCacheFiles() {
assert Thread.holdsLock(this);
cacheService.runIfShardMarkedAsEvictedInCache(snapshotId, indexId, shardId, this::clearCache);
}

private void prewarmCache(ActionListener<Void> listener) {
if (prewarmCache == false) {
recoveryState.setPreWarmComplete();
Expand Down Expand Up @@ -566,7 +572,6 @@ public static Directory create(

final Path cacheDir = CacheService.getShardCachePath(shardPath).resolve(snapshotId.getUUID());
Files.createDirectories(cacheDir);
assert assertCacheIsEmpty(cacheDir);

return new InMemoryNoOpCommitDirectory(
new SearchableSnapshotDirectory(
Expand All @@ -587,17 +592,6 @@ public static Directory create(
);
}

private static boolean assertCacheIsEmpty(Path cacheDir) {
try (DirectoryStream<Path> cacheDirStream = Files.newDirectoryStream(cacheDir)) {
final Set<Path> cacheFiles = new HashSet<>();
cacheDirStream.forEach(cacheFiles::add);
assert cacheFiles.isEmpty() : "should start with empty cache, but found " + cacheFiles;
} catch (IOException e) {
assert false : e;
}
return true;
}

public static SearchableSnapshotDirectory unwrapDirectory(Directory dir) {
while (dir != null) {
if (dir instanceof SearchableSnapshotDirectory) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.SegmentInfos;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexEventListener;
Expand All @@ -20,16 +24,31 @@
import org.elasticsearch.index.store.SearchableSnapshotDirectory;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogException;
import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;

import java.nio.file.Path;

import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.isSearchableSnapshotStore;

public class SearchableSnapshotIndexEventListener implements IndexEventListener {

private static final Logger logger = LogManager.getLogger(SearchableSnapshotIndexEventListener.class);

private final @Nullable CacheService cacheService;

public SearchableSnapshotIndexEventListener(Settings settings, @Nullable CacheService cacheService) {
assert cacheService != null || DiscoveryNode.isDataNode(settings) == false;
this.cacheService = cacheService;
}

@Override
public void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexSettings) {
assert Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC);
Expand Down Expand Up @@ -78,4 +97,35 @@ private static void associateNewEmptyTranslogWithIndex(IndexShard indexShard) {
throw new TranslogException(shardId, "failed to associate a new translog", e);
}
}

@Override
public void beforeIndexRemoved(IndexService indexService, IndexRemovalReason reason) {
if (cacheService != null && shouldEvictCacheFiles(reason)) {
final IndexSettings indexSettings = indexService.getIndexSettings();
if (SearchableSnapshotsConstants.isSearchableSnapshotStore(indexSettings.getSettings())) {
for (IndexShard indexShard : indexService) {
final ShardId shardId = indexShard.shardId();

logger.debug("{} marking shard as evicted in searchable snapshots cache (reason: {})", shardId, reason);
cacheService.markShardAsEvictedInCache(
new SnapshotId(
SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings.getSettings()),
SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings())
),
new IndexId(
SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings.getSettings()),
SNAPSHOT_INDEX_ID_SETTING.get(indexSettings.getSettings())
),
shardId
);
}
}
}
}

private static boolean shouldEvictCacheFiles(IndexRemovalReason reason) {
return reason == IndexRemovalReason.DELETED
|| reason == IndexRemovalReason.NO_LONGER_ASSIGNED
|| reason == IndexRemovalReason.FAILURE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.searchablesnapshots;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;

import java.nio.file.Path;
import java.util.Objects;
import java.util.function.Supplier;

import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING;

/**
* This {@link IndexStorePlugin.IndexFoldersDeletionListener} is called when an index folder or a shard folder is deleted from the disk. If
* the index (or the shard) is a backed by a snapshot this listener notifies the {@link CacheService} that the cache files associated to the
* shard(s) must be evicted.
*/
public class SearchableSnapshotIndexFoldersDeletionListener implements IndexStorePlugin.IndexFoldersDeletionListener {

private static final Logger logger = LogManager.getLogger(SearchableSnapshotIndexEventListener.class);

private final Supplier<CacheService> cacheService;

public SearchableSnapshotIndexFoldersDeletionListener(Supplier<CacheService> cacheService) {
this.cacheService = Objects.requireNonNull(cacheService);
}

@Override
public void beforeIndexFoldersDeleted(Index index, IndexSettings indexSettings, Path[] indexPaths) {
if (SearchableSnapshotsConstants.isSearchableSnapshotStore(indexSettings.getSettings())) {
for (int shard = 0; shard < indexSettings.getNumberOfShards(); shard++) {
markShardAsEvictedInCache(new ShardId(index, shard), indexSettings);
}
}
}

@Override
public void beforeShardFoldersDeleted(ShardId shardId, IndexSettings indexSettings, Path[] shardPaths) {
if (SearchableSnapshotsConstants.isSearchableSnapshotStore(indexSettings.getSettings())) {
markShardAsEvictedInCache(shardId, indexSettings);
}
}

private void markShardAsEvictedInCache(ShardId shardId, IndexSettings indexSettings) {
final CacheService cacheService = this.cacheService.get();
assert cacheService != null : "cache service not initialized";

logger.debug("{} marking shard as evicted in searchable snapshots cache (reason: cache files deleted from disk)", shardId);
cacheService.markShardAsEvictedInCache(
new SnapshotId(
SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings.getSettings()),
SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings())
),
new IndexId(
SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings.getSettings()),
SNAPSHOT_INDEX_ID_SETTING.get(indexSettings.getSettings())
),
shardId
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,19 @@ public Collection<Object> createComponents(
@Override
public void onIndexModule(IndexModule indexModule) {
if (SearchableSnapshotsConstants.isSearchableSnapshotStore(indexModule.getSettings())) {
indexModule.addIndexEventListener(new SearchableSnapshotIndexEventListener());
indexModule.addIndexEventListener(new SearchableSnapshotIndexEventListener(settings, cacheService.get()));
indexModule.addIndexEventListener(failShardsListener.get());
}
}

@Override
public List<IndexFoldersDeletionListener> getIndexFoldersDeletionListeners() {
if (DiscoveryNode.isDataNode(settings)) {
return List.of(new SearchableSnapshotIndexFoldersDeletionListener(cacheService::get));
}
return List.of();
}

@Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
return List.of(new SystemIndexDescriptor(SNAPSHOT_BLOB_CACHE_INDEX, "Contains cached data of blob store repositories"));
Expand Down
Loading

0 comments on commit aeeabc7

Please sign in to comment.