Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure external refreshes will also refresh internal searcher to minimize segment creation #27253

Merged
merged 8 commits into from
Nov 9, 2017
22 changes: 14 additions & 8 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
Expand Down Expand Up @@ -170,7 +170,7 @@ protected static boolean isMergedSegment(LeafReader reader) {
return IndexWriter.SOURCE_MERGE.equals(source);
}

protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) {
protected Searcher newSearcher(String source, IndexSearcher searcher, ReferenceManager<IndexSearcher> manager) {
return new EngineSearcher(source, searcher, manager, store, logger);
}

Expand Down Expand Up @@ -531,7 +531,7 @@ public final Searcher acquireSearcher(String source, SearcherScope scope) throws
* the searcher is acquired. */
store.incRef();
try {
final SearcherManager manager = getSearcherManager(source, scope); // can never be null
final ReferenceManager<IndexSearcher> manager = getSearcherManager(source, scope); // can never be null
/* This might throw NPE but that's fine we will run ensureOpen()
* in the catch block and throw the right exception */
final IndexSearcher searcher = manager.acquire();
Expand Down Expand Up @@ -585,7 +585,7 @@ public CommitStats commitStats() {
/**
* Read the last segments info from the commit pointed to by the searcher manager
*/
protected static SegmentInfos readLastCommittedSegmentInfos(final SearcherManager sm, final Store store) throws IOException {
protected static SegmentInfos readLastCommittedSegmentInfos(final ReferenceManager<IndexSearcher> sm, final Store store) throws IOException {
IndexSearcher searcher = sm.acquire();
try {
IndexCommit latestCommit = ((DirectoryReader) searcher.getIndexReader()).getIndexCommit();
Expand Down Expand Up @@ -787,13 +787,19 @@ public int compare(Segment o1, Segment o2) {
public final boolean refreshNeeded() {
if (store.tryIncRef()) {
/*
we need to inc the store here since searcherManager.isSearcherCurrent()
acquires a searcher internally and that might keep a file open on the
we need to inc the store here since we acquire a searcher and that might keep a file open on the
store. this violates the assumption that all files are closed when
the store is closed so we need to make sure we increment it here
*/
try {
return getSearcherManager("refresh_needed", SearcherScope.EXTERNAL).isSearcherCurrent() == false;
ReferenceManager<IndexSearcher> manager = getSearcherManager("refresh_needed", SearcherScope.EXTERNAL);
final IndexSearcher searcher = manager.acquire();
try {
final IndexReader r = searcher.getIndexReader();
return ((DirectoryReader) r).isCurrent() == false;
} finally {
manager.release(searcher);
}
} catch (IOException e) {
logger.error("failed to access searcher manager", e);
failEngine("failed to access searcher manager", e);
Expand Down Expand Up @@ -1331,7 +1337,7 @@ public void release() {
}
}

protected abstract SearcherManager getSearcherManager(String source, SearcherScope scope);
protected abstract ReferenceManager<IndexSearcher> getSearcherManager(String source, SearcherScope scope);

/**
* Method to close the engine while the write lock is held.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.index.store.Store;
Expand All @@ -32,12 +33,12 @@
* Searcher for an Engine
*/
public class EngineSearcher extends Engine.Searcher {
private final SearcherManager manager;
private final ReferenceManager<IndexSearcher> manager;
private final AtomicBoolean released = new AtomicBoolean(false);
private final Store store;
private final Logger logger;

public EngineSearcher(String source, IndexSearcher searcher, SearcherManager manager, Store store, Logger logger) {
public EngineSearcher(String source, IndexSearcher searcher, ReferenceManager<IndexSearcher> manager, Store store, Logger logger) {
super(source, searcher);
this.manager = manager;
this.store = store;
Expand Down
111 changes: 90 additions & 21 deletions core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lucene.LoggerInfoStream;
Expand All @@ -57,7 +58,6 @@
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
Expand Down Expand Up @@ -108,7 +108,7 @@ public class InternalEngine extends Engine {

private final IndexWriter indexWriter;

private final SearcherManager externalSearcherManager;
private final ExternalSearcherManager externalSearcherManager;
private final SearcherManager internalSearcherManager;

private final Lock flushLock = new ReentrantLock();
Expand Down Expand Up @@ -172,7 +172,7 @@ public InternalEngine(EngineConfig engineConfig) {
store.incRef();
IndexWriter writer = null;
Translog translog = null;
SearcherManager externalSearcherManager = null;
ExternalSearcherManager externalSearcherManager = null;
SearcherManager internalSearcherManager = null;
EngineMergeScheduler scheduler = null;
boolean success = false;
Expand Down Expand Up @@ -224,8 +224,8 @@ public InternalEngine(EngineConfig engineConfig) {
throw e;
}
}
internalSearcherManager = createSearcherManager(new SearcherFactory(), false);
externalSearcherManager = createSearcherManager(new SearchFactory(logger, isClosed, engineConfig), true);
externalSearcherManager = createSearcherManager(new SearchFactory(logger, isClosed, engineConfig));
internalSearcherManager = externalSearcherManager.internalSearcherManager;
this.internalSearcherManager = internalSearcherManager;
this.externalSearcherManager = externalSearcherManager;
internalSearcherManager.addListener(versionMap);
Expand All @@ -238,7 +238,7 @@ public InternalEngine(EngineConfig engineConfig) {
success = true;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(writer, translog, externalSearcherManager, internalSearcherManager, scheduler);
IOUtils.closeWhileHandlingException(writer, translog, internalSearcherManager, externalSearcherManager, scheduler);
if (isClosed.get() == false) {
// failure we need to dec the store reference
store.decRef();
Expand All @@ -248,6 +248,75 @@ public InternalEngine(EngineConfig engineConfig) {
logger.trace("created new InternalEngine");
}

/**
* This reference manager delegates all it's refresh calls to another (internal) SearcherManager
* The main purpose for this is that if we have external refreshes happening we don't issue extra
* refreshes to clear version map memory etc. this can cause excessive segment creation if heavy indexing
* is happening and the refresh interval is low (ie. 1 sec)
*
* This also prevents segment starvation where an internal reader holds on to old segments literally forever
* since no indexing is happening and refreshes are only happening to the external reader manager, while with
* this specialized implementation an external refresh will immediately be reflected on the internal reader
* and old segments can be released in the same way previous version did this (as a side-effect of _refresh)
*/
@SuppressForbidden(reason = "reference counting is required here")
private static final class ExternalSearcherManager extends ReferenceManager<IndexSearcher> {
private final SearcherFactory searcherFactory;
private final SearcherManager internalSearcherManager;

ExternalSearcherManager(SearcherManager internalSearcherManager, SearcherFactory searcherFactory) throws IOException {
IndexSearcher acquire = internalSearcherManager.acquire();
try {
IndexReader indexReader = acquire.getIndexReader();
assert indexReader instanceof ElasticsearchDirectoryReader:
"searcher's IndexReader should be an ElasticsearchDirectoryReader, but got " + indexReader;
indexReader.incRef(); // steal the reader - getSearcher will decrement if it fails
current = SearcherManager.getSearcher(searcherFactory, indexReader, null);
} finally {
internalSearcherManager.release(acquire);
}
this.searcherFactory = searcherFactory;
this.internalSearcherManager = internalSearcherManager;
}

@Override
protected IndexSearcher refreshIfNeeded(IndexSearcher referenceToRefresh) throws IOException {
// we simply run a blocking refresh on the internal reference manager and then steal it's reader
// it's a save operation since we acquire the reader which incs it's reference but then down the road
// steal it by calling incRef on the "stolen" reader
internalSearcherManager.maybeRefreshBlocking();
IndexSearcher acquire = internalSearcherManager.acquire();
final IndexReader previousReader = referenceToRefresh.getIndexReader();
assert previousReader instanceof ElasticsearchDirectoryReader:
"searcher's IndexReader should be an ElasticsearchDirectoryReader, but got " + previousReader;
try {
final IndexReader newReader = acquire.getIndexReader();
if (newReader == previousReader) {
// nothing has changed - both ref managers share the same instance so we can use reference equality
return null;
} else {
newReader.incRef(); // steal the reader - getSearcher will decrement if it fails
return SearcherManager.getSearcher(searcherFactory, newReader, previousReader);
}
} finally {
internalSearcherManager.release(acquire);
}
}

@Override
protected boolean tryIncRef(IndexSearcher reference) {
return reference.getIndexReader().tryIncRef();
}

@Override
protected int getRefCount(IndexSearcher reference) {
return reference.getIndexReader().getRefCount();
}

@Override
protected void decRef(IndexSearcher reference) throws IOException { reference.getIndexReader().decRef(); }
}

@Override
public void restoreLocalCheckpointFromTranslog() throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
Expand Down Expand Up @@ -456,18 +525,18 @@ private String loadOrGenerateHistoryUUID(final IndexWriter writer, boolean force
return uuid;
}

private SearcherManager createSearcherManager(SearcherFactory searcherFactory, boolean readSegmentsInfo) throws EngineException {
private ExternalSearcherManager createSearcherManager(SearchFactory externalSearcherFactory) throws EngineException {
boolean success = false;
SearcherManager searcherManager = null;
SearcherManager internalSearcherManager = null;
try {
try {
final DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter), shardId);
searcherManager = new SearcherManager(directoryReader, searcherFactory);
if (readSegmentsInfo) {
lastCommittedSegmentInfos = readLastCommittedSegmentInfos(searcherManager, store);
}
internalSearcherManager = new SearcherManager(directoryReader, new SearcherFactory());
lastCommittedSegmentInfos = readLastCommittedSegmentInfos(internalSearcherManager, store);
ExternalSearcherManager externalSearcherManager = new ExternalSearcherManager(internalSearcherManager,
externalSearcherFactory);
success = true;
return searcherManager;
return externalSearcherManager;
} catch (IOException e) {
maybeFailEngine("start", e);
try {
Expand All @@ -479,7 +548,7 @@ private SearcherManager createSearcherManager(SearcherFactory searcherFactory, b
}
} finally {
if (success == false) { // release everything we created on a failure
IOUtils.closeWhileHandlingException(searcherManager, indexWriter);
IOUtils.closeWhileHandlingException(internalSearcherManager, indexWriter);
}
}
}
Expand Down Expand Up @@ -1229,24 +1298,24 @@ public void refresh(String source) throws EngineException {
}

final void refresh(String source, SearcherScope scope) throws EngineException {
long bytes = 0;
// we obtain a read lock here, since we don't want a flush to happen while we are refreshing
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
// both refresh types will result in an internal refresh but only the external will also
// pass the new reader reference to the external reader manager.

// this will also cause version map ram to be freed hence we always account for it.
final long bytes = indexWriter.ramBytesUsed() + versionMap.ramBytesUsedForRefresh();
writingBytes.addAndGet(bytes);
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
bytes = indexWriter.ramBytesUsed();
switch (scope) {
case EXTERNAL:
// even though we maintain 2 managers we really do the heavy-lifting only once.
// the second refresh will only do the extra work we have to do for warming caches etc.
writingBytes.addAndGet(bytes);
externalSearcherManager.maybeRefreshBlocking();
// the break here is intentional we never refresh both internal / external together
break;
case INTERNAL:
final long versionMapBytes = versionMap.ramBytesUsedForRefresh();
bytes += versionMapBytes;
writingBytes.addAndGet(bytes);
internalSearcherManager.maybeRefreshBlocking();
break;
default:
Expand Down Expand Up @@ -1709,7 +1778,7 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
}

@Override
protected SearcherManager getSearcherManager(String source, SearcherScope scope) {
protected ReferenceManager<IndexSearcher> getSearcherManager(String source, SearcherScope scope) {
switch (scope) {
case INTERNAL:
return internalSearcherManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3882,7 +3882,7 @@ public void assertSameReader(Searcher left, Searcher right) {
List<LeafReaderContext> rightLeaves = ElasticsearchDirectoryReader.unwrap(right.getDirectoryReader()).leaves();
assertEquals(rightLeaves.size(), leftLeaves.size());
for (int i = 0; i < leftLeaves.size(); i++) {
assertSame(leftLeaves.get(i).reader(), rightLeaves.get(0).reader());
assertSame(leftLeaves.get(i).reader(), rightLeaves.get(i).reader());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sneaky :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah suddenly we have a test that triggered it 💃

}
}

Expand All @@ -3891,7 +3891,7 @@ public void assertNotSameReader(Searcher left, Searcher right) {
List<LeafReaderContext> rightLeaves = ElasticsearchDirectoryReader.unwrap(right.getDirectoryReader()).leaves();
if (rightLeaves.size() == leftLeaves.size()) {
for (int i = 0; i < leftLeaves.size(); i++) {
if (leftLeaves.get(i).reader() != rightLeaves.get(0).reader()) {
if (leftLeaves.get(i).reader() != rightLeaves.get(i).reader()) {
return; // all is well
}
}
Expand Down Expand Up @@ -3919,7 +3919,6 @@ public void testRefreshScopedSearcher() throws IOException {
assertEquals(0, searchSearcher.reader().numDocs());
assertNotSameReader(getSearcher, searchSearcher);
}

engine.refresh("test", Engine.SearcherScope.EXTERNAL);

try (Searcher getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
Expand All @@ -3928,6 +3927,36 @@ public void testRefreshScopedSearcher() throws IOException {
assertEquals(10, searchSearcher.reader().numDocs());
assertSameReader(getSearcher, searchSearcher);
}

// now ensure external refreshes are reflected on the internal reader
final String docId = Integer.toString(10);
final ParsedDocument doc =
testParsedDocument(docId, null, testDocumentWithTextField(), SOURCE, null);
Engine.Index primaryResponse = indexForDoc(doc);
engine.index(primaryResponse);

engine.refresh("test", Engine.SearcherScope.EXTERNAL);

try (Searcher getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
Searcher searchSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)){
assertEquals(11, getSearcher.reader().numDocs());
assertEquals(11, searchSearcher.reader().numDocs());
assertSameReader(getSearcher, searchSearcher);
}

try (Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)){
engine.refresh("test", Engine.SearcherScope.INTERNAL);
try (Searcher nextSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)){
assertSame(searcher.searcher(), nextSearcher.searcher());
}
}

try (Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)){
engine.refresh("test", Engine.SearcherScope.EXTERNAL);
try (Searcher nextSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)){
assertSame(searcher.searcher(), nextSearcher.searcher());
}
}
}

public void testSeqNoGenerator() throws IOException {
Expand Down
Loading