From a1e42b1131d5d807dc66d13dd960ebe707da3f09 Mon Sep 17 00:00:00 2001 From: Kunal Kotwani Date: Tue, 16 May 2023 17:17:30 -0700 Subject: [PATCH] Add filecache support in clear indices cache API (#7498) Signed-off-by: Kunal Kotwani --- CHANGELOG.md | 1 + .../client/IndicesRequestConverters.java | 1 + .../client/IndicesRequestConvertersTests.java | 4 ++ .../api/indices.clear_cache.json | 4 ++ .../clear/ClearIndicesCacheBlocksIT.java | 39 ++++++++++++++ .../cache/clear/ClearIndicesCacheRequest.java | 17 ++++++ .../ClearIndicesCacheRequestBuilder.java | 5 ++ .../TransportClearIndicesCacheAction.java | 16 ++++++ .../store/remote/filecache/FileCache.java | 6 +++ .../store/remote/utils/cache/LRUCache.java | 6 ++- .../remote/utils/cache/RefCountedCache.java | 13 ++++- .../remote/utils/cache/SegmentedCache.java | 10 ++++ .../indices/RestClearIndicesCacheAction.java | 1 + ...TransportClearIndicesCacheActionTests.java | 53 +++++++++++++++++++ .../remote/filecache/FileCacheTests.java | 22 +++++++- .../utils/cache/RefCountedCacheTestCase.java | 15 ++++++ 16 files changed, 209 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 09273f0182d8f..798be4678e550 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -92,6 +92,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Adds ExtensionsManager.lookupExtensionSettingsById ([#7466](https://github.com/opensearch-project/OpenSearch/pull/7466)) - SegRep with Remote: Add hook for publishing checkpoint notifications after segment upload to remote store ([#7394](https://github.com/opensearch-project/OpenSearch/pull/7394)) - Provide mechanism to configure XContent parsing constraints (after update to Jackson 2.15.0 and above) ([#7550](https://github.com/opensearch-project/OpenSearch/pull/7550)) +- Support to clear filecache using clear indices cache API ([#7498](https://github.com/opensearch-project/OpenSearch/pull/7498)) ### Dependencies - Bump `com.netflix.nebula:gradle-info-plugin` from 12.0.0 to 12.1.3 (#7564) diff --git a/client/rest-high-level/src/main/java/org/opensearch/client/IndicesRequestConverters.java b/client/rest-high-level/src/main/java/org/opensearch/client/IndicesRequestConverters.java index ca9154340a660..bffc5b0a21f1b 100644 --- a/client/rest-high-level/src/main/java/org/opensearch/client/IndicesRequestConverters.java +++ b/client/rest-high-level/src/main/java/org/opensearch/client/IndicesRequestConverters.java @@ -261,6 +261,7 @@ static Request clearCache(ClearIndicesCacheRequest clearIndicesCacheRequest) { parameters.withIndicesOptions(clearIndicesCacheRequest.indicesOptions()); parameters.putParam("query", Boolean.toString(clearIndicesCacheRequest.queryCache())); parameters.putParam("fielddata", Boolean.toString(clearIndicesCacheRequest.fieldDataCache())); + parameters.putParam("file", Boolean.toString(clearIndicesCacheRequest.fileCache())); parameters.putParam("request", Boolean.toString(clearIndicesCacheRequest.requestCache())); parameters.putParam("fields", String.join(",", clearIndicesCacheRequest.fields())); request.addParameters(parameters.asMap()); diff --git a/client/rest-high-level/src/test/java/org/opensearch/client/IndicesRequestConvertersTests.java b/client/rest-high-level/src/test/java/org/opensearch/client/IndicesRequestConvertersTests.java index 7ed06129dc893..2710e9531bf1b 100644 --- a/client/rest-high-level/src/test/java/org/opensearch/client/IndicesRequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/opensearch/client/IndicesRequestConvertersTests.java @@ -596,6 +596,10 @@ public void testClearCache() { clearIndicesCacheRequest.fields(RequestConvertersTests.randomIndicesNames(1, 5)); expectedParams.put("fields", String.join(",", clearIndicesCacheRequest.fields())); } + if (OpenSearchTestCase.randomBoolean()) { + clearIndicesCacheRequest.fileCache(OpenSearchTestCase.randomBoolean()); + } + expectedParams.put("file", Boolean.toString(clearIndicesCacheRequest.fileCache())); Request request = IndicesRequestConverters.clearCache(clearIndicesCacheRequest); StringJoiner endpoint = new StringJoiner("/", "/", ""); diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.clear_cache.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.clear_cache.json index 64c10a520c7c4..0c7eca8c8e6f5 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.clear_cache.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.clear_cache.json @@ -67,6 +67,10 @@ "request":{ "type":"boolean", "description":"Clear request cache" + }, + "file":{ + "type":"boolean", + "description":"Clear filecache" } } } diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/cache/clear/ClearIndicesCacheBlocksIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/cache/clear/ClearIndicesCacheBlocksIT.java index 305b39ac3ad0c..31c1aca53ae4a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/cache/clear/ClearIndicesCacheBlocksIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/cache/clear/ClearIndicesCacheBlocksIT.java @@ -36,6 +36,7 @@ import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; import java.util.Arrays; +import java.util.Collections; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_BLOCKS_METADATA; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_BLOCKS_READ; @@ -89,4 +90,42 @@ public void testClearIndicesCacheWithBlocks() { } } } + + public void testClearIndicesFileCacheWithBlocks() { + createIndex("test"); + ensureGreen("test"); + + NumShards numShards = getNumShards("test"); + + // Request is not blocked + for (String blockSetting : Arrays.asList( + SETTING_BLOCKS_READ, + SETTING_BLOCKS_WRITE, + SETTING_READ_ONLY, + SETTING_READ_ONLY_ALLOW_DELETE + )) { + try { + enableIndexBlock("test", blockSetting); + ClearIndicesCacheResponse clearIndicesCacheResponse = client().admin() + .indices() + .prepareClearCache("test") + .setFileCache(true) + .execute() + .actionGet(); + assertNoFailures(clearIndicesCacheResponse); + assertThat(clearIndicesCacheResponse.getSuccessfulShards(), equalTo(numShards.totalNumShards)); + } finally { + disableIndexBlock("test", blockSetting); + } + } + + for (String blockSetting : Collections.singletonList(SETTING_BLOCKS_METADATA)) { + try { + enableIndexBlock("test", blockSetting); + assertBlocked(client().admin().indices().prepareClearCache("test").setQueryCache(true).setFileCache(true)); + } finally { + disableIndexBlock("test", blockSetting); + } + } + } } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/cache/clear/ClearIndicesCacheRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/cache/clear/ClearIndicesCacheRequest.java index 35913c2579aa9..793d02d6c4d13 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/cache/clear/ClearIndicesCacheRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/cache/clear/ClearIndicesCacheRequest.java @@ -32,6 +32,7 @@ package org.opensearch.action.admin.indices.cache.clear; +import org.opensearch.Version; import org.opensearch.action.support.broadcast.BroadcastRequest; import org.opensearch.common.Strings; import org.opensearch.common.io.stream.StreamInput; @@ -49,6 +50,7 @@ public class ClearIndicesCacheRequest extends BroadcastRequest pathStartsWithShardPathPredicate = path -> path.startsWith(shardPath.getDataPath()); + node.fileCache().prune(pathStartsWithShardPathPredicate); + } + } + indicesService.clearIndexShardCache( shardRouting.shardId(), request.queryCache(), diff --git a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java index 09ba6d5ba2105..0aa3740fb6ecb 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java +++ b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java @@ -22,6 +22,7 @@ import java.nio.file.Path; import java.util.List; import java.util.function.BiFunction; +import java.util.function.Predicate; import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory.LOCAL_STORE_LOCATION; @@ -121,6 +122,11 @@ public long prune() { return theCache.prune(); } + @Override + public long prune(Predicate keyPredicate) { + return theCache.prune(keyPredicate); + } + @Override public CacheUsage usage() { return theCache.usage(); diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/LRUCache.java b/server/src/main/java/org/opensearch/index/store/remote/utils/cache/LRUCache.java index f36055e5d7327..03d03711f914a 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/LRUCache.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/cache/LRUCache.java @@ -22,6 +22,7 @@ import java.util.Objects; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; +import java.util.function.Predicate; /** * LRU implementation of {@link RefCountedCache}. As long as {@link Node#refCount} greater than 0 then node is not eligible for eviction. @@ -256,13 +257,16 @@ public void decRef(K key) { } @Override - public long prune() { + public long prune(Predicate keyPredicate) { long sum = 0L; lock.lock(); try { final Iterator> iterator = lru.values().iterator(); while (iterator.hasNext()) { final Node node = iterator.next(); + if (keyPredicate != null && !keyPredicate.test(node.key)) { + continue; + } iterator.remove(); data.remove(node.key, node); sum += node.weight; diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/RefCountedCache.java b/server/src/main/java/org/opensearch/index/store/remote/utils/cache/RefCountedCache.java index bbb37dc57ae7e..e6b5a5f945d83 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/RefCountedCache.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/cache/RefCountedCache.java @@ -11,6 +11,7 @@ import org.opensearch.index.store.remote.utils.cache.stats.CacheStats; import java.util.function.BiFunction; +import java.util.function.Predicate; /** * Custom Cache which support typical cache operations (put, get, ...) and it support reference counting per individual key which might @@ -80,7 +81,17 @@ public interface RefCountedCache { * * @return The total weight of all removed entries. */ - long prune(); + default long prune() { + return prune(key -> true); + } + + /** + * Removes the cache entries which match the predicate criteria for the key + * and have a reference count of zero, regardless of current capacity. + * + * @return The total weight of all removed entries. + */ + long prune(Predicate keyPredicate); /** * Returns the weighted usage of this cache. diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/SegmentedCache.java b/server/src/main/java/org/opensearch/index/store/remote/utils/cache/SegmentedCache.java index 42e44aa5f6a15..d3eb03df37e1b 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/SegmentedCache.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/cache/SegmentedCache.java @@ -15,6 +15,7 @@ import java.util.Objects; import java.util.function.BiFunction; +import java.util.function.Predicate; /** * Segmented {@link LRUCache} to offer concurrent access with less contention. @@ -138,6 +139,15 @@ public long prune() { return sum; } + @Override + public long prune(Predicate keyPredicate) { + long sum = 0L; + for (RefCountedCache cache : table) { + sum += cache.prune(keyPredicate); + } + return sum; + } + @Override public CacheUsage usage() { long usage = 0L; diff --git a/server/src/main/java/org/opensearch/rest/action/admin/indices/RestClearIndicesCacheAction.java b/server/src/main/java/org/opensearch/rest/action/admin/indices/RestClearIndicesCacheAction.java index e4f0d7d006d87..ee1d5a97d00c0 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/indices/RestClearIndicesCacheAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/indices/RestClearIndicesCacheAction.java @@ -83,6 +83,7 @@ public static ClearIndicesCacheRequest fromRequest(final RestRequest request, Cl clearIndicesCacheRequest.queryCache(request.paramAsBoolean("query", clearIndicesCacheRequest.queryCache())); clearIndicesCacheRequest.requestCache(request.paramAsBoolean("request", clearIndicesCacheRequest.requestCache())); clearIndicesCacheRequest.fieldDataCache(request.paramAsBoolean("fielddata", clearIndicesCacheRequest.fieldDataCache())); + clearIndicesCacheRequest.fileCache(request.paramAsBoolean("file", clearIndicesCacheRequest.fileCache())); clearIndicesCacheRequest.fields(request.paramAsStringArray("fields", clearIndicesCacheRequest.fields())); return clearIndicesCacheRequest; } diff --git a/server/src/test/java/org/opensearch/action/admin/indices/cache/clear/TransportClearIndicesCacheActionTests.java b/server/src/test/java/org/opensearch/action/admin/indices/cache/clear/TransportClearIndicesCacheActionTests.java index cce0927d4994f..85aa60d6f308b 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/cache/clear/TransportClearIndicesCacheActionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/cache/clear/TransportClearIndicesCacheActionTests.java @@ -9,26 +9,45 @@ package org.opensearch.action.admin.indices.cache.clear; import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.broadcast.node.TransportBroadcastByNodeAction; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.block.ClusterBlock; import org.opensearch.cluster.block.ClusterBlockLevel; import org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.breaker.NoopCircuitBreaker; +import org.opensearch.common.settings.Settings; +import org.opensearch.env.Environment; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.env.TestEnvironment; +import org.opensearch.index.shard.ShardId; +import org.opensearch.index.shard.ShardPath; +import org.opensearch.index.store.remote.filecache.FileCache; +import org.opensearch.index.store.remote.filecache.FileCacheFactory; +import org.opensearch.index.store.remote.filecache.FileCacheTests; import org.opensearch.indices.IndicesService; +import org.opensearch.node.Node; import org.opensearch.rest.RestStatus; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.transport.TransportService; +import java.io.IOException; +import java.nio.file.Path; import java.util.EnumSet; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class TransportClearIndicesCacheActionTests extends OpenSearchTestCase { + + private final Node testNode = mock(Node.class); private final TransportClearIndicesCacheAction action = new TransportClearIndicesCacheAction( mock(ClusterService.class), mock(TransportService.class), mock(IndicesService.class), + testNode, mock(ActionFilters.class), mock(IndexNameExpressionResolver.class) ); @@ -55,6 +74,40 @@ public class TransportClearIndicesCacheActionTests extends OpenSearchTestCase { EnumSet.of(ClusterBlockLevel.METADATA_READ) ); + public void testOnShardOperation() throws IOException { + final String indexName = "test"; + final Settings settings = buildEnvSettings(Settings.EMPTY); + final Environment environment = TestEnvironment.newEnvironment(settings); + try (final NodeEnvironment nodeEnvironment = new NodeEnvironment(settings, environment)) { + // Initialize necessary stubs for the filecache clear shard operation + final ShardId shardId = new ShardId(indexName, indexName, 1); + final ShardRouting shardRouting = mock(ShardRouting.class); + when(shardRouting.shardId()).thenReturn(shardId); + final ShardPath shardPath = ShardPath.loadFileCachePath(nodeEnvironment, shardId); + final Path cacheEntryPath = shardPath.getDataPath(); + final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(1024 * 1024 * 1024, 16, new NoopCircuitBreaker("")); + + when(testNode.fileCache()).thenReturn(fileCache); + when(testNode.getNodeEnvironment()).thenReturn(nodeEnvironment); + + // Add an entry into the filecache and reduce the ref count + fileCache.put(cacheEntryPath, new FileCacheTests.StubCachedIndexInput(1)); + fileCache.decRef(cacheEntryPath); + + // Check if the entry exists and reduce the ref count to make it evictable + assertNotNull(fileCache.get(cacheEntryPath)); + fileCache.decRef(cacheEntryPath); + + ClearIndicesCacheRequest clearIndicesCacheRequest = new ClearIndicesCacheRequest(); + clearIndicesCacheRequest.fileCache(true); + assertEquals( + TransportBroadcastByNodeAction.EmptyResult.INSTANCE, + action.shardOperation(clearIndicesCacheRequest, shardRouting) + ); + assertNull(fileCache.get(cacheEntryPath)); + } + } + public void testGlobalBlockCheck() { ClusterBlocks.Builder builder = ClusterBlocks.builder(); builder.addGlobalBlock(writeClusterBlock); diff --git a/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheTests.java b/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheTests.java index 0cc0cc8d31ade..43a5c04b59f83 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheTests.java @@ -225,6 +225,24 @@ public void testPrune() { assertEquals(fileCache.size(), 0); } + public void testPruneWithPredicate() { + FileCache fileCache = createFileCache(GIGA_BYTES); + for (int i = 0; i < 4; i++) { + putAndDecRef(fileCache, i, 8 * MEGA_BYTES); + } + + // before prune + assertEquals(fileCache.size(), 4); + + // after prune with false predicate + fileCache.prune(path -> false); + assertEquals(fileCache.size(), 4); + + // after prune with true predicate + fileCache.prune(path -> true); + assertEquals(fileCache.size(), 0); + } + public void testUsage() { FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache( 16 * MEGA_BYTES, @@ -280,11 +298,11 @@ private void putAndDecRef(FileCache cache, int path, long indexInputSize) { cache.decRef(key); } - private static class StubCachedIndexInput implements CachedIndexInput { + public static class StubCachedIndexInput implements CachedIndexInput { private final long length; - private StubCachedIndexInput(long length) { + public StubCachedIndexInput(long length) { this.length = length; } diff --git a/server/src/test/java/org/opensearch/index/store/remote/utils/cache/RefCountedCacheTestCase.java b/server/src/test/java/org/opensearch/index/store/remote/utils/cache/RefCountedCacheTestCase.java index 59657eebc4480..b11740b53e11f 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/utils/cache/RefCountedCacheTestCase.java +++ b/server/src/test/java/org/opensearch/index/store/remote/utils/cache/RefCountedCacheTestCase.java @@ -138,6 +138,21 @@ public void testPrune() { assertEquals(10L, (long) refCountedCache.get("3")); } + public void testPruneWithPredicate() { + refCountedCache.put("1", 10L); + refCountedCache.decRef("1"); + refCountedCache.put("2", 10L); + refCountedCache.decRef("2"); + refCountedCache.put("3", 10L); + + assertEquals(0L, refCountedCache.prune(path -> false)); + + assertEquals(20L, refCountedCache.prune(path -> true)); + assertNull(refCountedCache.get("1")); + assertNull(refCountedCache.get("2")); + assertEquals(10L, (long) refCountedCache.get("3")); + } + public void testStats() { assertEquals(0, refCountedCache.stats().hitCount()); refCountedCache.put("1", 1L);