Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve shards evictions in searchable snapshot cache service #67160

Merged
merged 19 commits into from
Jan 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,10 @@ public void testCacheSurviveRestart() throws Exception {
}
assertFalse("no cache files found", cacheFiles.isEmpty());

CacheService cacheService = internalCluster().getInstance(CacheService.class, dataNode);
final CacheService cacheService = internalCluster().getInstance(CacheService.class, dataNode);
cacheService.synchronizeCache();

PersistentCache persistentCache = cacheService.getPersistentCache();
final PersistentCache persistentCache = cacheService.getPersistentCache();
assertThat(persistentCache.getNumDocs(), equalTo((long) cacheFiles.size()));

internalCluster().restartNode(dataNode, new InternalTestCluster.RestartCallback() {
Expand All @@ -142,21 +142,19 @@ public Settings onNodeStopped(String nodeName) {
}
});

cacheService = internalCluster().getInstance(CacheService.class, dataNode);
persistentCache = cacheService.getPersistentCache();
final CacheService cacheServiceAfterRestart = internalCluster().getInstance(CacheService.class, dataNode);
final PersistentCache persistentCacheAfterRestart = cacheServiceAfterRestart.getPersistentCache();
ensureGreen(restoredIndexName);

cacheFiles.forEach(cacheFile -> assertTrue(cacheFile + " should have survived node restart", Files.exists(cacheFile)));
assertThat("Cache files should be repopulated in cache", persistentCache.getNumDocs(), equalTo((long) cacheFiles.size()));
assertThat("Cache files should be loaded in cache", persistentCacheAfterRestart.getNumDocs(), equalTo((long) cacheFiles.size()));

assertAcked(client().admin().indices().prepareDelete(restoredIndexName));

assertBusy(() -> {
cacheFiles.forEach(cacheFile -> assertFalse(cacheFile + " should have been cleaned up", Files.exists(cacheFile)));
assertTrue(internalCluster().getInstance(CacheService.class, dataNode).getPersistentCache().hasDeletions());
cacheServiceAfterRestart.synchronizeCache();
assertThat(persistentCacheAfterRestart.getNumDocs(), equalTo(0L));
});
cacheService.synchronizeCache();

assertThat(persistentCache.getNumDocs(), equalTo(0L));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public boolean loadSnapshot(RecoveryState recoveryState, ActionListener<Void> pr
this.snapshot = snapshotSupplier.get();
this.loaded = true;
cleanExistingRegularShardFiles();
cleanExistingCacheFiles();
waitForPendingEvictions();
this.recoveryState = (SearchableSnapshotRecoveryState) recoveryState;
prewarmCache(preWarmListener);
}
Expand Down Expand Up @@ -428,12 +428,12 @@ private void cleanExistingRegularShardFiles() {
}

/**
* Evicts all cache files associated to the current searchable snapshot shard in case a
* Waits for the eviction of cache files associated with the current searchable snapshot shard to be processed in case a
* previous instance of that same shard has been marked as evicted on this node.
*/
private void cleanExistingCacheFiles() {
private void waitForPendingEvictions() {
assert Thread.holdsLock(this);
cacheService.runIfShardMarkedAsEvictedInCache(snapshotId, indexId, shardId, this::clearCache);
cacheService.waitForCacheFilesEvictionIfNeeded(snapshotId.getUUID(), indexId.getName(), shardId);
}

private void prewarmCache(ActionListener<Void> listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,14 @@
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogException;
import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;

import java.nio.file.Path;

import static org.elasticsearch.index.store.SearchableSnapshotDirectory.unwrapDirectory;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.isSearchableSnapshotStore;

public class SearchableSnapshotIndexEventListener implements IndexEventListener {
Expand Down Expand Up @@ -116,14 +112,8 @@ public void beforeIndexRemoved(IndexService indexService, IndexRemovalReason rea

logger.debug("{} marking shard as evicted in searchable snapshots cache (reason: {})", shardId, reason);
cacheService.markShardAsEvictedInCache(
new SnapshotId(
SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings.getSettings()),
SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings())
),
new IndexId(
SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings.getSettings()),
SNAPSHOT_INDEX_ID_SETTING.get(indexSettings.getSettings())
),
SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings()),
SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings.getSettings()),
shardId
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,14 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;

import java.nio.file.Path;
import java.util.Objects;
import java.util.function.Supplier;

import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING;

/**
* This {@link IndexStorePlugin.IndexFoldersDeletionListener} is called when an index folder or a shard folder is deleted from the disk. If
Expand Down Expand Up @@ -62,14 +58,8 @@ private void markShardAsEvictedInCache(ShardId shardId, IndexSettings indexSetti

logger.debug("{} marking shard as evicted in searchable snapshots cache (reason: cache files deleted from disk)", shardId);
cacheService.markShardAsEvictedInCache(
new SnapshotId(
SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings.getSettings()),
SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings())
),
new IndexId(
SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings.getSettings()),
SNAPSHOT_INDEX_ID_SETTING.get(indexSettings.getSettings())
),
SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings()),
SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings.getSettings()),
shardId
);
}
Expand Down
Loading