diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/SearchableSnapshotsPersistentCacheIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/SearchableSnapshotsPersistentCacheIntegTests.java index 7cf678cda3977..5ffeb66ba1ce8 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/SearchableSnapshotsPersistentCacheIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/SearchableSnapshotsPersistentCacheIntegTests.java @@ -112,10 +112,10 @@ public void testCacheSurviveRestart() throws Exception { } assertFalse("no cache files found", cacheFiles.isEmpty()); - CacheService cacheService = internalCluster().getInstance(CacheService.class, dataNode); + final CacheService cacheService = internalCluster().getInstance(CacheService.class, dataNode); cacheService.synchronizeCache(); - PersistentCache persistentCache = cacheService.getPersistentCache(); + final PersistentCache persistentCache = cacheService.getPersistentCache(); assertThat(persistentCache.getNumDocs(), equalTo((long) cacheFiles.size())); internalCluster().restartNode(dataNode, new InternalTestCluster.RestartCallback() { @@ -142,21 +142,19 @@ public Settings onNodeStopped(String nodeName) { } }); - cacheService = internalCluster().getInstance(CacheService.class, dataNode); - persistentCache = cacheService.getPersistentCache(); + final CacheService cacheServiceAfterRestart = internalCluster().getInstance(CacheService.class, dataNode); + final PersistentCache persistentCacheAfterRestart = cacheServiceAfterRestart.getPersistentCache(); ensureGreen(restoredIndexName); cacheFiles.forEach(cacheFile -> assertTrue(cacheFile + " should have survived node restart", Files.exists(cacheFile))); - assertThat("Cache files should be repopulated in cache", persistentCache.getNumDocs(), equalTo((long) cacheFiles.size())); + assertThat("Cache files should be loaded in cache", persistentCacheAfterRestart.getNumDocs(), equalTo((long) cacheFiles.size())); assertAcked(client().admin().indices().prepareDelete(restoredIndexName)); assertBusy(() -> { cacheFiles.forEach(cacheFile -> assertFalse(cacheFile + " should have been cleaned up", Files.exists(cacheFile))); - assertTrue(internalCluster().getInstance(CacheService.class, dataNode).getPersistentCache().hasDeletions()); + cacheServiceAfterRestart.synchronizeCache(); + assertThat(persistentCacheAfterRestart.getNumDocs(), equalTo(0L)); }); - cacheService.synchronizeCache(); - - assertThat(persistentCache.getNumDocs(), equalTo(0L)); } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java index 43b2c87309d49..184a610424368 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java @@ -212,7 +212,7 @@ public boolean loadSnapshot(RecoveryState recoveryState, ActionListener pr this.snapshot = snapshotSupplier.get(); this.loaded = true; cleanExistingRegularShardFiles(); - cleanExistingCacheFiles(); + waitForPendingEvictions(); this.recoveryState = (SearchableSnapshotRecoveryState) recoveryState; prewarmCache(preWarmListener); } @@ -428,12 +428,12 @@ private void cleanExistingRegularShardFiles() { } /** - * Evicts all cache files associated to the current searchable snapshot shard in case a + * Waits for the eviction of cache files associated with the current searchable snapshot shard to be processed in case a * previous instance of that same shard has been marked as evicted on this node. */ - private void cleanExistingCacheFiles() { + private void waitForPendingEvictions() { assert Thread.holdsLock(this); - cacheService.runIfShardMarkedAsEvictedInCache(snapshotId, indexId, shardId, this::clearCache); + cacheService.waitForCacheFilesEvictionIfNeeded(snapshotId.getUUID(), indexId.getName(), shardId); } private void prewarmCache(ActionListener listener) { diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java index f3ede77a0c58d..c293e2e8c81bd 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java @@ -25,18 +25,14 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogException; import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason; -import org.elasticsearch.repositories.IndexId; -import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; import java.nio.file.Path; import static org.elasticsearch.index.store.SearchableSnapshotDirectory.unwrapDirectory; -import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING; -import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.isSearchableSnapshotStore; public class SearchableSnapshotIndexEventListener implements IndexEventListener { @@ -116,14 +112,8 @@ public void beforeIndexRemoved(IndexService indexService, IndexRemovalReason rea logger.debug("{} marking shard as evicted in searchable snapshots cache (reason: {})", shardId, reason); cacheService.markShardAsEvictedInCache( - new SnapshotId( - SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings.getSettings()), - SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings()) - ), - new IndexId( - SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings.getSettings()), - SNAPSHOT_INDEX_ID_SETTING.get(indexSettings.getSettings()) - ), + SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings()), + SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings.getSettings()), shardId ); } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexFoldersDeletionListener.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexFoldersDeletionListener.java index c1da42a9e65be..c1d88c5fcc52d 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexFoldersDeletionListener.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexFoldersDeletionListener.java @@ -12,18 +12,14 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.plugins.IndexStorePlugin; -import org.elasticsearch.repositories.IndexId; -import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; import java.nio.file.Path; import java.util.Objects; import java.util.function.Supplier; -import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING; -import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING; /** * This {@link IndexStorePlugin.IndexFoldersDeletionListener} is called when an index folder or a shard folder is deleted from the disk. If @@ -62,14 +58,8 @@ private void markShardAsEvictedInCache(ShardId shardId, IndexSettings indexSetti logger.debug("{} marking shard as evicted in searchable snapshots cache (reason: cache files deleted from disk)", shardId); cacheService.markShardAsEvictedInCache( - new SnapshotId( - SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings.getSettings()), - SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings()) - ), - new IndexId( - SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings.getSettings()), - SNAPSHOT_INDEX_ID_SETTING.get(indexSettings.getSettings()) - ), + SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings()), + SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings.getSettings()), shardId ); } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java index 2d561c5847095..77823c2eed3c9 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java @@ -8,6 +8,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.SetOnce; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; @@ -16,7 +17,6 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.Lifecycle; -import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -25,8 +25,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractAsyncTask; import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.common.util.concurrent.KeyedLock; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardPath; @@ -40,15 +39,21 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.SortedSet; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsUtils.toIntBytes; @@ -140,10 +145,12 @@ public class CacheService extends AbstractLifecycleComponent { private final ByteSizeValue cacheSize; private final ByteSizeValue rangeSize; private final ByteSizeValue recoveryRangeSize; - private final KeyedLock shardsEvictionLock; - private final Set evictedShards; + private final Map> pendingShardsEvictions; + private final ReadWriteLock shardsEvictionsLock; + private final Object shardsEvictionsMutex; private volatile int maxCacheFilesToSyncAtOnce; + private boolean allowShardsEvictions; public CacheService( final Settings settings, @@ -163,8 +170,6 @@ public CacheService( .removalListener(notification -> onCacheFileRemoval(notification.getValue())) .build(); this.persistentCache = Objects.requireNonNull(persistentCache); - this.shardsEvictionLock = new KeyedLock<>(); - this.evictedShards = ConcurrentCollections.newConcurrentSet(); this.numberOfCacheFilesToSync = new AtomicLong(); this.cacheSyncLock = new ReentrantLock(); this.cacheFilesToSync = new ConcurrentLinkedQueue<>(); @@ -174,6 +179,10 @@ public CacheService( this.cacheSyncTask = new CacheSynchronizationTask(threadPool, SNAPSHOT_CACHE_SYNC_INTERVAL_SETTING.get(settings)); clusterSettings.addSettingsUpdateConsumer(SNAPSHOT_CACHE_SYNC_INTERVAL_SETTING, this::setCacheSyncInterval); this.cacheSyncStopTimeout = SNAPSHOT_CACHE_SYNC_SHUTDOWN_TIMEOUT.get(settings); + this.shardsEvictionsLock = new ReentrantReadWriteLock(); + this.pendingShardsEvictions = new HashMap<>(); + this.shardsEvictionsMutex = new Object(); + this.allowShardsEvictions = true; } public static Path getShardCachePath(ShardPath shardPath) { @@ -206,12 +215,16 @@ protected void doStop() { cacheSyncTask.close(); } finally { try { - persistentCache.close(); - } catch (Exception e) { - logger.warn("failed to close persistent cache", e); + processAllPendingShardsEvictions(); } finally { - if (acquired) { - cacheSyncLock.unlock(); + try { + persistentCache.close(); + } catch (Exception e) { + logger.warn("failed to close persistent cache", e); + } finally { + if (acquired) { + cacheSyncLock.unlock(); + } } } } @@ -338,82 +351,138 @@ public void removeFromCache(final CacheKey cacheKey) { /** * Marks the specified searchable snapshot shard as evicted in cache. Cache files associated with this shard will be evicted from cache. * - * @param snapshotId the {@link SnapshotId} - * @param indexId the {@link SnapshotId} - * @param shardId the {@link SnapshotId} + * @param snapshotUUID the snapshot's unique identifier + * @param snapshotIndexName the name of the index in the snapshot + * @param shardId the {@link ShardId} */ - public void markShardAsEvictedInCache(SnapshotId snapshotId, IndexId indexId, ShardId shardId) { - final ShardEviction shardEviction = new ShardEviction(snapshotId.getUUID(), indexId.getName(), shardId); - if (evictedShards.add(shardEviction)) { - threadPool.generic().submit(new AbstractRunnable() { - @Override - protected void doRun() { - runIfShardMarkedAsEvictedInCache(shardEviction, () -> { - assert shardsEvictionLock.isHeldByCurrentThread(shardEviction); - final Map cacheFilesToEvict = new HashMap<>(); - cache.forEach((cacheKey, cacheFile) -> { - if (shardEviction.matches(cacheKey)) { - cacheFilesToEvict.put(cacheKey, cacheFile); - } - }); - for (Map.Entry cacheFile : cacheFilesToEvict.entrySet()) { - try { - cache.invalidate(cacheFile.getKey(), cacheFile.getValue()); - } catch (RuntimeException e) { - assert false : e; - logger.warn(() -> new ParameterizedMessage("failed to evict cache file {}", cacheFile.getKey()), e); - } - } - }); - } + public void markShardAsEvictedInCache(String snapshotUUID, String snapshotIndexName, ShardId shardId) { + synchronized (shardsEvictionsMutex) { + if (allowShardsEvictions) { + final ShardEviction shardEviction = new ShardEviction(snapshotUUID, snapshotIndexName, shardId); + pendingShardsEvictions.computeIfAbsent(shardEviction, shard -> threadPool.generic().submit(new AbstractRunnable() { + @Override + protected void doRun() { + processShardEviction(shardEviction); + } - @Override - public void onFailure(Exception e) { - assert false : e; - logger.warn( - () -> new ParameterizedMessage("failed to evict cache files associated with evicted shard {}", shardEviction), - e - ); - } - }); + @Override + public void onFailure(Exception e) { + logger.warn( + () -> new ParameterizedMessage("failed to evict cache files associated with shard {}", shardEviction), + e + ); + assert false : e; + } + })); + } } } /** - * Allows to run the specified {@link Runnable} if the shard represented by the triplet ({@link SnapshotId}, {@link IndexId}, - * {@link SnapshotId}) is still marked as evicted at the time this method is executed. The @link Runnable} will be executed - * while the current thread is holding the lock associated to the shard. + * Waits for the cache files associated with the shard represented by ({@link SnapshotId}, {@link IndexId}, {@link SnapshotId}) to be + * evicted if the shard is marked as evicted in cache at the time this method is executed. * - * @param snapshotId the snapshot the evicted searchable snapshots shard belongs to - * @param indexId the index in the snapshot the evicted searchable snapshots shard belongs to - * @param shardId the searchable snapshots shard id - * @param runnable a runnable to execute + * @param snapshotUUID the snapshot's unique identifier + * @param snapshotIndexName the name of the index in the snapshot + * @param shardId the {@link ShardId} */ - public void runIfShardMarkedAsEvictedInCache(SnapshotId snapshotId, IndexId indexId, ShardId shardId, Runnable runnable) { - runIfShardMarkedAsEvictedInCache(new ShardEviction(snapshotId.getUUID(), indexId.getName(), shardId), runnable); + public void waitForCacheFilesEvictionIfNeeded(String snapshotUUID, String snapshotIndexName, ShardId shardId) { + assert assertGenericThreadPool(); + final Future future; + synchronized (shardsEvictionsMutex) { + if (allowShardsEvictions == false) { + throw new AlreadyClosedException("Cannot wait for shard eviction to be processed, cache is stopping"); + } + future = pendingShardsEvictions.get(new ShardEviction(snapshotUUID, snapshotIndexName, shardId)); + if (future == null) { + return; + } + } + FutureUtils.get(future); } /** - * Allows to run the specified {@link Runnable} if the shard represented by {@link ShardEviction} is still marked as evicted at the time - * this method is executed. The @link Runnable} will be executed while the current thread is holding the lock associated to the shard. + * Evicts the cache files associated to the specified {@link ShardEviction}. * - * @param shardEviction a {@link ShardEviction} representing the shard marked as evicted - * @param runnable a runnable to execute + * @param shardEviction the shard eviction to process */ - private void runIfShardMarkedAsEvictedInCache(ShardEviction shardEviction, Runnable runnable) { - try (Releasable ignored = shardsEvictionLock.acquire(shardEviction)) { - boolean success = false; + private void processShardEviction(ShardEviction shardEviction) { + assert isPendingShardEviction(shardEviction) : "shard is not marked as evicted: " + shardEviction; + assert assertGenericThreadPool(); + + shardsEvictionsLock.readLock().lock(); + try { try { - if (evictedShards.remove(shardEviction)) { - runnable.run(); + final boolean canEvict; + synchronized (shardsEvictionsMutex) { + canEvict = allowShardsEvictions; + } + if (canEvict) { + final List cacheFilesToEvict = new ArrayList<>(); + cache.forEach((cacheKey, cacheFile) -> { + if (shardEviction.matches(cacheKey)) { + cacheFilesToEvict.add(cacheFile); + } + }); + for (CacheFile cacheFile : cacheFilesToEvict) { + try { + cache.invalidate(cacheFile.getCacheKey(), cacheFile); + } catch (RuntimeException e) { + logger.warn(() -> new ParameterizedMessage("failed to evict cache file {}", cacheFile.getCacheKey()), e); + assert false : e; + } + } + logger.debug( + "shard eviction [{}] processed with [{}] cache files invalidated", + shardEviction, + cacheFilesToEvict.size() + ); } - success = true; } finally { - if (success == false) { - final boolean added = evictedShards.add(shardEviction); - assert added : shardEviction; + synchronized (shardsEvictionsMutex) { + final Future removedFuture = pendingShardsEvictions.remove(shardEviction); + assert removedFuture != null; } } + } finally { + shardsEvictionsLock.readLock().unlock(); + } + } + + /** + * Processes and waits for all pending shard evictions to complete. + */ + private void processAllPendingShardsEvictions() { + synchronized (shardsEvictionsMutex) { + allowShardsEvictions = false; + } + boolean success = false; + try { + if (shardsEvictionsLock.writeLock().tryLock(10L, TimeUnit.SECONDS) == false) { + logger.warn("waiting for shards evictions to be processed"); + shardsEvictionsLock.writeLock().lock(); // wait indefinitely + } + success = true; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.warn("interrupted while waiting shards evictions to be processed", e); + } finally { + if (success) { + shardsEvictionsLock.writeLock().unlock(); + } + } + } + + boolean isPendingShardEviction(ShardEviction shardEviction) { + synchronized (shardsEvictionsMutex) { + return pendingShardsEvictions.get(shardEviction) != null; + } + } + + // used in tests + Map> pendingShardsEvictions() { + synchronized (shardsEvictionsMutex) { + return Map.copyOf(pendingShardsEvictions); } } @@ -497,14 +566,6 @@ public void synchronizeCache() { final long value = numberOfCacheFilesToSync.decrementAndGet(); assert value >= 0 : value; - final CacheKey cacheKey = cacheFile.getCacheKey(); - if (evictedShards.contains( - new ShardEviction(cacheKey.getSnapshotUUID(), cacheKey.getSnapshotIndexName(), cacheKey.getShardId()) - )) { - logger.debug("cache file belongs to a shard marked as evicted, skipping synchronization for [{}]", cacheKey); - continue; - } - final Path cacheFilePath = cacheFile.getFile(); try { final SortedSet> ranges = cacheFile.fsync(); @@ -589,18 +650,30 @@ public String toString() { * Represents the searchable snapshots information of a shard that has been removed from the node. These information are kept around * to evict the cache files associated to that shard. */ - private static class ShardEviction { + static class ShardEviction { private final String snapshotUUID; private final String snapshotIndexName; private final ShardId shardId; - private ShardEviction(String snapshotUUID, String snapshotIndexName, ShardId shardId) { + ShardEviction(String snapshotUUID, String snapshotIndexName, ShardId shardId) { this.snapshotUUID = snapshotUUID; this.snapshotIndexName = snapshotIndexName; this.shardId = shardId; } + public String getSnapshotUUID() { + return snapshotUUID; + } + + public String getSnapshotIndexName() { + return snapshotIndexName; + } + + public ShardId getShardId() { + return shardId; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -627,4 +700,11 @@ boolean matches(CacheKey cacheKey) { && Objects.equals(shardId, cacheKey.getShardId()); } } + + private static boolean assertGenericThreadPool() { + final String threadName = Thread.currentThread().getName(); + assert threadName.contains('[' + ThreadPool.Names.GENERIC + ']') + || threadName.startsWith("TEST-") : "expected generic thread pool but got " + threadName; + return true; + } } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java index a467d42966a1e..9a23efd4bb472 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java @@ -15,6 +15,7 @@ import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.lucene.store.ESIndexInputTestCase; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; @@ -24,6 +25,8 @@ import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.cache.CacheFile; +import org.elasticsearch.index.store.cache.CacheKey; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.SearchableSnapshotRecoveryState; import org.elasticsearch.repositories.IndexId; @@ -39,11 +42,21 @@ import org.junit.After; import org.junit.Before; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.List; +import java.util.Locale; import java.util.Set; +import java.util.SortedSet; import java.util.concurrent.TimeUnit; +import static org.elasticsearch.index.store.cache.TestUtils.randomPopulateAndReads; + public abstract class AbstractSearchableSnapshotsTestCase extends ESIndexInputTestCase { private static final ClusterSettings CLUSTER_SETTINGS = new ClusterSettings( @@ -190,4 +203,51 @@ protected static void assertThreadPoolNotBusy(ThreadPool threadPool) throws Exce } }, 30L, TimeUnit.SECONDS); } + + /** + * Generates one or more cache files using the specified {@link CacheService}. Each cache files have been written at least once. + */ + protected List randomCacheFiles(CacheService cacheService) throws Exception { + final byte[] buffer = new byte[1024]; + Arrays.fill(buffer, (byte) 0xff); + + final List cacheFiles = new ArrayList<>(); + for (int snapshots = 0; snapshots < between(1, 2); snapshots++) { + final String snapshotUUID = UUIDs.randomBase64UUID(random()); + for (int indices = 0; indices < between(1, 2); indices++) { + IndexId indexId = new IndexId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())); + for (int shards = 0; shards < between(1, 2); shards++) { + ShardId shardId = new ShardId(indexId.getName(), indexId.getId(), shards); + + final Path cacheDir = Files.createDirectories( + CacheService.resolveSnapshotCache(randomShardPath(shardId)).resolve(snapshotUUID) + ); + + for (int files = 0; files < between(1, 2); files++) { + final CacheKey cacheKey = new CacheKey(snapshotUUID, indexId.getName(), shardId, "file_" + files); + final CacheFile cacheFile = cacheService.get(cacheKey, randomLongBetween(100L, buffer.length), cacheDir); + + final CacheFile.EvictionListener listener = evictedCacheFile -> {}; + cacheFile.acquire(listener); + try { + SortedSet> ranges = Collections.emptySortedSet(); + while (ranges.isEmpty()) { + ranges = randomPopulateAndReads(cacheFile, (channel, from, to) -> { + try { + channel.write(ByteBuffer.wrap(buffer, Math.toIntExact(from), Math.toIntExact(to))); + } catch (IOException e) { + throw new AssertionError(e); + } + }); + } + cacheFiles.add(cacheFile); + } finally { + cacheFile.release(listener); + } + } + } + } + } + return cacheFiles; + } } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java index 4139cc17594a5..0177a13c09c91 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java @@ -8,21 +8,21 @@ import org.apache.lucene.util.Constants; import org.apache.lucene.util.LuceneTestCase; -import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.PathUtils; import org.elasticsearch.common.io.PathUtilsForTesting; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.cache.CacheFile; import org.elasticsearch.index.store.cache.CacheKey; import org.elasticsearch.index.store.cache.TestUtils.FSyncTrackingFileSystemProvider; -import org.elasticsearch.repositories.IndexId; -import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.xpack.searchablesnapshots.AbstractSearchableSnapshotsTestCase; +import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService.ShardEviction; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -30,15 +30,20 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.HashMap; +import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Set; import java.util.SortedSet; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.stream.Collectors; import static java.util.Collections.emptySortedSet; import static org.elasticsearch.index.store.cache.TestUtils.randomPopulateAndReads; import static org.elasticsearch.index.store.cache.TestUtils.randomRanges; import static org.elasticsearch.xpack.searchablesnapshots.cache.CacheService.resolveSnapshotCache; +import static org.hamcrest.Matchers.aMapWithSize; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -221,44 +226,161 @@ public void testPut() throws Exception { } } - public void testRunIfShardMarkedAsEvictedInCache() throws Exception { - final SnapshotId snapshotId = new SnapshotId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())); - final IndexId indexId = new IndexId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())); - final ShardId shardId = new ShardId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random()), 0); - final Path cacheDir = Files.createDirectories(resolveSnapshotCache(randomShardPath(shardId)).resolve(snapshotId.getUUID())); - + public void testMarkShardAsEvictedInCache() throws Exception { final CacheService cacheService = defaultCacheService(); - cacheService.setCacheSyncInterval(TimeValue.ZERO); cacheService.start(); - cacheService.runIfShardMarkedAsEvictedInCache( - snapshotId, - indexId, - shardId, - () -> { assert false : "should not be called: shard is not marked as evicted yet"; } - ); + final List randomCacheFiles = randomCacheFiles(cacheService); + assertThat(cacheService.pendingShardsEvictions(), aMapWithSize(0)); - // this future is used to block the cache file eviction submitted by markShardAsEvictedInCache - final PlainActionFuture waitForEviction = PlainActionFuture.newFuture(); - final CacheFile.EvictionListener evictionListener = evicted -> waitForEviction.onResponse(null); + final ShardEviction shard = randomShardEvictionFrom(randomCacheFiles); + final List cacheFilesAssociatedWithShard = filterByShard(shard, randomCacheFiles); + cacheFilesAssociatedWithShard.forEach(cacheFile -> assertTrue(Files.exists(cacheFile.getFile()))); - final CacheKey cacheKey = new CacheKey(snapshotId.getUUID(), indexId.getName(), shardId, "_0.dvd"); - final CacheFile cacheFile = cacheService.get(cacheKey, 100, cacheDir); - cacheFile.acquire(evictionListener); + final BlockingEvictionListener blockingListener = new BlockingEvictionListener(); + final CacheFile randomCacheFile = randomFrom(cacheFilesAssociatedWithShard); + assertTrue(Files.exists(randomCacheFile.getFile())); + randomCacheFile.acquire(blockingListener); + + final List randomEvictedCacheFiles = randomSubsetOf(randomCacheFiles); + for (CacheFile randomEvictedCacheFile : randomEvictedCacheFiles) { + if (randomEvictedCacheFile != randomCacheFile) { + cacheService.removeFromCache(randomEvictedCacheFile.getCacheKey()); + } + } + + for (int i = 0; i < between(1, 3); i++) { + cacheService.markShardAsEvictedInCache(shard.getSnapshotUUID(), shard.getSnapshotIndexName(), shard.getShardId()); + } + + blockingListener.waitForBlock(); + + assertThat(cacheService.pendingShardsEvictions(), aMapWithSize(1)); + assertTrue(cacheService.isPendingShardEviction(shard)); + + blockingListener.unblock(); + + assertBusy(() -> assertThat(cacheService.pendingShardsEvictions(), aMapWithSize(0))); + + for (CacheFile cacheFile : randomCacheFiles) { + final boolean evicted = cacheFilesAssociatedWithShard.contains(cacheFile) || randomEvictedCacheFiles.contains(cacheFile); + assertThat( + "Cache file [" + cacheFile + "] should " + (evicted ? "be deleted" : "exist"), + Files.notExists(cacheFile.getFile()), + equalTo(evicted) + ); + } + cacheService.close(); - cacheService.markShardAsEvictedInCache(snapshotId, indexId, shardId); if (randomBoolean()) { - cacheService.markShardAsEvictedInCache(snapshotId, indexId, shardId); // no effect + // mark shard as evicted after cache service is stopped should have no effect + cacheService.markShardAsEvictedInCache(shard.getSnapshotUUID(), shard.getSnapshotIndexName(), shard.getShardId()); + assertThat(cacheService.pendingShardsEvictions(), aMapWithSize(0)); } - waitForEviction.get(30L, TimeUnit.SECONDS); - cacheFile.release(evictionListener); - - cacheService.runIfShardMarkedAsEvictedInCache( - snapshotId, - indexId, - shardId, - () -> { assert false : "should not be called: shard eviction marker is removed"; } + } + + public void testProcessShardEviction() throws Exception { + final CacheService cacheService = defaultCacheService(); + cacheService.start(); + + final List randomCacheFiles = randomCacheFiles(cacheService); + assertThat(cacheService.pendingShardsEvictions(), aMapWithSize(0)); + + final ShardEviction shard = randomShardEvictionFrom(randomCacheFiles); + final List cacheFilesAssociatedWithShard = filterByShard(shard, randomCacheFiles); + cacheFilesAssociatedWithShard.forEach(cacheFile -> assertTrue(Files.exists(cacheFile.getFile()))); + + final BlockingEvictionListener blockingListener = new BlockingEvictionListener(); + final CacheFile randomCacheFile = randomFrom(cacheFilesAssociatedWithShard); + assertTrue(Files.exists(randomCacheFile.getFile())); + randomCacheFile.acquire(blockingListener); + + cacheService.markShardAsEvictedInCache(shard.getSnapshotUUID(), shard.getSnapshotIndexName(), shard.getShardId()); + + final Map afterShardRecoveryCacheFiles = ConcurrentCollections.newConcurrentMap(); + final Future waitForShardEvictionFuture = threadPool.generic().submit(() -> { + cacheService.waitForCacheFilesEvictionIfNeeded(shard.getSnapshotUUID(), shard.getSnapshotIndexName(), shard.getShardId()); + for (CacheFile cacheFile : cacheFilesAssociatedWithShard) { + afterShardRecoveryCacheFiles.put(cacheFile, Files.exists(cacheFile.getFile())); + } + }); + + blockingListener.waitForBlock(); + + final Map> pendingShardsEvictions = cacheService.pendingShardsEvictions(); + assertTrue(cacheService.isPendingShardEviction(shard)); + assertThat(pendingShardsEvictions, aMapWithSize(1)); + + final Future pendingShardEvictionFuture = pendingShardsEvictions.get(shard); + assertTrue(Files.exists(randomCacheFile.getFile())); + assertThat(pendingShardEvictionFuture, notNullValue()); + assertFalse(pendingShardEvictionFuture.isDone()); + + blockingListener.unblock(); + FutureUtils.get(waitForShardEvictionFuture); + + assertTrue(pendingShardEvictionFuture.isDone()); + FutureUtils.get(pendingShardEvictionFuture); + + cacheFilesAssociatedWithShard.forEach( + cacheFile -> assertFalse("Cache file should be evicted: " + cacheFile, Files.exists(cacheFile.getFile())) ); - cacheService.close(); + afterShardRecoveryCacheFiles.forEach( + (cacheFile, exists) -> assertFalse("Cache file should have been evicted after shard recovery: " + cacheFile, exists) + ); + assertThat(cacheService.pendingShardsEvictions(), aMapWithSize(0)); + + cacheService.stop(); + } + + private static class BlockingEvictionListener implements CacheFile.EvictionListener { + + private final CountDownLatch evictionLatch = new CountDownLatch(1); + private final CountDownLatch releaseLatch = new CountDownLatch(1); + + @Override + public void onEviction(CacheFile evictedCacheFile) { + try { + evictionLatch.countDown(); + releaseLatch.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } finally { + evictedCacheFile.release(this); + } + } + + public void waitForBlock() { + try { + evictionLatch.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + } + + public void unblock() { + releaseLatch.countDown(); + } + } + + /** + * Picks up a random searchable snapshot shard from a list of existing cache files and builds a {@link ShardEviction} object from it. + * + * @param cacheFiles a list of existing cache files + * @return a random {@link ShardEviction} object + */ + private static ShardEviction randomShardEvictionFrom(List cacheFiles) { + return randomFrom(listOfShardEvictions(cacheFiles)); + } + + private static Set listOfShardEvictions(List cacheFiles) { + return cacheFiles.stream() + .map(CacheFile::getCacheKey) + .map(cacheKey -> new ShardEviction(cacheKey.getSnapshotUUID(), cacheKey.getSnapshotIndexName(), cacheKey.getShardId())) + .collect(Collectors.toSet()); + } + + private List filterByShard(ShardEviction shard, List cacheFiles) { + return cacheFiles.stream().filter(cacheFile -> shard.matches(cacheFile.getCacheKey())).collect(Collectors.toList()); } } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java index 454149ec3851c..cafcd77b2c6a6 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java @@ -9,38 +9,26 @@ import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; import org.apache.lucene.document.StringField; -import org.elasticsearch.common.UUIDs; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.NodeEnvironment; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.cache.CacheFile; -import org.elasticsearch.index.store.cache.CacheKey; -import org.elasticsearch.repositories.IndexId; import org.elasticsearch.xpack.searchablesnapshots.AbstractSearchableSnapshotsTestCase; -import java.io.IOException; -import java.nio.ByteBuffer; import java.nio.file.Files; import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Set; -import java.util.SortedSet; import java.util.stream.Collectors; import static org.elasticsearch.cluster.node.DiscoveryNodeRole.BUILT_IN_ROLES; import static org.elasticsearch.cluster.node.DiscoveryNodeRole.DATA_ROLE; import static org.elasticsearch.index.store.cache.TestUtils.assertCacheFileEquals; -import static org.elasticsearch.index.store.cache.TestUtils.randomPopulateAndReads; import static org.elasticsearch.node.NodeRoleSettings.NODE_ROLES_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.cache.PersistentCache.createCacheIndexWriter; import static org.elasticsearch.xpack.searchablesnapshots.cache.PersistentCache.resolveCacheIndexFolder; @@ -140,20 +128,18 @@ public void testRepopulateCache() throws Exception { cacheService.setCacheSyncInterval(TimeValue.ZERO); cacheService.start(); - final List cacheFiles = generateRandomCacheFiles(cacheService); + final List cacheFiles = randomCacheFiles(cacheService); cacheService.synchronizeCache(); - if (cacheFiles.isEmpty() == false) { - final List removedCacheFiles = randomSubsetOf(cacheFiles); - for (CacheFile removedCacheFile : removedCacheFiles) { - if (randomBoolean()) { - // evict cache file from the cache - cacheService.removeFromCache(removedCacheFile.getCacheKey()); - } else { - IOUtils.rm(removedCacheFile.getFile()); - } - cacheFiles.remove(removedCacheFile); + final List removedCacheFiles = randomSubsetOf(cacheFiles); + for (CacheFile removedCacheFile : removedCacheFiles) { + if (randomBoolean()) { + // evict cache file from the cache + cacheService.removeFromCache(removedCacheFile.getCacheKey()); + } else { + IOUtils.rm(removedCacheFile.getFile()); } + cacheFiles.remove(removedCacheFile); } cacheService.stop(); @@ -172,7 +158,7 @@ public void testCleanUp() throws Exception { final List cacheFiles; try (CacheService cacheService = defaultCacheService()) { cacheService.start(); - cacheFiles = generateRandomCacheFiles(cacheService).stream().map(CacheFile::getFile).collect(Collectors.toList()); + cacheFiles = randomCacheFiles(cacheService).stream().map(CacheFile::getFile).collect(Collectors.toList()); if (randomBoolean()) { cacheService.synchronizeCache(); } @@ -186,50 +172,4 @@ public void testCleanUp() throws Exception { PersistentCache.cleanUp(nodeSettings, nodeEnvironment); assertTrue(cacheFiles.stream().noneMatch(Files::exists)); } - - /** - * Generates 1 or more cache files using the specified {@link CacheService}. - */ - private List generateRandomCacheFiles(CacheService cacheService) throws Exception { - final byte[] buffer = new byte[1024]; - Arrays.fill(buffer, (byte) 0xff); - - final List cacheFiles = new ArrayList<>(); - for (int snapshots = 0; snapshots < between(1, 2); snapshots++) { - final String snapshotUUID = UUIDs.randomBase64UUID(random()); - for (int indices = 0; indices < between(1, 2); indices++) { - IndexId indexId = new IndexId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())); - for (int shards = 0; shards < between(1, 2); shards++) { - ShardId shardId = new ShardId(indexId.getName(), indexId.getId(), shards); - - final Path cacheDir = Files.createDirectories( - CacheService.resolveSnapshotCache(randomShardPath(shardId)).resolve(snapshotUUID) - ); - - for (int files = 0; files < between(1, 2); files++) { - final CacheKey cacheKey = new CacheKey(snapshotUUID, indexId.getName(), shardId, "file_" + files); - final CacheFile cacheFile = cacheService.get(cacheKey, randomLongBetween(0L, buffer.length), cacheDir); - - final CacheFile.EvictionListener listener = evictedCacheFile -> {}; - cacheFile.acquire(listener); - try { - SortedSet> ranges = randomPopulateAndReads(cacheFile, (channel, from, to) -> { - try { - channel.write(ByteBuffer.wrap(buffer, Math.toIntExact(from), Math.toIntExact(to))); - } catch (IOException e) { - throw new AssertionError(e); - } - }); - if (ranges.isEmpty() == false) { - cacheFiles.add(cacheFile); - } - } finally { - cacheFile.release(listener); - } - } - } - } - } - return cacheFiles; - } }