diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java index 09d5c208a8756..108ef14f0fcb4 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java @@ -34,6 +34,12 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.search.Weight; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.cluster.node.stats.NodeStats; import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; @@ -56,7 +62,10 @@ import org.opensearch.env.NodeEnvironment; import org.opensearch.index.IndexSettings; import org.opensearch.index.cache.request.RequestCacheStats; +import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryBuilders; +import org.opensearch.index.query.QueryShardContext; +import org.opensearch.index.query.TermQueryBuilder; import org.opensearch.search.aggregations.bucket.global.GlobalAggregationBuilder; import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.opensearch.search.aggregations.bucket.histogram.Histogram; @@ -65,6 +74,7 @@ import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase; import org.opensearch.test.hamcrest.OpenSearchAssertions; +import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.time.ZoneId; @@ -768,6 +778,59 @@ public void testDeleteAndCreateSameIndexShardOnSameNode() throws Exception { assertTrue(stats.getMemorySizeInBytes() == 0); } + public void testTimedOutQuery() throws Exception { + // A timed out query should be cached and then invalidated + Client client = client(); + String index = "index"; + assertAcked( + client.admin() + .indices() + .prepareCreate(index) + .setMapping("k", "type=keyword") + .setSettings( + Settings.builder() + .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + // Disable index refreshing to avoid cache being invalidated mid-test + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(-1)) + ) + .get() + ); + indexRandom(true, client.prepareIndex(index).setSource("k", "hello")); + ensureSearchable(index); + // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache + forceMerge(client, index); + + QueryBuilder timeoutQueryBuilder = new TermQueryBuilder("k", "hello") { + @Override + protected Query doToQuery(QueryShardContext context) { + return new TermQuery(new Term("k", "hello")) { + @Override + public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost) throws IOException { + // Create the weight before sleeping. Otherwise, TermStates.build() (in the call to super.createWeight()) will + // sometimes throw an exception on timeout, rather than timing out gracefully. + Weight result = super.createWeight(searcher, scoreMode, boost); + try { + Thread.sleep(500); + } catch (InterruptedException ignored) {} + return result; + } + }; + } + }; + + SearchResponse resp = client.prepareSearch(index) + .setRequestCache(true) + .setQuery(timeoutQueryBuilder) + .setTimeout(TimeValue.ZERO) + .get(); + assertTrue(resp.isTimedOut()); + RequestCacheStats requestCacheStats = getRequestCacheStats(client, index); + // The cache should be empty as the timed-out query was invalidated + assertEquals(0, requestCacheStats.getMemorySizeInBytes()); + } + private Path[] shardDirectory(String server, Index index, int shard) { NodeEnvironment env = internalCluster().getInstance(NodeEnvironment.class, server); final Path[] paths = env.availableShardPaths(new ShardId(index, shard)); diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index c6bf4fb381ce9..416cd73abbd25 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -311,12 +311,11 @@ BytesReference getOrCompute( * @param cacheKey the cache key to invalidate */ void invalidate(IndicesService.IndexShardCacheEntity cacheEntity, DirectoryReader reader, BytesReference cacheKey) { - assert reader.getReaderCacheHelper() != null; - String readerCacheKeyId = null; - if (reader instanceof OpenSearchDirectoryReader) { - IndexReader.CacheHelper cacheHelper = ((OpenSearchDirectoryReader) reader).getDelegatingCacheHelper(); - readerCacheKeyId = ((OpenSearchDirectoryReader.DelegatingCacheHelper) cacheHelper).getDelegatingCacheKey().getId(); - } + assert reader.getReaderCacheHelper() instanceof OpenSearchDirectoryReader.DelegatingCacheHelper; + OpenSearchDirectoryReader.DelegatingCacheHelper delegatingCacheHelper = (OpenSearchDirectoryReader.DelegatingCacheHelper) reader + .getReaderCacheHelper(); + String readerCacheKeyId = delegatingCacheHelper.getDelegatingCacheKey().getId(); + IndexShard indexShard = (IndexShard) cacheEntity.getCacheIdentity(); cache.invalidate(getICacheKey(new Key(indexShard.shardId(), cacheKey, readerCacheKeyId, System.identityHashCode(indexShard)))); } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 09a7b322a5a62..af8d6cc6b6280 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -70,6 +70,7 @@ import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.common.lucene.index.OpenSearchDirectoryReader.DelegatingCacheHelper; import org.opensearch.common.settings.IndexScopedSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; @@ -1820,8 +1821,7 @@ public boolean canCache(ShardSearchRequest request, SearchContext context) { if (context.getQueryShardContext().isCacheable() == false) { return false; } - return true; - + return context.searcher().getDirectoryReader().getReaderCacheHelper() instanceof DelegatingCacheHelper; } /** diff --git a/server/src/test/java/org/opensearch/indices/IndicesServiceTests.java b/server/src/test/java/org/opensearch/indices/IndicesServiceTests.java index 1601f137954e2..de9c75869b7df 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesServiceTests.java @@ -31,12 +31,15 @@ package org.opensearch.indices; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; import org.apache.lucene.search.similarities.BM25Similarity; import org.apache.lucene.search.similarities.Similarity; import org.apache.lucene.store.AlreadyClosedException; import org.opensearch.Version; import org.opensearch.action.admin.indices.stats.CommonStatsFlags; import org.opensearch.action.admin.indices.stats.IndexShardStats; +import org.opensearch.action.search.SearchType; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexGraveyard; @@ -44,6 +47,7 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.UUIDs; +import org.opensearch.common.lucene.index.OpenSearchDirectoryReader.DelegatingCacheHelper; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; @@ -77,8 +81,11 @@ import org.opensearch.plugins.EnginePlugin; import org.opensearch.plugins.MapperPlugin; import org.opensearch.plugins.Plugin; +import org.opensearch.search.internal.ContextIndexSearcher; +import org.opensearch.search.internal.ShardSearchRequest; import org.opensearch.test.IndexSettingsModule; import org.opensearch.test.OpenSearchSingleNodeTestCase; +import org.opensearch.test.TestSearchContext; import org.opensearch.test.VersionUtils; import org.opensearch.test.hamcrest.RegexMatcher; @@ -636,4 +643,32 @@ public void testClusterRemoteTranslogBufferIntervalDefault() { indicesService.getRemoteStoreSettings().getClusterRemoteTranslogBufferInterval() ); } + + public void testDirectoryReaderWithoutDelegatingCacheHelperNotCacheable() throws IOException { + IndicesService indicesService = getIndicesService(); + final IndexService indexService = createIndex("test"); + ShardSearchRequest request = mock(ShardSearchRequest.class); + when(request.requestCache()).thenReturn(true); + + TestSearchContext context = new TestSearchContext(indexService.getBigArrays(), indexService) { + @Override + public SearchType searchType() { + return SearchType.QUERY_THEN_FETCH; + } + }; + + ContextIndexSearcher searcher = mock(ContextIndexSearcher.class); + context.setSearcher(searcher); + DirectoryReader reader = mock(DirectoryReader.class); + when(searcher.getDirectoryReader()).thenReturn(reader); + when(searcher.getIndexReader()).thenReturn(reader); + IndexReader.CacheHelper notDelegatingCacheHelper = mock(IndexReader.CacheHelper.class); + DelegatingCacheHelper delegatingCacheHelper = mock(DelegatingCacheHelper.class); + + for (boolean useDelegatingCacheHelper : new boolean[] { true, false }) { + IndexReader.CacheHelper cacheHelper = useDelegatingCacheHelper ? delegatingCacheHelper : notDelegatingCacheHelper; + when(reader.getReaderCacheHelper()).thenReturn(cacheHelper); + assertEquals(useDelegatingCacheHelper, indicesService.canCache(request, context)); + } + } }