From f0e2e08ed493198890ca928d4ea26748afffa273 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 14 Dec 2020 16:29:39 +0100 Subject: [PATCH] Allow to add existing cache files in searchable snapshots cache service (#65538) (#66257) This commit adds a new put() method to the CacheService that allows to add existing cache files to the searchable snapshot cache before the service is effectively started. This method will be used in the future to fill the cache at node start-up time with information retrieved from a Lucene index. --- .../index/store/cache/CacheFile.java | 24 ++++++--- .../cache/CacheService.java | 54 ++++++++++++++++++- .../index/store/cache/CacheFileTests.java | 37 ++++++++++--- .../store/cache/SparseFileTrackerTests.java | 15 +----- .../index/store/cache/TestUtils.java | 14 +++++ .../cache/CacheServiceTests.java | 45 ++++++++++++++++ 6 files changed, 161 insertions(+), 28 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java index 1bf5ac4e1794..c1dd02cb505c 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java @@ -65,7 +65,7 @@ protected void closeInternal() { }; private final SparseFileTracker tracker; - private final String description; + private final CacheKey cacheKey; private final Path file; private final Set listeners = new HashSet<>(); @@ -123,14 +123,26 @@ protected void closeInternal() { @Nullable private volatile FileChannelReference channelRef; - public CacheFile(String description, long length, Path file, Runnable onNeedFSync) { - this.tracker = new SparseFileTracker(file.toString(), length); - this.description = Objects.requireNonNull(description); + public CacheFile(CacheKey cacheKey, long length, Path file, Runnable onNeedFSync) { + this(cacheKey, new SparseFileTracker(file.toString(), length), file, onNeedFSync); + } + + public CacheFile(CacheKey cacheKey, long length, Path file, SortedSet> ranges, Runnable onNeedFSync) { + this(cacheKey, new SparseFileTracker(file.toString(), length, ranges), file, onNeedFSync); + } + + private CacheFile(CacheKey cacheKey, SparseFileTracker tracker, Path file, Runnable onNeedFSync) { + this.cacheKey = Objects.requireNonNull(cacheKey); + this.tracker = Objects.requireNonNull(tracker); this.file = Objects.requireNonNull(file); this.needsFsyncRunnable = Objects.requireNonNull(onNeedFSync); assert invariant(); } + public CacheKey getCacheKey() { + return cacheKey; + } + public long getLength() { return tracker.getLength(); } @@ -247,8 +259,8 @@ private boolean invariant() { public String toString() { synchronized (listeners) { return "CacheFile{" - + "desc='" - + description + + "key='" + + cacheKey + "', file=" + file + ", length=" diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java index b064a09434ad..b3bdaafd96cc 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java @@ -29,6 +29,7 @@ import org.elasticsearch.index.store.cache.CacheKey; import org.elasticsearch.threadpool.ThreadPool; +import java.io.FileNotFoundException; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; @@ -178,6 +179,14 @@ protected void doStop() { @Override protected void doClose() {} + private void ensureLifecycleInitializing() { + final Lifecycle.State state = lifecycleState(); + assert state == Lifecycle.State.INITIALIZED : state; + if (state != Lifecycle.State.INITIALIZED) { + throw new IllegalStateException("Failed to read data from cache: cache service is not initializing [" + state + "]"); + } + } + private void ensureLifecycleStarted() { final Lifecycle.State state = lifecycleState(); assert state != Lifecycle.State.INITIALIZED : state; @@ -200,6 +209,16 @@ public int getRangeSize() { return toIntBytes(rangeSize.getBytes()); } + /** + * Retrieves the {@link CacheFile} instance associated with the specified {@link CacheKey} in the cache. If the key is not already + * associated with a {@link CacheFile}, this method creates a new instance using the given file length and cache directory. + * + * @param cacheKey the cache key whose associated {@link CacheFile} instance is to be returned or computed for if non-existent + * @param fileLength the length of the cache file (required to compute a new instance) + * @param cacheDir the cache directory where the cache file on disk is created (required to compute a new instance) + * @return the current {@link CacheFile} instance (existing or computed) + * @throws Exception if this method is used when the {@link CacheService} is not started + */ public CacheFile get(final CacheKey cacheKey, final long fileLength, final Path cacheDir) throws Exception { ensureLifecycleStarted(); return cache.computeIfAbsent(cacheKey, key -> { @@ -211,11 +230,44 @@ public CacheFile get(final CacheKey cacheKey, final long fileLength, final Path assert Files.notExists(path) : "cache file already exists " + path; final SetOnce cacheFile = new SetOnce<>(); - cacheFile.set(new CacheFile(key.toString(), fileLength, path, () -> onCacheFileUpdate(cacheFile.get()))); + cacheFile.set(new CacheFile(key, fileLength, path, () -> onCacheFileUpdate(cacheFile.get()))); return cacheFile.get(); }); } + /** + * Computes a new {@link CacheFile} instance using the specified cache file information (file length, file name, parent directory and + * already available cache ranges) and associates it with the specified {@link CacheKey} in the cache. If the key is already + * associated with a {@link CacheFile}, the previous instance is replaced by a new one. + * + * This method can only be used before the {@link CacheService} is started. + * + * @param cacheKey the cache key with which the new {@link CacheFile} instance is to be associated + * @param fileLength the length of the cache file + * @param cacheDir the cache directory where the cache file on disk is located + * @param cacheFileUuid the name of the cache file on disk (should be a UUID) + * @param cacheFileRanges the set of ranges that are known to be already available/completed for this cache file + * @throws Exception if this method is used when the {@link CacheService} is not initializing + */ + void put( + final CacheKey cacheKey, + final long fileLength, + final Path cacheDir, + final String cacheFileUuid, + final SortedSet> cacheFileRanges + ) throws Exception { + + ensureLifecycleInitializing(); + final Path path = cacheDir.resolve(cacheFileUuid); + if (Files.exists(path) == false) { + throw new FileNotFoundException("Cache file [" + path + "] not found"); + } + + final SetOnce cacheFile = new SetOnce<>(); + cacheFile.set(new CacheFile(cacheKey, fileLength, path, cacheFileRanges, () -> onCacheFileUpdate(cacheFile.get()))); + cache.put(cacheKey, cacheFile.get()); + } + /** * Evicts the cache file associated with the specified cache key. * diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java index 2121a72fdb54..393cd0aeb317 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java @@ -7,12 +7,16 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.cluster.coordination.DeterministicTaskQueue; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.PathUtils; import org.elasticsearch.common.io.PathUtilsForTesting; import org.elasticsearch.common.util.concurrent.RunOnce; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.cache.CacheFile.EvictionListener; import org.elasticsearch.index.store.cache.TestUtils.FSyncTrackingFileSystemProvider; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.Matcher; @@ -25,6 +29,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Locale; import java.util.Objects; import java.util.SortedSet; import java.util.concurrent.Future; @@ -44,10 +49,28 @@ public class CacheFileTests extends ESTestCase { private static final Runnable NOOP = () -> {}; + private static final CacheKey CACHE_KEY = new CacheKey( + new SnapshotId("_name", "_uuid"), + new IndexId("_name", "_uuid"), + new ShardId("_name", "_uuid", 0), + "_filename" + ); + + public void testGetCacheKey() throws Exception { + final CacheKey cacheKey = new CacheKey( + new SnapshotId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())), + new IndexId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())), + new ShardId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random()), randomInt(5)), + randomAlphaOfLength(105).toLowerCase(Locale.ROOT) + ); + + final CacheFile cacheFile = new CacheFile(cacheKey, randomLongBetween(1, 100), createTempFile(), NOOP); + assertThat(cacheFile.getCacheKey(), sameInstance(cacheKey)); + } public void testAcquireAndRelease() throws Exception { final Path file = createTempDir().resolve("file.cache"); - final CacheFile cacheFile = new CacheFile("test", randomLongBetween(1, 100), file, NOOP); + final CacheFile cacheFile = new CacheFile(CACHE_KEY, randomLongBetween(1, 100), file, NOOP); assertThat("Cache file is not acquired: no channel exists", cacheFile.getChannel(), nullValue()); assertThat("Cache file is not acquired: file does not exist", Files.exists(file), is(false)); @@ -86,7 +109,7 @@ public void testAcquireAndRelease() throws Exception { public void testCacheFileNotAcquired() throws IOException { final Path file = createTempDir().resolve("file.cache"); - final CacheFile cacheFile = new CacheFile("test", randomLongBetween(1, 100), file, NOOP); + final CacheFile cacheFile = new CacheFile(CACHE_KEY, randomLongBetween(1, 100), file, NOOP); assertThat(Files.exists(file), is(false)); assertThat(cacheFile.getChannel(), nullValue()); @@ -108,7 +131,7 @@ public void testCacheFileNotAcquired() throws IOException { public void testDeleteOnCloseAfterLastRelease() throws Exception { final Path file = createTempDir().resolve("file.cache"); - final CacheFile cacheFile = new CacheFile("test", randomLongBetween(1, 100), file, NOOP); + final CacheFile cacheFile = new CacheFile(CACHE_KEY, randomLongBetween(1, 100), file, NOOP); final List acquiredListeners = new ArrayList<>(); for (int i = 0; i < randomIntBetween(1, 20); i++) { @@ -140,7 +163,7 @@ public void testDeleteOnCloseAfterLastRelease() throws Exception { public void testConcurrentAccess() throws Exception { final Path file = createTempDir().resolve("file.cache"); - final CacheFile cacheFile = new CacheFile("test", randomLongBetween(1, 100), file, NOOP); + final CacheFile cacheFile = new CacheFile(CACHE_KEY, randomLongBetween(1, 100), file, NOOP); final TestEvictionListener evictionListener = new TestEvictionListener(); cacheFile.acquire(evictionListener); @@ -190,7 +213,7 @@ public void testFSync() throws Exception { try { final AtomicBoolean needsFSyncCalled = new AtomicBoolean(); final CacheFile cacheFile = new CacheFile( - "test", + CACHE_KEY, randomLongBetween(100, 1000), fileSystem.resolve("test"), () -> assertFalse(needsFSyncCalled.getAndSet(true)) @@ -237,7 +260,7 @@ public void testFSyncOnEvictedFile() throws Exception { try { final AtomicBoolean needsFSyncCalled = new AtomicBoolean(); final CacheFile cacheFile = new CacheFile( - "test", + CACHE_KEY, randomLongBetween(1L, 1000L), fileSystem.resolve("test"), () -> assertFalse(needsFSyncCalled.getAndSet(true)) @@ -291,7 +314,7 @@ public void testFSyncFailure() throws Exception { final AtomicBoolean needsFSyncCalled = new AtomicBoolean(); final CacheFile cacheFile = new CacheFile( - "test", + CACHE_KEY, randomLongBetween(1L, 1000L), fileSystem.resolve("test"), () -> assertFalse(needsFSyncCalled.getAndSet(true)) diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/SparseFileTrackerTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/SparseFileTrackerTests.java index 15462766d1a5..50a40f6d065c 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/SparseFileTrackerTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/SparseFileTrackerTests.java @@ -26,6 +26,7 @@ import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet; import static org.elasticsearch.index.store.cache.TestUtils.mergeContiguousRanges; +import static org.elasticsearch.index.store.cache.TestUtils.randomRanges; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.toIntBytes; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; @@ -566,18 +567,4 @@ private static boolean processGap(byte[] fileContents, SparseFileTracker.Gap gap return true; } } - - /** - * Generates a sorted set of non-empty and non-contiguous random ranges that could fit into a file of a given maximum length. - */ - private static SortedSet> randomRanges(long length) { - final SortedSet> randomRanges = new TreeSet<>(Comparator.comparingLong(Tuple::v1)); - for (long i = 0L; i < length;) { - long start = randomLongBetween(i, Math.max(0L, length - 1L)); - long end = randomLongBetween(start + 1L, length); // +1 for non empty ranges - randomRanges.add(Tuple.tuple(start, end)); - i = end + 1L + randomLongBetween(0L, Math.max(0L, length - end)); // +1 for non contiguous ranges - } - return randomRanges; - } } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java index 2498efd5080d..8d99956b0360 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java @@ -101,6 +101,20 @@ static long numberOfRanges(int fileSize, int rangeSize) { return numberOfRanges; } + /** + * Generates a sorted set of non-empty and non-contiguous random ranges that could fit into a file of a given maximum length. + */ + public static SortedSet> randomRanges(long length) { + final SortedSet> randomRanges = new TreeSet<>(Comparator.comparingLong(Tuple::v1)); + for (long i = 0L; i < length;) { + long start = randomLongBetween(i, Math.max(0L, length - 1L)); + long end = randomLongBetween(start + 1L, length); // +1 for non empty ranges + randomRanges.add(Tuple.tuple(start, end)); + i = end + 1L + randomLongBetween(0L, Math.max(0L, length - end)); // +1 for non contiguous ranges + } + return randomRanges; + } + public static SortedSet> mergeContiguousRanges(final SortedSet> ranges) { // Eclipse needs the TreeSet type to be explicit (see https://bugs.eclipse.org/bugs/show_bug.cgi?id=568600) return ranges.stream().collect(() -> new TreeSet>(Comparator.comparingLong(Tuple::v1)), (gaps, gap) -> { diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java index 67fe8ae1ffdc..7a6828d11630 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java @@ -23,6 +23,7 @@ import org.junit.AfterClass; import org.junit.BeforeClass; +import java.io.FileNotFoundException; import java.nio.file.Files; import java.nio.file.Path; import java.util.HashMap; @@ -30,9 +31,14 @@ import java.util.Map; import java.util.SortedSet; +import static java.util.Collections.emptySortedSet; import static org.elasticsearch.index.store.cache.TestUtils.randomPopulateAndReads; +import static org.elasticsearch.index.store.cache.TestUtils.randomRanges; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; public class CacheServiceTests extends AbstractSearchableSnapshotsTestCase { @@ -162,4 +168,43 @@ public void testCacheSynchronization() throws Exception { } } } + + public void testPut() throws Exception { + final Path cacheDir = createTempDir(); + try (CacheService cacheService = defaultCacheService()) { + final long fileLength = randomLongBetween(0L, 1000L); + final CacheKey cacheKey = new CacheKey( + new SnapshotId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())), + new IndexId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())), + new ShardId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random()), randomInt(5)), + randomAlphaOfLength(105).toLowerCase(Locale.ROOT) + ); + final String cacheFileUuid = UUIDs.randomBase64UUID(random()); + final SortedSet> cacheFileRanges = randomBoolean() ? randomRanges(fileLength) : emptySortedSet(); + + if (randomBoolean()) { + final Path cacheFilePath = cacheDir.resolve(cacheFileUuid); + Files.createFile(cacheFilePath); + + cacheService.put(cacheKey, fileLength, cacheDir, cacheFileUuid, cacheFileRanges); + + cacheService.start(); + final CacheFile cacheFile = cacheService.get(cacheKey, fileLength, cacheDir); + assertThat(cacheFile, notNullValue()); + assertThat(cacheFile.getFile(), equalTo(cacheFilePath)); + assertThat(cacheFile.getCacheKey(), equalTo(cacheKey)); + assertThat(cacheFile.getLength(), equalTo(fileLength)); + + for (Tuple cacheFileRange : cacheFileRanges) { + assertThat(cacheFile.getAbsentRangeWithin(cacheFileRange.v1(), cacheFileRange.v2()), nullValue()); + } + } else { + final FileNotFoundException exception = expectThrows( + FileNotFoundException.class, + () -> cacheService.put(cacheKey, fileLength, cacheDir, cacheFileUuid, cacheFileRanges) + ); + assertThat(exception.getMessage(), containsString(cacheFileUuid)); + } + } + } }