From eb62da77b7752825cdd0af9d122a49c0c333b3ea Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 3 Nov 2017 08:27:07 +0100 Subject: [PATCH 1/4] Ensure external refreshes will also refresh internal seacher to minimize segment creation We cut over to internal and external IndexReader/IndexSeacher in #26972 which uses two independent searcher managers. This has the downside that refreshes of the external reader will never clear the internal version map which in-turn will trigger additional and potentially unnecessary segment flushes since memory must be freed. Under heavy indexing load with low refresh intervals this can cause excessive segment creation which causes high GC activity and significantly increases the required segment merges. This change adds a dedicated external reference manager that delegates refreshes to the internal reference manager that then `steals` the refreshed reader from the internal reference manager for external usage. This ensures that external and internal readers are consistent on an external refresh. As a sideeffect this also releases old segments referenced by the internal reference manager which can potentially hold on to already merged away segments until it is refreshed due to a flush or indexing activity. --- .../elasticsearch/index/engine/Engine.java | 22 ++-- .../index/engine/EngineSearcher.java | 5 +- .../index/engine/InternalEngine.java | 105 ++++++++++++++---- .../index/engine/InternalEngineTests.java | 35 +++++- .../test/engine/MockEngineSupport.java | 7 +- .../test/engine/MockInternalEngine.java | 3 +- 6 files changed, 142 insertions(+), 35 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 42805a19b340..99410d9f624d 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -36,7 +36,7 @@ import org.apache.lucene.index.SnapshotDeletionPolicy; import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.SearcherManager; +import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; @@ -170,7 +170,7 @@ protected static boolean isMergedSegment(LeafReader reader) { return IndexWriter.SOURCE_MERGE.equals(source); } - protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) { + protected Searcher newSearcher(String source, IndexSearcher searcher, ReferenceManager manager) { return new EngineSearcher(source, searcher, manager, store, logger); } @@ -531,7 +531,7 @@ public final Searcher acquireSearcher(String source, SearcherScope scope) throws * the searcher is acquired. */ store.incRef(); try { - final SearcherManager manager = getSearcherManager(source, scope); // can never be null + final ReferenceManager manager = getSearcherManager(source, scope); // can never be null /* This might throw NPE but that's fine we will run ensureOpen() * in the catch block and throw the right exception */ final IndexSearcher searcher = manager.acquire(); @@ -585,7 +585,7 @@ public CommitStats commitStats() { /** * Read the last segments info from the commit pointed to by the searcher manager */ - protected static SegmentInfos readLastCommittedSegmentInfos(final SearcherManager sm, final Store store) throws IOException { + protected static SegmentInfos readLastCommittedSegmentInfos(final ReferenceManager sm, final Store store) throws IOException { IndexSearcher searcher = sm.acquire(); try { IndexCommit latestCommit = ((DirectoryReader) searcher.getIndexReader()).getIndexCommit(); @@ -787,13 +787,19 @@ public int compare(Segment o1, Segment o2) { public final boolean refreshNeeded() { if (store.tryIncRef()) { /* - we need to inc the store here since searcherManager.isSearcherCurrent() - acquires a searcher internally and that might keep a file open on the + we need to inc the store here since we acquire a searcher and that might keep a file open on the store. this violates the assumption that all files are closed when the store is closed so we need to make sure we increment it here */ try { - return getSearcherManager("refresh_needed", SearcherScope.EXTERNAL).isSearcherCurrent() == false; + ReferenceManager manager = getSearcherManager("refresh_needed", SearcherScope.EXTERNAL); + final IndexSearcher searcher = manager.acquire(); + try { + final IndexReader r = searcher.getIndexReader(); + return ((DirectoryReader) r).isCurrent() == false; + } finally { + manager.release(searcher); + } } catch (IOException e) { logger.error("failed to access searcher manager", e); failEngine("failed to access searcher manager", e); @@ -1331,7 +1337,7 @@ public void release() { } } - protected abstract SearcherManager getSearcherManager(String source, SearcherScope scope); + protected abstract ReferenceManager getSearcherManager(String source, SearcherScope scope); /** * Method to close the engine while the write lock is held. diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineSearcher.java b/core/src/main/java/org/elasticsearch/index/engine/EngineSearcher.java index a53ac1dd4152..c72ec543e713 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineSearcher.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineSearcher.java @@ -21,6 +21,7 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.SearcherManager; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.index.store.Store; @@ -32,12 +33,12 @@ * Searcher for an Engine */ public class EngineSearcher extends Engine.Searcher { - private final SearcherManager manager; + private final ReferenceManager manager; private final AtomicBoolean released = new AtomicBoolean(false); private final Store store; private final Logger logger; - public EngineSearcher(String source, IndexSearcher searcher, SearcherManager manager, Store store, Logger logger) { + public EngineSearcher(String source, IndexSearcher searcher, ReferenceManager manager, Store store, Logger logger) { super(source, searcher); this.manager = manager; this.store = store; diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index ac0209998737..bc29983ce39c 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -57,7 +57,6 @@ import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver; import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo; import org.elasticsearch.common.metrics.CounterMetric; -import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.KeyedLock; import org.elasticsearch.common.util.concurrent.ReleasableLock; @@ -108,7 +107,7 @@ public class InternalEngine extends Engine { private final IndexWriter indexWriter; - private final SearcherManager externalSearcherManager; + private final ExternalSearcherManager externalSearcherManager; private final SearcherManager internalSearcherManager; private final Lock flushLock = new ReentrantLock(); @@ -172,7 +171,7 @@ public InternalEngine(EngineConfig engineConfig) { store.incRef(); IndexWriter writer = null; Translog translog = null; - SearcherManager externalSearcherManager = null; + ExternalSearcherManager externalSearcherManager = null; SearcherManager internalSearcherManager = null; EngineMergeScheduler scheduler = null; boolean success = false; @@ -224,8 +223,9 @@ public InternalEngine(EngineConfig engineConfig) { throw e; } } - internalSearcherManager = createSearcherManager(new SearcherFactory(), false); - externalSearcherManager = createSearcherManager(new SearchFactory(logger, isClosed, engineConfig), true); + externalSearcherManager = createSearcherManager(new SearchFactory(logger, isClosed, + engineConfig)); + internalSearcherManager = externalSearcherManager.internalSearcherManager; this.internalSearcherManager = internalSearcherManager; this.externalSearcherManager = externalSearcherManager; internalSearcherManager.addListener(versionMap); @@ -238,7 +238,7 @@ public InternalEngine(EngineConfig engineConfig) { success = true; } finally { if (success == false) { - IOUtils.closeWhileHandlingException(writer, translog, externalSearcherManager, internalSearcherManager, scheduler); + IOUtils.closeWhileHandlingException(writer, translog, internalSearcherManager, externalSearcherManager, scheduler); if (isClosed.get() == false) { // failure we need to dec the store reference store.decRef(); @@ -248,6 +248,74 @@ public InternalEngine(EngineConfig engineConfig) { logger.trace("created new InternalEngine"); } + /** + * This reference manager delegates all it's refresh calls to another (internal) SearcherManager + * The main purpose for this is that if we have external refreshes happening we don't issue extra + * refreshes to clear version map memory etc. this can cause excessive segment creation if heavy indexing + * is happening and the refresh interval is low (ie. 1 sec) + * + * This also prevents segment starvation where an internal reader holds on to old segments literally forever + * since no indexing is happening and refreshes are only happening to the external reader manager, while with + * this specialized implementation an external refresh will immediately be reflected on the internal reader + * and old segments can be released in the same way previous version did this (as a side-effect of _refresh) + */ + private static final class ExternalSearcherManager extends ReferenceManager { + private final SearcherFactory searcherFactory; + private final SearcherManager internalSearcherManager; + + public ExternalSearcherManager(SearcherManager manager, SearcherFactory searcherFactory) throws IOException { + IndexSearcher acquire = manager.acquire(); + try { + IndexReader indexReader = acquire.getIndexReader(); + assert indexReader instanceof ElasticsearchDirectoryReader: + "searcher's IndexReader should be an ElasticsearchDirectoryReader, but got " + indexReader; + indexReader.incRef(); // steal the reader - getSearcher will decrement if it fails + current = SearcherManager.getSearcher(searcherFactory, indexReader, null); + } finally { + manager.release(acquire); + } + this.searcherFactory = searcherFactory; + this.internalSearcherManager = manager; + } + + @Override + protected IndexSearcher refreshIfNeeded(IndexSearcher referenceToRefresh) throws IOException { + // we simply run a blocking refresh on the internal reference manager and then steal it's reader + // it's a save operation since we acquire the reader which incs it's reference but then down the road + // steal it by calling incRef on the "stolen" reader + internalSearcherManager.maybeRefreshBlocking(); + IndexSearcher acquire = internalSearcherManager.acquire(); + final IndexReader previousReader = referenceToRefresh.getIndexReader(); + assert previousReader instanceof ElasticsearchDirectoryReader: + "searcher's IndexReader should be an ElasticsearchDirectoryReader, but got " + previousReader; + try { + final IndexReader newReader = acquire.getIndexReader(); + if (newReader == previousReader) { + // nothing has changed - both ref managers share the same instance so we can use reference equality + return null; + } else { + newReader.incRef(); // steal the reader - getSearcher will decrement if it fails + return SearcherManager.getSearcher(searcherFactory, newReader, previousReader); + } + } finally { + internalSearcherManager.release(acquire); + } + } + + @Override + protected boolean tryIncRef(IndexSearcher reference) { + return reference.getIndexReader().tryIncRef(); + } + + @Override + protected int getRefCount(IndexSearcher reference) { + return reference.getIndexReader().getRefCount(); + } + + @Override + protected void decRef(IndexSearcher reference) throws IOException { reference.getIndexReader().decRef(); } + } + @Override public void restoreLocalCheckpointFromTranslog() throws IOException { try (ReleasableLock ignored = writeLock.acquire()) { @@ -456,18 +524,17 @@ private String loadOrGenerateHistoryUUID(final IndexWriter writer, boolean force return uuid; } - private SearcherManager createSearcherManager(SearcherFactory searcherFactory, boolean readSegmentsInfo) throws EngineException { + private ExternalSearcherManager createSearcherManager(SearchFactory factory) throws EngineException { boolean success = false; SearcherManager searcherManager = null; try { try { final DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter), shardId); - searcherManager = new SearcherManager(directoryReader, searcherFactory); - if (readSegmentsInfo) { - lastCommittedSegmentInfos = readLastCommittedSegmentInfos(searcherManager, store); - } + searcherManager = new SearcherManager(directoryReader, new SearcherFactory()); + lastCommittedSegmentInfos = readLastCommittedSegmentInfos(searcherManager, store); + ExternalSearcherManager externalSearcherManager = new ExternalSearcherManager(searcherManager, factory); success = true; - return searcherManager; + return externalSearcherManager; } catch (IOException e) { maybeFailEngine("start", e); try { @@ -1229,24 +1296,24 @@ public void refresh(String source) throws EngineException { } final void refresh(String source, SearcherScope scope) throws EngineException { - long bytes = 0; // we obtain a read lock here, since we don't want a flush to happen while we are refreshing // since it flushes the index as well (though, in terms of concurrency, we are allowed to do it) + // both refresh types will result in an internal refresh but only the external will also + // pass the new reader reference to the external reader manager. + + // this will also cause version map ram to be freed hence we always account for it. + final long bytes = indexWriter.ramBytesUsed() + versionMap.ramBytesUsedForRefresh(); + writingBytes.addAndGet(bytes); try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); - bytes = indexWriter.ramBytesUsed(); switch (scope) { case EXTERNAL: // even though we maintain 2 managers we really do the heavy-lifting only once. // the second refresh will only do the extra work we have to do for warming caches etc. - writingBytes.addAndGet(bytes); externalSearcherManager.maybeRefreshBlocking(); // the break here is intentional we never refresh both internal / external together break; case INTERNAL: - final long versionMapBytes = versionMap.ramBytesUsedForRefresh(); - bytes += versionMapBytes; - writingBytes.addAndGet(bytes); internalSearcherManager.maybeRefreshBlocking(); break; default: @@ -1717,7 +1784,7 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) { } @Override - protected SearcherManager getSearcherManager(String source, SearcherScope scope) { + protected ReferenceManager getSearcherManager(String source, SearcherScope scope) { switch (scope) { case INTERNAL: return internalSearcherManager; diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index f6d2d9965809..1ce4620275da 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -4226,7 +4226,7 @@ public void assertSameReader(Searcher left, Searcher right) { List rightLeaves = ElasticsearchDirectoryReader.unwrap(right.getDirectoryReader()).leaves(); assertEquals(rightLeaves.size(), leftLeaves.size()); for (int i = 0; i < leftLeaves.size(); i++) { - assertSame(leftLeaves.get(i).reader(), rightLeaves.get(0).reader()); + assertSame(leftLeaves.get(i).reader(), rightLeaves.get(i).reader()); } } @@ -4235,7 +4235,7 @@ public void assertNotSameReader(Searcher left, Searcher right) { List rightLeaves = ElasticsearchDirectoryReader.unwrap(right.getDirectoryReader()).leaves(); if (rightLeaves.size() == leftLeaves.size()) { for (int i = 0; i < leftLeaves.size(); i++) { - if (leftLeaves.get(i).reader() != rightLeaves.get(0).reader()) { + if (leftLeaves.get(i).reader() != rightLeaves.get(i).reader()) { return; // all is well } } @@ -4263,7 +4263,6 @@ public void testRefreshScopedSearcher() throws IOException { assertEquals(0, searchSearcher.reader().numDocs()); assertNotSameReader(getSearcher, searchSearcher); } - engine.refresh("test", Engine.SearcherScope.EXTERNAL); try (Searcher getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); @@ -4272,6 +4271,36 @@ public void testRefreshScopedSearcher() throws IOException { assertEquals(10, searchSearcher.reader().numDocs()); assertSameReader(getSearcher, searchSearcher); } + + // now ensure external refreshes are reflected on the internal reader + final String docId = Integer.toString(10); + final ParsedDocument doc = + testParsedDocument(docId, null, testDocumentWithTextField(), SOURCE, null); + Engine.Index primaryResponse = indexForDoc(doc); + engine.index(primaryResponse); + + engine.refresh("test", Engine.SearcherScope.EXTERNAL); + + try (Searcher getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); + Searcher searchSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)){ + assertEquals(11, getSearcher.reader().numDocs()); + assertEquals(11, searchSearcher.reader().numDocs()); + assertSameReader(getSearcher, searchSearcher); + } + + try (Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)){ + engine.refresh("test", Engine.SearcherScope.INTERNAL); + try (Searcher nextSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)){ + assertSame(searcher.searcher(), nextSearcher.searcher()); + } + } + + try (Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)){ + engine.refresh("test", Engine.SearcherScope.EXTERNAL); + try (Searcher nextSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)){ + assertSame(searcher.searcher(), nextSearcher.searcher()); + } + } } public void testSeqNoGenerator() throws IOException { diff --git a/test/framework/src/main/java/org/elasticsearch/test/engine/MockEngineSupport.java b/test/framework/src/main/java/org/elasticsearch/test/engine/MockEngineSupport.java index 4c5bd0d3267d..144b2be1b023 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/engine/MockEngineSupport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/engine/MockEngineSupport.java @@ -26,6 +26,7 @@ import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.QueryCache; import org.apache.lucene.search.QueryCachingPolicy; +import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.SearcherManager; import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.ElasticsearchException; @@ -133,7 +134,8 @@ public CloseAction flushOrClose(CloseAction originalAction) throws IOException { } } - public AssertingIndexSearcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) throws EngineException { + public AssertingIndexSearcher newSearcher(String source, IndexSearcher searcher, + ReferenceManager manager) throws EngineException { IndexReader reader = searcher.getIndexReader(); IndexReader wrappedReader = reader; assert reader != null; @@ -182,7 +184,8 @@ public DirectoryReaderWrapper(DirectoryReader in, SubReaderWrapper subReaderWrap } - public Engine.Searcher wrapSearcher(String source, Engine.Searcher engineSearcher, IndexSearcher searcher, SearcherManager manager) { + public Engine.Searcher wrapSearcher(String source, Engine.Searcher engineSearcher, IndexSearcher searcher, + ReferenceManager manager) { final AssertingIndexSearcher assertingIndexSearcher = newSearcher(source, searcher, manager); assertingIndexSearcher.setSimilarity(searcher.getSimilarity(true)); // pass the original searcher to the super.newSearcher() method to make sure this is the searcher that will diff --git a/test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java b/test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java index fe8c4daec8df..92c7b4d9fc0d 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java +++ b/test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java @@ -20,6 +20,7 @@ import org.apache.lucene.index.FilterDirectoryReader; import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.SearcherManager; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineException; @@ -78,7 +79,7 @@ private void flushAndCloseInternal() throws IOException { } @Override - protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) throws EngineException { + protected Searcher newSearcher(String source, IndexSearcher searcher, ReferenceManager manager) throws EngineException { final Searcher engineSearcher = super.newSearcher(source, searcher, manager); return support().wrapSearcher(source, engineSearcher, searcher, manager); } From 177902ea331c4d9b940ee5005b253b48fac2f038 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 3 Nov 2017 12:37:13 +0000 Subject: [PATCH 2/4] remove redundant modifier --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index bc29983ce39c..81769d213f9d 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -263,7 +263,7 @@ private static final class ExternalSearcherManager extends ReferenceManager Date: Fri, 3 Nov 2017 12:58:45 +0000 Subject: [PATCH 3/4] add suppress forbidden - I know what I am doing --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 81769d213f9d..383a89523a57 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -48,6 +48,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lucene.LoggerInfoStream; @@ -259,6 +260,7 @@ public InternalEngine(EngineConfig engineConfig) { * this specialized implementation an external refresh will immediately be reflected on the internal reader * and old segments can be released in the same way previous version did this (as a side-effect of _refresh) */ + @SuppressForbidden(reason = "reference counting is required here") private static final class ExternalSearcherManager extends ReferenceManager { private final SearcherFactory searcherFactory; private final SearcherManager internalSearcherManager; From 4dd17186136c5feccf663b602bb94c15e316d4ec Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 8 Nov 2017 12:36:14 +0100 Subject: [PATCH 4/4] apply review comments from @bleskes --- .../index/engine/InternalEngine.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 383a89523a57..3bb3eeeebf08 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -224,8 +224,7 @@ public InternalEngine(EngineConfig engineConfig) { throw e; } } - externalSearcherManager = createSearcherManager(new SearchFactory(logger, isClosed, - engineConfig)); + externalSearcherManager = createSearcherManager(new SearchFactory(logger, isClosed, engineConfig)); internalSearcherManager = externalSearcherManager.internalSearcherManager; this.internalSearcherManager = internalSearcherManager; this.externalSearcherManager = externalSearcherManager; @@ -265,8 +264,8 @@ private static final class ExternalSearcherManager extends ReferenceManager