diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index f9367be4729e0..6b47e50488c48 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -670,6 +670,7 @@ public Searcher acquireSearcher(String source, SearcherScope scope) throws Engin } Releasable releasable = store::decRef; try { + assert assertSearcherIsWarmedUp(source, scope); ReferenceManager referenceManager = getReferenceManager(scope); final ElasticsearchDirectoryReader acquire = referenceManager.acquire(); AtomicBoolean released = new AtomicBoolean(false); @@ -705,6 +706,10 @@ public Searcher acquireSearcher(String source, SearcherScope scope) throws Engin protected abstract ReferenceManager getReferenceManager(SearcherScope scope); + boolean assertSearcherIsWarmedUp(String source, SearcherScope scope) { + return true; + } + public enum SearcherScope { EXTERNAL, INTERNAL } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 8b4bc32dc74bc..76b085bc1d618 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -320,18 +320,13 @@ private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException { private static final class ExternalReaderManager extends ReferenceManager { private final BiConsumer refreshListener; private final ElasticsearchReaderManager internalReaderManager; + private boolean isWarmedUp; //guarded by refreshLock ExternalReaderManager(ElasticsearchReaderManager internalReaderManager, BiConsumer refreshListener) throws IOException { this.refreshListener = refreshListener; this.internalReaderManager = internalReaderManager; - ElasticsearchDirectoryReader acquire = internalReaderManager.acquire(); - try { - incrementAndNotify(acquire, null); - current = acquire; - } finally { - internalReaderManager.release(acquire); - } + this.current = internalReaderManager.acquire(); // steal the reference without warming up } @Override @@ -340,26 +335,25 @@ protected ElasticsearchDirectoryReader refreshIfNeeded(ElasticsearchDirectoryRea // it's a save operation since we acquire the reader which incs it's reference but then down the road // steal it by calling incRef on the "stolen" reader internalReaderManager.maybeRefreshBlocking(); - ElasticsearchDirectoryReader acquire = internalReaderManager.acquire(); - try { - if (acquire == referenceToRefresh) { - // nothing has changed - both ref managers share the same instance so we can use reference equality - return null; - } else { - incrementAndNotify(acquire, referenceToRefresh); - return acquire; + final ElasticsearchDirectoryReader newReader = internalReaderManager.acquire(); + if (isWarmedUp == false || newReader != referenceToRefresh) { + boolean success = false; + try { + refreshListener.accept(newReader, isWarmedUp ? referenceToRefresh : null); + isWarmedUp = true; + success = true; + } finally { + if (success == false) { + internalReaderManager.release(newReader); + } } - } finally { - internalReaderManager.release(acquire); } - } - - private void incrementAndNotify(ElasticsearchDirectoryReader reader, - ElasticsearchDirectoryReader previousReader) throws IOException { - reader.incRef(); // steal the reference - try (Closeable c = reader::decRef) { - refreshListener.accept(reader, previousReader); - reader.incRef(); // double inc-ref if we were successful + // nothing has changed - both ref managers share the same instance so we can use reference equality + if (referenceToRefresh == newReader) { + internalReaderManager.release(newReader); + return null; + } else { + return newReader; // steal the reference } } @@ -374,7 +368,24 @@ protected int getRefCount(ElasticsearchDirectoryReader reference) { } @Override - protected void decRef(ElasticsearchDirectoryReader reference) throws IOException { reference.decRef(); } + protected void decRef(ElasticsearchDirectoryReader reference) throws IOException { + reference.decRef(); + } + } + + @Override + final boolean assertSearcherIsWarmedUp(String source, SearcherScope scope) { + if (scope == SearcherScope.EXTERNAL) { + switch (source) { + // we can access segment_stats while a shard is still in the recovering state. + case "segments": + case "segments_stats": + break; + default: + assert externalReaderManager.isWarmedUp : "searcher was not warmed up yet for source[" + source + "]"; + } + } + return true; } @Override diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 6a6bdf9175854..b71c7812aed73 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2699,6 +2699,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { final Sort indexSort = indexSortSupplier.get(); final Engine.Warmer warmer = reader -> { assert Thread.holdsLock(mutex) == false : "warming engine under mutex"; + assert reader != null; if (this.warmer != null) { this.warmer.warm(reader); } @@ -3410,6 +3411,7 @@ public void close() throws IOException { // TODO: add a dedicate recovery stats for the reset translog }); newEngineReference.get().recoverFromTranslog(translogRunner, globalCheckpoint); + newEngineReference.get().refresh("reset_engine"); synchronized (mutex) { verifyNotClosed(); IOUtils.close(currentEngineReference.getAndSet(newEngineReference.get())); diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index eed0a6dcb6b47..3c467a902dbf7 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -204,6 +204,7 @@ import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.in; import static org.hamcrest.Matchers.isIn; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; @@ -215,6 +216,7 @@ public class InternalEngineTests extends EngineTestCase { public void testVersionMapAfterAutoIDDocument() throws IOException { + engine.refresh("warm_up"); ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField("test"), new BytesArray("{}".getBytes(Charset.defaultCharset())), null); Engine.Index operation = randomBoolean() ? @@ -926,6 +928,7 @@ public void testConcurrentGetAndFlush() throws Exception { } public void testSimpleOperations() throws Exception { + engine.refresh("warm_up"); Engine.Searcher searchResult = engine.acquireSearcher("test"); MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0)); searchResult.close(); @@ -1103,6 +1106,7 @@ public void testSimpleOperations() throws Exception { } public void testSearchResultRelease() throws Exception { + engine.refresh("warm_up"); Engine.Searcher searchResult = engine.acquireSearcher("test"); MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0)); searchResult.close(); @@ -2175,7 +2179,7 @@ public void testVersioningPromotedReplica() throws IOException { final int opsOnPrimary = assertOpsOnPrimary(primaryOps, finalReplicaVersion, deletedOnReplica, replicaEngine); final long currentSeqNo = getSequenceID(replicaEngine, new Engine.Get(false, false, "type", lastReplicaOp.uid().text(), lastReplicaOp.uid())).v1(); - try (Engine.Searcher searcher = engine.acquireSearcher("test")) { + try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { final TotalHitCountCollector collector = new TotalHitCountCollector(); searcher.search(new MatchAllDocsQuery(), collector); if (collector.getTotalHits() > 0) { @@ -2740,7 +2744,7 @@ public void testEnableGcDeletes() throws Exception { } public void testExtractShardId() { - try (Engine.Searcher test = this.engine.acquireSearcher("test")) { + try (Engine.Searcher test = this.engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { ShardId shardId = ShardUtils.extractShardId(test.getDirectoryReader()); assertNotNull(shardId); assertEquals(shardId, engine.config().getShardId()); @@ -3015,7 +3019,7 @@ public void testSkipTranslogReplay() throws IOException { engine.close(); try (InternalEngine engine = new InternalEngine(config)) { engine.skipTranslogRecovery(); - try (Engine.Searcher searcher = engine.acquireSearcher("test")) { + try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10)); assertThat(topDocs.totalHits.value, equalTo(0L)); } @@ -3058,6 +3062,7 @@ public void testTranslogReplay() throws IOException { // we need to reuse the engine config unless the parser.mappingModified won't work engine = new InternalEngine(copy(engine.config(), inSyncGlobalCheckpointSupplier)); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + engine.refresh("warm_up"); assertVisibleCount(engine, numDocs, false); assertEquals(numDocs, translogHandler.appliedOperations()); @@ -3071,6 +3076,7 @@ public void testTranslogReplay() throws IOException { engine.close(); translogHandler = createTranslogHandler(engine.engineConfig.getIndexSettings()); engine = createEngine(store, primaryTranslogDir, inSyncGlobalCheckpointSupplier); + engine.refresh("warm_up"); assertVisibleCount(engine, numDocs, false); assertEquals(0, translogHandler.appliedOperations()); @@ -3100,6 +3106,7 @@ public void testTranslogReplay() throws IOException { engine.close(); translogHandler = createTranslogHandler(engine.engineConfig.getIndexSettings()); engine = createEngine(store, primaryTranslogDir, inSyncGlobalCheckpointSupplier); + engine.refresh("warm_up"); try (Engine.Searcher searcher = engine.acquireSearcher("test")) { TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), numDocs + 1); assertThat(topDocs.totalHits.value, equalTo(numDocs + 1L)); @@ -4491,7 +4498,7 @@ private void index(final InternalEngine engine, final int id) throws IOException * second is the primary term. */ private Tuple getSequenceID(Engine engine, Engine.Get get) throws EngineException { - try (Engine.Searcher searcher = engine.acquireSearcher("get")) { + try (Engine.Searcher searcher = engine.acquireSearcher("get", Engine.SearcherScope.INTERNAL)) { final long primaryTerm; final long seqNo; DocIdAndSeqNo docIdAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.getIndexReader(), get.uid()); @@ -4673,7 +4680,7 @@ public void testRefreshScopedSearcher() throws IOException { InternalEngine engine = // disable merges to make sure that the reader doesn't change unexpectedly during the test createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE)) { - + engine.refresh("warm_up"); try (Engine.Searcher getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); Engine.Searcher searchSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)) { assertSameReader(getSearcher, searchSearcher); @@ -5536,7 +5543,7 @@ protected void doRun() throws Exception { public void testAcquireSearcherOnClosingEngine() throws Exception { engine.close(); - expectThrows(AlreadyClosedException.class, () -> engine.acquireSearcher("test")); + expectThrows(AlreadyClosedException.class, () -> engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)); } public void testNoOpOnClosingEngine() throws Exception { @@ -6195,4 +6202,59 @@ public void afterRefresh(boolean didRefresh) { } } } + + public void testNotWarmUpSearcherInEngineCtor() throws Exception { + try (Store store = createStore()) { + List warmedUpReaders = new ArrayList<>(); + Engine.Warmer warmer = reader -> { + assertNotNull(reader); + assertThat(reader, not(in(warmedUpReaders))); + warmedUpReaders.add(reader); + }; + EngineConfig config = engine.config(); + final TranslogConfig translogConfig = new TranslogConfig(config.getTranslogConfig().getShardId(), + createTempDir(), config.getTranslogConfig().getIndexSettings(), config.getTranslogConfig().getBigArrays()); + EngineConfig configWithWarmer = new EngineConfig(config.getShardId(), config.getAllocationId(), config.getThreadPool(), + config.getIndexSettings(), warmer, store, config.getMergePolicy(), config.getAnalyzer(), + config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), + config.getQueryCachingPolicy(), translogConfig, config.getFlushMergesAfter(), + config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(), + config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), + config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier()); + try (InternalEngine engine = createEngine(configWithWarmer)) { + assertThat(warmedUpReaders, empty()); + assertThat(expectThrows(Throwable.class, () -> engine.acquireSearcher("test")).getMessage(), + equalTo("searcher was not warmed up yet for source[test]")); + int times = randomIntBetween(1, 10); + for (int i = 0; i < times; i++) { + engine.refresh("test"); + } + assertThat(warmedUpReaders, hasSize(1)); + try (Engine.Searcher internalSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { + try (Engine.Searcher externalSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)) { + assertSame(internalSearcher.getDirectoryReader(), externalSearcher.getDirectoryReader()); + assertSame(warmedUpReaders.get(0), externalSearcher.getDirectoryReader()); + } + } + index(engine, randomInt()); + if (randomBoolean()) { + engine.refresh("test", Engine.SearcherScope.INTERNAL, true); + assertThat(warmedUpReaders, hasSize(1)); + try (Engine.Searcher internalSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { + try (Engine.Searcher externalSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)) { + assertNotSame(internalSearcher.getDirectoryReader(), externalSearcher.getDirectoryReader()); + } + } + } + engine.refresh("test"); + assertThat(warmedUpReaders, hasSize(2)); + try (Engine.Searcher internalSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { + try (Engine.Searcher externalSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)) { + assertSame(internalSearcher.getDirectoryReader(), externalSearcher.getDirectoryReader()); + assertSame(warmedUpReaders.get(1), externalSearcher.getDirectoryReader()); + } + } + } + } + } }