From 33b408ffc931399255b822126746edbd80d14ae7 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 9 Nov 2020 19:38:47 -0500 Subject: [PATCH] Realtime get from in-memory segment when possible (#64504) If the reader wrapper is specified, then we can't perform a realtime get using operations from translog. With this change, we will create an in-memory Lucene segment from that indexing operation and perform a realtime get from that segment to avoid refresh storms. --- .../org/elasticsearch/get/GetActionIT.java | 15 +- .../elasticsearch/index/engine/Engine.java | 13 +- .../index/engine/InternalEngine.java | 44 ++- .../index/engine/ReadOnlyEngine.java | 6 +- .../engine/SingleDocDirectoryReader.java | 278 ++++++++++++++++++ .../elasticsearch/index/shard/IndexShard.java | 2 +- .../index/engine/InternalEngineTests.java | 116 +++++--- .../index/engine/ReadOnlyEngineTests.java | 2 +- .../index/shard/RefreshListenersTests.java | 4 +- .../index/shard/ShardGetServiceTests.java | 3 +- .../index/engine/EngineTestCase.java | 92 ++++++ .../index/shard/IndexShardTestCase.java | 3 + .../index/shard/SearcherHelper.java | 39 +++ 13 files changed, 555 insertions(+), 62 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/index/engine/SingleDocDirectoryReader.java create mode 100644 test/framework/src/main/java/org/elasticsearch/index/shard/SearcherHelper.java diff --git a/server/src/internalClusterTest/java/org/elasticsearch/get/GetActionIT.java b/server/src/internalClusterTest/java/org/elasticsearch/get/GetActionIT.java index 9f324d1a5c8b5..11cb5aa77a47c 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/get/GetActionIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/get/GetActionIT.java @@ -37,6 +37,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexModule; +import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestStatus; @@ -47,6 +49,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Set; import static java.util.Collections.singleton; @@ -65,7 +68,17 @@ public class GetActionIT extends ESIntegTestCase { @Override protected Collection> nodePlugins() { - return Collections.singleton(InternalSettingsPlugin.class); + return List.of(InternalSettingsPlugin.class, SearcherWrapperPlugin.class); + } + + public static class SearcherWrapperPlugin extends Plugin { + @Override + public void onIndexModule(IndexModule indexModule) { + super.onIndexModule(indexModule); + if (randomBoolean()) { + indexModule.setReaderWrapper(indexService -> EngineTestCase.randomReaderWrapper()); + } + } } public void testSimpleGet() { diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 3079a05abc330..4042203a4fa05 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -59,6 +59,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.Mapping; @@ -96,7 +97,6 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Stream; @@ -559,9 +559,7 @@ public static class NoOpResult extends Result { } - protected final GetResult getFromSearcher(Get get, BiFunction searcherFactory, - SearcherScope scope) throws EngineException { - final Engine.Searcher searcher = searcherFactory.apply("get", scope); + protected final GetResult getFromSearcher(Get get, Engine.Searcher searcher) throws EngineException { final DocIdAndVersion docIdAndVersion; try { docIdAndVersion = VersionsAndSeqNoResolver.loadDocIdAndVersion(searcher.getIndexReader(), get.uid(), true); @@ -596,7 +594,7 @@ protected final GetResult getFromSearcher(Get get, BiFunction searcherFactory) throws EngineException; + public abstract GetResult get(Get get, DocumentMapper mapper, Function searcherWrapper); /** * Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand. @@ -1616,6 +1614,7 @@ private GetResult(boolean exists, long version, DocIdAndVersion docIdAndVersion, this.docIdAndVersion = docIdAndVersion; this.searcher = searcher; this.fromTranslog = fromTranslog; + assert fromTranslog == false || searcher.getIndexReader() instanceof TranslogLeafReader; } public GetResult(Engine.Searcher searcher, DocIdAndVersion docIdAndVersion, boolean fromTranslog) { @@ -1630,6 +1629,10 @@ public long version() { return this.version; } + /** + * Returns {@code true} iff the get was performed from a translog operation. Notes that this returns {@code false} + * if the get was performed on an in-memory Lucene segment created from the corresponding translog operation. + */ public boolean isFromTranslog() { return fromTranslog; } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index ed76f294ca54e..0daf3d1183f91 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -73,6 +73,7 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.fieldvisitor.IdOnlyFieldVisitor; +import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.ParseContext; @@ -618,14 +619,34 @@ private ExternalReaderManager createReaderManager(RefreshWarmerListener external } } + private GetResult getFromTranslog(Get get, Translog.Index index, DocumentMapper mapper, + Function searcherWrapper) throws IOException { + assert get.isReadFromTranslog(); + final SingleDocDirectoryReader inMemoryReader = new SingleDocDirectoryReader(shardId, index, mapper, config().getAnalyzer()); + final Engine.Searcher searcher = new Engine.Searcher("realtime_get", ElasticsearchDirectoryReader.wrap(inMemoryReader, shardId), + config().getSimilarity(), config().getQueryCache(), config().getQueryCachingPolicy(), inMemoryReader); + final Searcher wrappedSearcher = searcherWrapper.apply(searcher); + if (wrappedSearcher == searcher) { + searcher.close(); + assert inMemoryReader.assertMemorySegmentStatus(false); + final TranslogLeafReader translogLeafReader = new TranslogLeafReader(index); + return new GetResult(new Engine.Searcher("realtime_get", translogLeafReader, + IndexSearcher.getDefaultSimilarity(), null, IndexSearcher.getDefaultQueryCachingPolicy(), translogLeafReader), + new VersionsAndSeqNoResolver.DocIdAndVersion( + 0, index.version(), index.seqNo(), index.primaryTerm(), translogLeafReader, 0), true); + } else { + assert inMemoryReader.assertMemorySegmentStatus(true); + return getFromSearcher(get, wrappedSearcher); + } + } + @Override - public GetResult get(Get get, BiFunction searcherFactory) throws EngineException { + public GetResult get(Get get, DocumentMapper mapper, Function searcherWrapper) { assert Objects.equals(get.uid().field(), IdFieldMapper.NAME) : get.uid().field(); try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); - SearcherScope scope; if (get.realtime()) { - VersionValue versionValue = null; + final VersionValue versionValue; try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) { // we need to lock here to access the version map to do this truly in RT versionValue = getVersionFromMap(get.uid().bytes()); @@ -649,15 +670,9 @@ public GetResult get(Get get, BiFunction // the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0 if (versionValue.getLocation() != null) { try { - Translog.Operation operation = translog.readOperation(versionValue.getLocation()); + final Translog.Operation operation = translog.readOperation(versionValue.getLocation()); if (operation != null) { - // in the case of a already pruned translog generation we might get null here - yet very unlikely - final Translog.Index index = (Translog.Index) operation; - TranslogLeafReader reader = new TranslogLeafReader(index); - return new GetResult(new Engine.Searcher("realtime_get", reader, - IndexSearcher.getDefaultSimilarity(), null, IndexSearcher.getDefaultQueryCachingPolicy(), reader), - new VersionsAndSeqNoResolver.DocIdAndVersion(0, index.version(), index.seqNo(), index.primaryTerm(), - reader, 0), true); + return getFromTranslog(get, (Translog.Index) operation, mapper, searcherWrapper); } } catch (IOException e) { maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event @@ -670,14 +685,11 @@ public GetResult get(Get get, BiFunction assert versionValue.seqNo >= 0 : versionValue; refreshIfNeeded("realtime_get", versionValue.seqNo); } - scope = SearcherScope.INTERNAL; + return getFromSearcher(get, acquireSearcher("realtime_get", SearcherScope.INTERNAL, searcherWrapper)); } else { // we expose what has been externally expose in a point in time snapshot via an explicit refresh - scope = SearcherScope.EXTERNAL; + return getFromSearcher(get, acquireSearcher("get", SearcherScope.EXTERNAL, searcherWrapper)); } - - // no version, get the version from the index, we know that we refresh on flush - return getFromSearcher(get, searcherFactory, scope); } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 4bc697e018ee4..c9d9baaef0d26 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -48,7 +49,6 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.CountDownLatch; -import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Stream; @@ -219,8 +219,8 @@ private static TranslogStats translogStats(final EngineConfig config, final Segm } @Override - public GetResult get(Get get, BiFunction searcherFactory) throws EngineException { - return getFromSearcher(get, searcherFactory, SearcherScope.EXTERNAL); + public GetResult get(Get get, DocumentMapper mapper, Function searcherWrapper) { + return getFromSearcher(get, acquireSearcher("get", SearcherScope.EXTERNAL, searcherWrapper)); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/engine/SingleDocDirectoryReader.java b/server/src/main/java/org/elasticsearch/index/engine/SingleDocDirectoryReader.java new file mode 100644 index 0000000000000..92dfbb826e088 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/SingleDocDirectoryReader.java @@ -0,0 +1,278 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.index.BinaryDocValues; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.FieldInfos; +import org.apache.lucene.index.Fields; +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.LeafMetaData; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.PointValues; +import org.apache.lucene.index.SortedDocValues; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.index.SortedSetDocValues; +import org.apache.lucene.index.StoredFieldVisitor; +import org.apache.lucene.index.Terms; +import org.apache.lucene.store.ByteBuffersDirectory; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.Bits; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.index.mapper.DocumentMapper; +import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.index.mapper.SourceToParse; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.translog.Translog; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; + +/** + * A {@link DirectoryReader} contains a single leaf reader delegating to an in-memory Lucene segment that is lazily created from + * a single document. + */ +final class SingleDocDirectoryReader extends DirectoryReader { + private final SingleDocLeafReader leafReader; + + SingleDocDirectoryReader(ShardId shardId, Translog.Index operation, DocumentMapper mapper, Analyzer analyzer) throws IOException { + this(new SingleDocLeafReader(shardId, operation, mapper, analyzer)); + } + + private SingleDocDirectoryReader(SingleDocLeafReader leafReader) throws IOException { + super(leafReader.directory, new LeafReader[]{leafReader}); + this.leafReader = leafReader; + } + + boolean assertMemorySegmentStatus(boolean loaded) { + return leafReader.assertMemorySegmentStatus(loaded); + } + + private static UnsupportedOperationException unsupported() { + assert false : "unsupported operation"; + return new UnsupportedOperationException(); + } + + @Override + protected DirectoryReader doOpenIfChanged() { + throw unsupported(); + } + + @Override + protected DirectoryReader doOpenIfChanged(IndexCommit commit) { + throw unsupported(); + } + + @Override + protected DirectoryReader doOpenIfChanged(IndexWriter writer, boolean applyAllDeletes) { + throw unsupported(); + } + + @Override + public long getVersion() { + throw unsupported(); + } + + @Override + public boolean isCurrent() { + throw unsupported(); + } + + @Override + public IndexCommit getIndexCommit() { + throw unsupported(); + } + + @Override + protected void doClose() throws IOException { + leafReader.close(); + } + + @Override + public CacheHelper getReaderCacheHelper() { + return leafReader.getReaderCacheHelper(); + } + + private static class SingleDocLeafReader extends LeafReader { + + private final ShardId shardId; + private final Translog.Index operation; + private final DocumentMapper mapper; + private final Analyzer analyzer; + private final Directory directory; + private final AtomicReference delegate = new AtomicReference<>(); + + SingleDocLeafReader(ShardId shardId, Translog.Index operation, DocumentMapper mapper, Analyzer analyzer) { + this.shardId = shardId; + this.operation = operation; + this.mapper = mapper; + this.analyzer = analyzer; + this.directory = new ByteBuffersDirectory(); + } + + private LeafReader getDelegate() { + ensureOpen(); + LeafReader reader = delegate.get(); + if (reader == null) { + synchronized (this) { + reader = delegate.get(); + if (reader == null) { + reader = createInMemoryLeafReader(); + final LeafReader existing = delegate.getAndSet(reader); + assert existing == null; + } + } + } + return reader; + } + + private LeafReader createInMemoryLeafReader() { + assert Thread.holdsLock(this); + final ParsedDocument parsedDocs = mapper.parse(new SourceToParse(shardId.getIndexName(), operation.id(), + operation.source(), XContentHelper.xContentType(operation.source()), operation.routing())); + parsedDocs.updateSeqID(operation.seqNo(), operation.primaryTerm()); + parsedDocs.version().setLongValue(operation.version()); + final IndexWriterConfig writeConfig = new IndexWriterConfig(analyzer).setOpenMode(IndexWriterConfig.OpenMode.CREATE); + try (IndexWriter writer = new IndexWriter(directory, writeConfig)) { + writer.addDocument(parsedDocs.rootDoc()); + final DirectoryReader reader = open(writer); + if (reader.leaves().size() != 1 || reader.leaves().get(0).reader().numDocs() != 1) { + reader.close(); + throw new IllegalStateException("Expected a single document segment; " + + "but [" + reader.leaves().size() + " segments with " + reader.leaves().get(0).reader().numDocs() + " documents"); + } + return reader.leaves().get(0).reader(); + } catch (IOException e) { + throw new EngineException(shardId, "failed to create an in-memory segment for get [" + operation.id() + "]", e); + } + } + + @Override + public CacheHelper getCoreCacheHelper() { + return getDelegate().getCoreCacheHelper(); + } + + @Override + public CacheHelper getReaderCacheHelper() { + return getDelegate().getReaderCacheHelper(); + } + + @Override + public Terms terms(String field) throws IOException { + return getDelegate().terms(field); + } + + @Override + public NumericDocValues getNumericDocValues(String field) throws IOException { + return getDelegate().getNumericDocValues(field); + } + + @Override + public BinaryDocValues getBinaryDocValues(String field) throws IOException { + return getDelegate().getBinaryDocValues(field); + } + + @Override + public SortedDocValues getSortedDocValues(String field) throws IOException { + return getDelegate().getSortedDocValues(field); + } + + @Override + public SortedNumericDocValues getSortedNumericDocValues(String field) throws IOException { + return getDelegate().getSortedNumericDocValues(field); + } + + @Override + public SortedSetDocValues getSortedSetDocValues(String field) throws IOException { + return getDelegate().getSortedSetDocValues(field); + } + + @Override + public NumericDocValues getNormValues(String field) throws IOException { + return getDelegate().getNormValues(field); + } + + @Override + public FieldInfos getFieldInfos() { + return getDelegate().getFieldInfos(); + } + + @Override + public Bits getLiveDocs() { + return getDelegate().getLiveDocs(); + } + + @Override + public PointValues getPointValues(String field) throws IOException { + return getDelegate().getPointValues(field); + } + + @Override + public void checkIntegrity() throws IOException { + } + + @Override + public LeafMetaData getMetaData() { + return getDelegate().getMetaData(); + } + + @Override + public Fields getTermVectors(int docID) throws IOException { + return getDelegate().getTermVectors(docID); + } + + @Override + public int numDocs() { + return 1; + } + + @Override + public int maxDoc() { + return 1; + } + + synchronized boolean assertMemorySegmentStatus(boolean loaded) { + if (loaded) { + assert delegate.get() != null : + "Expected an in memory segment was loaded; but it wasn't. Please check the reader wrapper implementation"; + } else { + assert delegate.get() == null : + "Expected an in memory segment wasn't loaded; but it was. Please check the reader wrapper implementation"; + } + return true; + } + + @Override + public void document(int docID, StoredFieldVisitor visitor) throws IOException { + assert assertMemorySegmentStatus(true); + getDelegate().document(docID, visitor); + } + + @Override + protected void doClose() throws IOException { + IOUtils.close(delegate.get(), directory); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index d2e469012cca3..498c15f243868 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -923,7 +923,7 @@ public Engine.GetResult get(Engine.Get get) { if (mapper == null) { return GetResult.NOT_EXISTS; } - return getEngine().get(get, this::acquireSearcher); + return getEngine().get(get, mapper, this::wrapSearcher); } /** diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 7f18327f929f2..129bde0b45d76 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -55,6 +55,7 @@ import org.apache.lucene.index.TieredMergePolicy; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.MatchNoDocsQuery; import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.Sort; import org.apache.lucene.search.SortedSetSortField; @@ -108,6 +109,7 @@ import org.elasticsearch.index.VersionType; import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.fieldvisitor.FieldsVisitor; +import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.ParseContext; @@ -123,6 +125,7 @@ import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.SearcherHelper; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardUtils; import org.elasticsearch.index.store.Store; @@ -695,8 +698,7 @@ public void testConcurrentGetAndFlush() throws Exception { engine.index(indexForDoc(doc)); final AtomicReference latestGetResult = new AtomicReference<>(); - final BiFunction searcherFactory = engine::acquireSearcher; - latestGetResult.set(engine.get(newGet(true, doc), searcherFactory)); + latestGetResult.set(engine.get(newGet(true, doc), docMapper(), randomSearcherWrapper())); final AtomicBoolean flushFinished = new AtomicBoolean(false); final CyclicBarrier barrier = new CyclicBarrier(2); Thread getThread = new Thread(() -> { @@ -710,7 +712,7 @@ public void testConcurrentGetAndFlush() throws Exception { if (previousGetResult != null) { previousGetResult.close(); } - latestGetResult.set(engine.get(newGet(true, doc), searcherFactory)); + latestGetResult.set(engine.get(newGet(true, doc), docMapper(), randomSearcherWrapper())); if (latestGetResult.get().exists() == false) { break; } @@ -726,6 +728,7 @@ public void testConcurrentGetAndFlush() throws Exception { } public void testSimpleOperations() throws Exception { + final DocumentMapper mapper = docMapper(); engine.refresh("warm_up"); Engine.Searcher searchResult = engine.acquireSearcher("test"); MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0)); @@ -747,18 +750,18 @@ public void testSimpleOperations() throws Exception { searchResult.close(); // but, not there non realtime - try (Engine.GetResult getResult = engine.get(newGet(false, doc), searcherFactory)) { + try (Engine.GetResult getResult = engine.get(newGet(false, doc), mapper, randomSearcherWrapper())) { assertThat(getResult.exists(), equalTo(false)); } // but, we can still get it (in realtime) - try (Engine.GetResult getResult = engine.get(newGet(true, doc), searcherFactory)) { + try (Engine.GetResult getResult = engine.get(newGet(true, doc), mapper, randomSearcherWrapper())) { assertThat(getResult.exists(), equalTo(true)); assertThat(getResult.docIdAndVersion(), notNullValue()); } // but not real time is not yet visible - try (Engine.GetResult getResult = engine.get(newGet(false, doc), searcherFactory)) { + try (Engine.GetResult getResult = engine.get(newGet(false, doc), mapper, randomSearcherWrapper())) { assertThat(getResult.exists(), equalTo(false)); } @@ -773,7 +776,7 @@ public void testSimpleOperations() throws Exception { searchResult.close(); // also in non realtime - try (Engine.GetResult getResult = engine.get(newGet(false, doc), searcherFactory)) { + try (Engine.GetResult getResult = engine.get(newGet(false, doc), mapper, randomSearcherWrapper())) { assertThat(getResult.exists(), equalTo(true)); assertThat(getResult.docIdAndVersion(), notNullValue()); } @@ -781,8 +784,8 @@ public void testSimpleOperations() throws Exception { // now do an update document = testDocument(); document.add(new TextField("value", "test1", Field.Store.YES)); - document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_2), SourceFieldMapper.Defaults.FIELD_TYPE)); - doc = testParsedDocument("1", null, document, B_2, null); + document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(SOURCE), SourceFieldMapper.Defaults.FIELD_TYPE)); + doc = testParsedDocument("1", null, document, SOURCE, null); engine.index(indexForDoc(doc)); // its not updated yet... @@ -795,7 +798,7 @@ public void testSimpleOperations() throws Exception { searchResult.close(); // but, we can still get it (in realtime) - try (Engine.GetResult getResult = engine.get(newGet(true, doc), searcherFactory)) { + try (Engine.GetResult getResult = engine.get(newGet(true, doc), mapper, randomSearcherWrapper())) { assertThat(getResult.exists(), equalTo(true)); assertThat(getResult.docIdAndVersion(), notNullValue()); } @@ -824,7 +827,7 @@ public void testSimpleOperations() throws Exception { searchResult.close(); // but, get should not see it (in realtime) - try (Engine.GetResult getResult = engine.get(newGet(true, doc), searcherFactory)) { + try (Engine.GetResult getResult = engine.get(newGet(true, doc), mapper, randomSearcherWrapper())) { assertThat(getResult.exists(), equalTo(false)); } @@ -870,7 +873,7 @@ public void testSimpleOperations() throws Exception { engine.flush(); // and, verify get (in real time) - try (Engine.GetResult getResult = engine.get(newGet(true, doc), searcherFactory)) { + try (Engine.GetResult getResult = engine.get(newGet(true, doc), mapper, randomSearcherWrapper())) { assertThat(getResult.exists(), equalTo(true)); assertThat(getResult.docIdAndVersion(), notNullValue()); } @@ -903,6 +906,53 @@ public void testSimpleOperations() throws Exception { searchResult.close(); } + public void testGetWithSearcherWrapper() throws Exception { + engine.refresh("warm_up"); + engine.index(indexForDoc(createParsedDoc("1", null))); + assertThat(engine.lastRefreshedCheckpoint(), equalTo(NO_OPS_PERFORMED)); + try (Engine.GetResult get = engine.get(new Engine.Get(true, true, "1"), docMapper(), randomSearcherWrapper())) { + // we do not track the translog location yet + assertTrue(get.exists()); + assertFalse(get.isFromTranslog()); + } + // refresh triggered, as we did not track translog location until the first realtime get. + assertThat(engine.lastRefreshedCheckpoint(), equalTo(0L)); + + engine.index(indexForDoc(createParsedDoc("1", null))); + try (Engine.GetResult get = engine.get(new Engine.Get(true, true, "1"), docMapper(), searcher -> searcher)) { + assertTrue(get.exists()); + assertTrue(get.isFromTranslog()); + } + assertThat(engine.lastRefreshedCheckpoint(), equalTo(0L)); // no refresh; just read from translog + if (randomBoolean()) { + engine.index(indexForDoc(createParsedDoc("1", null))); + } + try (Engine.GetResult get = engine.get(new Engine.Get(true, true, "1"), docMapper(), + searcher -> SearcherHelper.wrapSearcher(searcher, reader -> new MatchingDirectoryReader(reader, new MatchAllDocsQuery())))) { + assertTrue(get.exists()); + assertFalse(get.isFromTranslog()); + } + + try (Engine.GetResult get = engine.get(new Engine.Get(true, true, "1"), docMapper(), + searcher -> SearcherHelper.wrapSearcher(searcher, reader -> new MatchingDirectoryReader(reader, new MatchNoDocsQuery())))) { + assertFalse(get.exists()); + assertFalse(get.isFromTranslog()); + } + + try (Engine.GetResult get = engine.get(new Engine.Get(true, true, "1"), docMapper(), + searcher -> SearcherHelper.wrapSearcher(searcher, reader -> new MatchingDirectoryReader(reader, new TermQuery(newUid("1")))))) { + assertTrue(get.exists()); + assertFalse(get.isFromTranslog()); + } + + try (Engine.GetResult get = engine.get(new Engine.Get(true, true, "1"), docMapper(), + searcher -> SearcherHelper.wrapSearcher(searcher, reader -> new MatchingDirectoryReader(reader, new TermQuery(newUid("2")))))) { + assertFalse(get.exists()); + assertFalse(get.isFromTranslog()); + } + assertThat("no refresh, just read from translog or in-memory segment", engine.lastRefreshedCheckpoint(), equalTo(0L)); + } + public void testSearchResultRelease() throws Exception { engine.refresh("warm_up"); Engine.Searcher searchResult = engine.acquireSearcher("test"); @@ -1156,7 +1206,7 @@ public void testVersionedUpdate() throws IOException { Engine.Index create = new Engine.Index(newUid(doc), primaryTerm.get(), doc, Versions.MATCH_DELETED); Engine.IndexResult indexResult = engine.index(create); assertThat(indexResult.getVersion(), equalTo(1L)); - try (Engine.GetResult get = engine.get(new Engine.Get(true, false, doc.id()), searcherFactory)) { + try (Engine.GetResult get = engine.get(new Engine.Get(true, false, doc.id()), docMapper(), randomSearcherWrapper())) { assertEquals(1, get.version()); } @@ -1164,7 +1214,7 @@ public void testVersionedUpdate() throws IOException { Engine.IndexResult update_1_result = engine.index(update_1); assertThat(update_1_result.getVersion(), equalTo(2L)); - try (Engine.GetResult get = engine.get(new Engine.Get(true, false, doc.id()), searcherFactory)) { + try (Engine.GetResult get = engine.get(new Engine.Get(true, false, doc.id()), docMapper(), randomSearcherWrapper())) { assertEquals(2, get.version()); } @@ -1172,14 +1222,13 @@ public void testVersionedUpdate() throws IOException { Engine.IndexResult update_2_result = engine.index(update_2); assertThat(update_2_result.getVersion(), equalTo(3L)); - try (Engine.GetResult get = engine.get(new Engine.Get(true, false, doc.id()), searcherFactory)) { + try (Engine.GetResult get = engine.get(new Engine.Get(true, false, doc.id()), docMapper(), randomSearcherWrapper())) { assertEquals(3, get.version()); } } public void testGetIfSeqNoIfPrimaryTerm() throws IOException { - final BiFunction searcherFactory = engine::acquireSearcher; ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null); Engine.Index create = new Engine.Index(newUid(doc), primaryTerm.get(), doc, Versions.MATCH_DELETED); @@ -1193,22 +1242,22 @@ public void testGetIfSeqNoIfPrimaryTerm() throws IOException { try (Engine.GetResult get = engine.get( new Engine.Get(true, true, doc.id()) .setIfSeqNo(indexResult.getSeqNo()).setIfPrimaryTerm(primaryTerm.get()), - searcherFactory)) { + docMapper(), randomSearcherWrapper())) { assertEquals(indexResult.getSeqNo(), get.docIdAndVersion().seqNo); } expectThrows(VersionConflictEngineException.class, () -> engine.get(new Engine.Get(true, false, doc.id()) .setIfSeqNo(indexResult.getSeqNo() + 1).setIfPrimaryTerm(primaryTerm.get()), - searcherFactory)); + docMapper(), randomSearcherWrapper())); expectThrows(VersionConflictEngineException.class, () -> engine.get(new Engine.Get(true, false, doc.id()) .setIfSeqNo(indexResult.getSeqNo()).setIfPrimaryTerm(primaryTerm.get() + 1), - searcherFactory)); + docMapper(), randomSearcherWrapper())); final VersionConflictEngineException versionConflictEngineException = expectThrows(VersionConflictEngineException.class, () -> engine.get(new Engine.Get(true, false, doc.id()) .setIfSeqNo(indexResult.getSeqNo() + 1).setIfPrimaryTerm(primaryTerm.get() + 1), - searcherFactory)); + docMapper(), randomSearcherWrapper())); assertThat(versionConflictEngineException.getStackTrace(), emptyArray()); } @@ -1954,7 +2003,6 @@ class OpAndVersion { ParsedDocument doc = testParsedDocument("1", null, testDocument(), bytesArray(""), null); final Term uidTerm = newUid(doc); engine.index(indexForDoc(doc)); - final BiFunction searcherFactory = engine::acquireSearcher; for (int i = 0; i < thread.length; i++) { thread[i] = new Thread(() -> { startGun.countDown(); @@ -1964,7 +2012,7 @@ class OpAndVersion { throw new AssertionError(e); } for (int op = 0; op < opsPerThread; op++) { - try (Engine.GetResult get = engine.get(new Engine.Get(true, false, doc.id()), searcherFactory)) { + try (Engine.GetResult get = engine.get(new Engine.Get(true, false, doc.id()), docMapper(), randomSearcherWrapper())) { FieldsVisitor visitor = new FieldsVisitor(true); get.docIdAndVersion().reader.document(get.docIdAndVersion().docId, visitor); List values = new ArrayList<>(Strings.commaDelimitedListToSet(visitor.source().utf8ToString())); @@ -2006,7 +2054,7 @@ class OpAndVersion { assertTrue(op.added + " should not exist", exists); } - try (Engine.GetResult get = engine.get(new Engine.Get(true, false, doc.id()), searcherFactory)) { + try (Engine.GetResult get = engine.get(new Engine.Get(true, false, doc.id()), docMapper(), randomSearcherWrapper())) { FieldsVisitor visitor = new FieldsVisitor(true); get.docIdAndVersion().reader.document(get.docIdAndVersion().docId, visitor); List values = Arrays.asList(Strings.commaDelimitedListToStringArray(visitor.source().utf8ToString())); @@ -2402,7 +2450,7 @@ public void testEnableGcDeletes() throws Exception { Engine engine = createEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null))) { engine.config().setEnableGcDeletes(false); - final BiFunction searcherFactory = engine::acquireSearcher; + final DocumentMapper mapper = docMapper(); // Add document Document document = testDocument(); @@ -2418,7 +2466,7 @@ public void testEnableGcDeletes() throws Exception { 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), UNASSIGNED_SEQ_NO, 0)); // Get should not find the document - Engine.GetResult getResult = engine.get(newGet(true, doc), searcherFactory); + Engine.GetResult getResult = engine.get(newGet(true, doc), mapper, randomSearcherWrapper()); assertThat(getResult.exists(), equalTo(false)); // Give the gc pruning logic a chance to kick in @@ -2433,7 +2481,7 @@ public void testEnableGcDeletes() throws Exception { 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), UNASSIGNED_SEQ_NO, 0)); // Get should not find the document (we never indexed uid=2): - getResult = engine.get(new Engine.Get(true, false, "2"), searcherFactory); + getResult = engine.get(new Engine.Get(true, false, "2"), mapper, randomSearcherWrapper()); assertThat(getResult.exists(), equalTo(false)); // Try to index uid=1 with a too-old version, should fail: @@ -2444,7 +2492,7 @@ public void testEnableGcDeletes() throws Exception { assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); // Get should still not find the document - getResult = engine.get(newGet(true, doc), searcherFactory); + getResult = engine.get(newGet(true, doc), mapper, randomSearcherWrapper()); assertThat(getResult.exists(), equalTo(false)); // Try to index uid=2 with a too-old version, should fail: @@ -2455,7 +2503,7 @@ public void testEnableGcDeletes() throws Exception { assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); // Get should not find the document - getResult = engine.get(newGet(true, doc), searcherFactory); + getResult = engine.get(newGet(true, doc), mapper, randomSearcherWrapper()); assertThat(getResult.exists(), equalTo(false)); } } @@ -3968,7 +4016,7 @@ public void testOutOfOrderSequenceNumbersWithVersionConflict() throws IOExceptio } assertThat(engine.getProcessedLocalCheckpoint(), equalTo(expectedLocalCheckpoint)); - try (Engine.GetResult result = engine.get(new Engine.Get(true, false, "1"), searcherFactory)) { + try (Engine.GetResult result = engine.get(new Engine.Get(true, false, "1"), docMapper(), randomSearcherWrapper())) { assertThat(result.exists(), equalTo(exists)); } } @@ -4851,7 +4899,7 @@ public void testStressShouldPeriodicallyFlush() throws Exception { } public void testStressUpdateSameDocWhileGettingIt() throws IOException, InterruptedException { - final int iters = randomIntBetween(1, 15); + final int iters = randomIntBetween(1, 1); for (int i = 0; i < iters; i++) { // this is a reproduction of https://github.com/elastic/elasticsearch/issues/28714 try (Store store = createStore(); InternalEngine engine = createEngine(store, createTempDir())) { @@ -4893,13 +4941,15 @@ public void testStressUpdateSameDocWhileGettingIt() throws IOException, Interrup CountDownLatch awaitStarted = new CountDownLatch(1); Thread thread = new Thread(() -> { awaitStarted.countDown(); - try (Engine.GetResult getResult = engine.get(new Engine.Get(true, false, doc3.id()), engine::acquireSearcher)) { + try (Engine.GetResult getResult = engine.get( + new Engine.Get(true, false, doc3.id()), docMapper(), searcher -> searcher)) { assertTrue(getResult.exists()); } }); thread.start(); awaitStarted.await(); - try (Engine.GetResult getResult = engine.get(new Engine.Get(true, false, doc.id()), engine::acquireSearcher)) { + try (Engine.GetResult getResult = engine.get(new Engine.Get(true, false, doc.id()), docMapper(), + searcher -> SearcherHelper.wrapSearcher(searcher, r -> new MatchingDirectoryReader(r, new MatchAllDocsQuery())))) { assertFalse(getResult.exists()); } thread.join(); @@ -5848,7 +5898,7 @@ public void afterRefresh(boolean didRefresh) { int iters = randomIntBetween(1, 10); for (int i = 0; i < iters; i++) { ParsedDocument doc = createParsedDoc(randomFrom(ids), null); - try (Engine.GetResult getResult = engine.get(newGet(true, doc), engine::acquireSearcher)) { + try (Engine.GetResult getResult = engine.get(newGet(true, doc), docMapper(), randomSearcherWrapper())) { assertThat(getResult.exists(), equalTo(true)); assertThat(getResult.docIdAndVersion(), notNullValue()); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java index 563bd0acc2751..a9731539fc446 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java @@ -100,7 +100,7 @@ public void testReadOnlyEngine() throws Exception { assertThat(readOnlyEngine.getPersistedLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint())); assertThat(readOnlyEngine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo(), equalTo(lastSeqNoStats.getMaxSeqNo())); assertThat(getDocIds(readOnlyEngine, false), equalTo(lastDocIds)); - try (Engine.GetResult getResult = readOnlyEngine.get(get, readOnlyEngine::acquireSearcher)) { + try (Engine.GetResult getResult = readOnlyEngine.get(get, docMapper(), randomSearcherWrapper())) { assertTrue(getResult.exists()); } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index b6e72863279c0..ce108017d6028 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -47,6 +47,7 @@ import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngine; +import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.ParseContext.Document; import org.elasticsearch.index.mapper.ParsedDocument; @@ -344,7 +345,8 @@ public void testLotsOfThreads() throws Exception { listener.assertNoError(); Engine.Get get = new Engine.Get(false, false, threadId); - try (Engine.GetResult getResult = engine.get(get, engine::acquireSearcher)) { + final DocumentMapper mapper = EngineTestCase.docMapper(); + try (Engine.GetResult getResult = engine.get(get, mapper, EngineTestCase.randomSearcherWrapper())) { assertTrue("document not found", getResult.exists()); assertEquals(iteration, getResult.version()); org.apache.lucene.document.Document document = diff --git a/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java b/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java index d3bde9d174b1a..44a316242b98f 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.mapper.RoutingFieldMapper; @@ -117,7 +118,7 @@ private void runGetFromTranslogWithOptions(String docToIndex, String sourceOptio "\"bar\": { \"type\": " + fieldType + "}}, \"_source\": { " + sourceOptions + "}}}") .settings(settings) .primaryTerm(0, 1).build(); - IndexShard primary = newShard(new ShardId(metadata.getIndex(), 0), true, "n1", metadata, null); + IndexShard primary = newShard(new ShardId(metadata.getIndex(), 0), true, "n1", metadata, EngineTestCase.randomReaderWrapper()); recoverShardFromStore(primary); Engine.IndexResult test = indexDoc(primary, "test", "0", docToIndex); assertTrue(primary.getEngine().refreshNeeded()); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index aa9c0c42c1eb2..ca2f9a36b0044 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -28,6 +28,8 @@ import org.apache.lucene.document.StoredField; import org.apache.lucene.document.TextField; import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.FilterDirectoryReader; +import org.apache.lucene.index.FilterLeafReader; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; @@ -40,14 +42,19 @@ import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; import org.apache.lucene.search.ReferenceManager; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Scorer; import org.apache.lucene.search.Sort; import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TotalHitCountCollector; +import org.apache.lucene.search.Weight; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.FixedBitSet; import org.elasticsearch.Version; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.replication.ReplicationResponse; @@ -55,6 +62,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.common.CheckedBiFunction; +import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.Strings; @@ -94,6 +102,7 @@ import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.SearcherHelper; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; @@ -110,6 +119,7 @@ import org.junit.Before; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.charset.Charset; import java.nio.file.Path; import java.util.ArrayList; @@ -1184,6 +1194,14 @@ public static MapperService createMapperService() throws IOException { return mapperService; } + public static DocumentMapper docMapper() { + try { + return createMapperService().documentMapper(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + /** * Exposes a translog associated with the given engine for testing purpose. */ @@ -1259,4 +1277,78 @@ public static long getInFlightDocCount(Engine engine) { public static void assertNoInFlightDocuments(Engine engine) throws Exception { assertBusy(() -> assertThat(getInFlightDocCount(engine), equalTo(0L))); } + + public static final class MatchingDirectoryReader extends FilterDirectoryReader { + private final Query query; + + public MatchingDirectoryReader(DirectoryReader in, Query query) throws IOException { + super(in, new SubReaderWrapper() { + @Override + public LeafReader wrap(LeafReader leaf) { + try { + final IndexSearcher searcher = new IndexSearcher(leaf); + final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f); + final Scorer scorer = weight.scorer(leaf.getContext()); + final DocIdSetIterator iterator = scorer != null ? scorer.iterator() : null; + final FixedBitSet liveDocs = new FixedBitSet(leaf.maxDoc()); + if (iterator != null) { + for (int docId = iterator.nextDoc(); docId != DocIdSetIterator.NO_MORE_DOCS; docId = iterator.nextDoc()) { + if (leaf.getLiveDocs() == null || leaf.getLiveDocs().get(docId)) { + liveDocs.set(docId); + } + } + } + return new FilterLeafReader(leaf) { + @Override + public Bits getLiveDocs() { + return liveDocs; + } + + @Override + public CacheHelper getCoreCacheHelper() { + return leaf.getCoreCacheHelper(); + } + + @Override + public CacheHelper getReaderCacheHelper() { + return null; // modify liveDocs + } + }; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + }); + this.query = query; + } + + @Override + protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException { + return new MatchingDirectoryReader(in, query); + } + + @Override + public CacheHelper getReaderCacheHelper() { + // TODO: We should not return the ReaderCacheHelper if we modify the liveDocs, + // but some caching components (e.g., global ordinals) require this cache key. + return in.getReaderCacheHelper(); + } + } + + public static CheckedFunction randomReaderWrapper() { + if (randomBoolean()) { + return reader -> reader; + } else { + return reader -> new MatchingDirectoryReader(reader, new MatchAllDocsQuery()); + } + } + + public static Function randomSearcherWrapper() { + if (randomBoolean()) { + return Function.identity(); + } else { + final CheckedFunction readerWrapper = randomReaderWrapper(); + return searcher -> SearcherHelper.wrapSearcher(searcher, readerWrapper); + } + } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index c9fe7ac9a3199..d677891743cfa 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -367,6 +367,9 @@ protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMe storeProvider = is -> createStore(is, shardPath); } final Store store = storeProvider.apply(indexSettings); + if (indexReaderWrapper == null && randomBoolean()) { + indexReaderWrapper = EngineTestCase.randomReaderWrapper(); + } boolean success = false; try { IndexCache indexCache = new IndexCache(indexSettings, new DisabledQueryCache(indexSettings), null); diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/SearcherHelper.java b/test/framework/src/main/java/org/elasticsearch/index/shard/SearcherHelper.java new file mode 100644 index 0000000000000..dd5a769c8d999 --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/SearcherHelper.java @@ -0,0 +1,39 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.shard; + +import org.apache.lucene.index.DirectoryReader; +import org.elasticsearch.common.CheckedFunction; +import org.elasticsearch.index.engine.Engine; + +import java.io.IOException; +import java.io.UncheckedIOException; + +public class SearcherHelper { + + public static Engine.Searcher wrapSearcher(Engine.Searcher engineSearcher, + CheckedFunction readerWrapper) { + try { + return IndexShard.wrapSearcher(engineSearcher, readerWrapper); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } +}