diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java index 8adfde6cdbba9..9f0b8dbaaa8c2 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java @@ -111,7 +111,6 @@ protected void doRun() throws Exception { return future; } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/27650") public void testRecoveryWithConcurrentIndexing() throws Exception { final String index = "recovery_with_concurrent_indexing"; Response response = client().performRequest(new Request("GET", "_nodes")); diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesRequestCache.java b/server/src/main/java/org/elasticsearch/indices/IndicesRequestCache.java index a0095613cdb50..4f2ac5e5b21cd 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesRequestCache.java @@ -44,6 +44,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Iterator; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.function.Supplier; @@ -51,7 +52,7 @@ /** * The indices request cache allows to cache a shard level request stage responses, helping with improving * similar requests that are potentially expensive (because of aggs for example). The cache is fully coherent - * with the semantics of NRT (the index reader version is part of the cache key), and relies on size based + * with the semantics of NRT (the index reader cache key is part of the cache key), and relies on size based * eviction to evict old reader associated cache entries as well as scheduler reaper to clean readers that * are no longer used or closed shards. *

@@ -100,7 +101,7 @@ public void close() { } void clear(CacheEntity entity) { - keysToClean.add(new CleanupKey(entity, -1)); + keysToClean.add(new CleanupKey(entity, null)); cleanCache(); } @@ -111,13 +112,14 @@ public void onRemoval(RemovalNotification notification) { BytesReference getOrCompute(CacheEntity cacheEntity, Supplier loader, DirectoryReader reader, BytesReference cacheKey) throws Exception { - final Key key = new Key(cacheEntity, reader.getVersion(), cacheKey); + assert reader.getReaderCacheHelper() != null; + final Key key = new Key(cacheEntity, reader.getReaderCacheHelper().getKey(), cacheKey); Loader cacheLoader = new Loader(cacheEntity, loader); BytesReference value = cache.computeIfAbsent(key, cacheLoader); if (cacheLoader.isLoaded()) { key.entity.onMiss(); // see if its the first time we see this reader, and make sure to register a cleanup key - CleanupKey cleanupKey = new CleanupKey(cacheEntity, reader.getVersion()); + CleanupKey cleanupKey = new CleanupKey(cacheEntity, reader.getReaderCacheHelper().getKey()); if (!registeredClosedListeners.containsKey(cleanupKey)) { Boolean previous = registeredClosedListeners.putIfAbsent(cleanupKey, Boolean.TRUE); if (previous == null) { @@ -137,7 +139,8 @@ BytesReference getOrCompute(CacheEntity cacheEntity, Supplier lo * @param cacheKey the cache key to invalidate */ void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference cacheKey) { - cache.invalidate(new Key(cacheEntity, reader.getVersion(), cacheKey)); + assert reader.getReaderCacheHelper() != null; + cache.invalidate(new Key(cacheEntity, reader.getReaderCacheHelper().getKey(), cacheKey)); } private static class Loader implements CacheLoader { @@ -206,12 +209,12 @@ static class Key implements Accountable { private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class); public final CacheEntity entity; // use as identity equality - public final long readerVersion; // use the reader version to now keep a reference to a "short" lived reader until its reaped + public final IndexReader.CacheKey readerCacheKey; public final BytesReference value; - Key(CacheEntity entity, long readerVersion, BytesReference value) { + Key(CacheEntity entity, IndexReader.CacheKey readerCacheKey, BytesReference value) { this.entity = entity; - this.readerVersion = readerVersion; + this.readerCacheKey = Objects.requireNonNull(readerCacheKey); this.value = value; } @@ -231,7 +234,7 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Key key = (Key) o; - if (readerVersion != key.readerVersion) return false; + if (Objects.equals(readerCacheKey, key.readerCacheKey) == false) return false; if (!entity.getCacheIdentity().equals(key.entity.getCacheIdentity())) return false; if (!value.equals(key.value)) return false; return true; @@ -240,7 +243,7 @@ public boolean equals(Object o) { @Override public int hashCode() { int result = entity.getCacheIdentity().hashCode(); - result = 31 * result + Long.hashCode(readerVersion); + result = 31 * result + readerCacheKey.hashCode(); result = 31 * result + value.hashCode(); return result; } @@ -248,11 +251,11 @@ public int hashCode() { private class CleanupKey implements IndexReader.ClosedListener { final CacheEntity entity; - final long readerVersion; // use the reader version to now keep a reference to a "short" lived reader until its reaped + final IndexReader.CacheKey readerCacheKey; - private CleanupKey(CacheEntity entity, long readerVersion) { + private CleanupKey(CacheEntity entity, IndexReader.CacheKey readerCacheKey) { this.entity = entity; - this.readerVersion = readerVersion; + this.readerCacheKey = readerCacheKey; } @Override @@ -270,7 +273,7 @@ public boolean equals(Object o) { return false; } CleanupKey that = (CleanupKey) o; - if (readerVersion != that.readerVersion) return false; + if (Objects.equals(readerCacheKey, that.readerCacheKey) == false) return false; if (!entity.getCacheIdentity().equals(that.entity.getCacheIdentity())) return false; return true; } @@ -278,7 +281,7 @@ public boolean equals(Object o) { @Override public int hashCode() { int result = entity.getCacheIdentity().hashCode(); - result = 31 * result + Long.hashCode(readerVersion); + result = 31 * result + Objects.hashCode(readerCacheKey); return result; } } @@ -293,7 +296,7 @@ synchronized void cleanCache() { for (Iterator iterator = keysToClean.iterator(); iterator.hasNext(); ) { CleanupKey cleanupKey = iterator.next(); iterator.remove(); - if (cleanupKey.readerVersion == -1 || cleanupKey.entity.isOpen() == false) { + if (cleanupKey.readerCacheKey == null || cleanupKey.entity.isOpen() == false) { // -1 indicates full cleanup, as does a closed shard currentFullClean.add(cleanupKey.entity.getCacheIdentity()); } else { @@ -306,7 +309,7 @@ synchronized void cleanCache() { if (currentFullClean.contains(key.entity.getCacheIdentity())) { iterator.remove(); } else { - if (currentKeysToClean.contains(new CleanupKey(key.entity, key.readerVersion))) { + if (currentKeysToClean.contains(new CleanupKey(key.entity, key.readerCacheKey))) { iterator.remove(); } } diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index 824c1983fcb96..cf20a9e49103d 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -1166,10 +1166,9 @@ public boolean canCache(ShardSearchRequest request, SearchContext context) { } else if (request.requestCache() == false) { return false; } - // if the reader is not a directory reader, we can't get the version from it - if ((context.searcher().getIndexReader() instanceof DirectoryReader) == false) { - return false; - } + // We use the cacheKey of the index reader as a part of a key of the IndicesRequestCache. + assert context.searcher().getIndexReader().getReaderCacheHelper() != null; + // if now in millis is used (or in the future, a more generic "isDeterministic" flag // then we can't cache based on "now" key within the search request, as it is not deterministic if (context.getQueryShardContext().isCachable() == false) { diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index cab6e37eeaf4e..0487d727af2a5 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterInfoService; @@ -67,6 +68,7 @@ import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.test.IndexSettingsModule; @@ -713,4 +715,37 @@ public void testGlobalCheckpointListenerTimeout() throws InterruptedException { assertTrue(notified.get()); } + public void testInvalidateIndicesRequestCacheWhenRollbackEngine() throws Exception { + createIndex("test", Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0) + .put("index.refresh_interval", -1).build()); + ensureGreen(); + final IndicesService indicesService = getInstanceFromNode(IndicesService.class); + final IndexShard shard = indicesService.getShardOrNull(new ShardId(resolveIndex("test"), 0)); + final SearchRequest countRequest = new SearchRequest("test").source(new SearchSourceBuilder().size(0)); + final long numDocs = between(10, 20); + for (int i = 0; i < numDocs; i++) { + client().prepareIndex("test", "_doc", Integer.toString(i)).setSource("{}", XContentType.JSON).get(); + if (randomBoolean()) { + shard.refresh("test"); + } + } + shard.refresh("test"); + assertThat(client().search(countRequest).actionGet().getHits().totalHits, equalTo(numDocs)); + assertThat(shard.getLocalCheckpoint(), equalTo(shard.seqNoStats().getMaxSeqNo())); + shard.resetEngineToGlobalCheckpoint(); + final long moreDocs = between(10, 20); + for (int i = 0; i < moreDocs; i++) { + client().prepareIndex("test", "_doc", Long.toString(i + numDocs)).setSource("{}", XContentType.JSON).get(); + if (randomBoolean()) { + shard.refresh("test"); + } + } + shard.refresh("test"); + try (Engine.Searcher searcher = shard.acquireSearcher("test")) { + assertThat("numDocs=" + numDocs + " moreDocs=" + moreDocs, (long) searcher.reader().numDocs(), equalTo(numDocs + moreDocs)); + } + assertThat("numDocs=" + numDocs + " moreDocs=" + moreDocs, + client().search(countRequest).actionGet().getHits().totalHits, equalTo(numDocs + moreDocs)); + } + } diff --git a/server/src/test/java/org/elasticsearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/elasticsearch/indices/IndicesRequestCacheTests.java index 8059c8a103927..af451de574bc3 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/elasticsearch/indices/IndicesRequestCacheTests.java @@ -23,7 +23,9 @@ import org.apache.lucene.document.Field; import org.apache.lucene.document.StringField; import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.TermQuery; @@ -119,7 +121,11 @@ public void testCacheDifferentReaders() throws Exception { DirectoryReader reader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); BytesReference termBytes = XContentHelper.toXContent(termQuery, XContentType.JSON, false); - + if (randomBoolean()) { + writer.flush(); + IOUtils.close(writer); + writer = new IndexWriter(dir, newIndexWriterConfig()); + } writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); DirectoryReader secondReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); @@ -424,14 +430,23 @@ public void testInvalidate() throws Exception { assertEquals(0, cache.numRegisteredCloseListeners()); } - public void testEqualsKey() { + public void testEqualsKey() throws IOException { AtomicBoolean trueBoolean = new AtomicBoolean(true); AtomicBoolean falseBoolean = new AtomicBoolean(false); - IndicesRequestCache.Key key1 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), 1L, new TestBytesReference(1)); - IndicesRequestCache.Key key2 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), 1L, new TestBytesReference(1)); - IndicesRequestCache.Key key3 = new IndicesRequestCache.Key(new TestEntity(null, falseBoolean), 1L, new TestBytesReference(1)); - IndicesRequestCache.Key key4 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), 2L, new TestBytesReference(1)); - IndicesRequestCache.Key key5 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), 1L, new TestBytesReference(2)); + Directory dir = newDirectory(); + IndexWriterConfig config = newIndexWriterConfig(); + IndexWriter writer = new IndexWriter(dir, config); + IndexReader reader1 = DirectoryReader.open(writer); + IndexReader.CacheKey rKey1 = reader1.getReaderCacheHelper().getKey(); + writer.addDocument(new Document()); + IndexReader reader2 = DirectoryReader.open(writer); + IndexReader.CacheKey rKey2 = reader2.getReaderCacheHelper().getKey(); + IOUtils.close(reader1, reader2, writer, dir); + IndicesRequestCache.Key key1 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey1, new TestBytesReference(1)); + IndicesRequestCache.Key key2 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey1, new TestBytesReference(1)); + IndicesRequestCache.Key key3 = new IndicesRequestCache.Key(new TestEntity(null, falseBoolean), rKey1, new TestBytesReference(1)); + IndicesRequestCache.Key key4 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey2, new TestBytesReference(1)); + IndicesRequestCache.Key key5 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey1, new TestBytesReference(2)); String s = "Some other random object"; assertEquals(key1, key1); assertEquals(key1, key2);