From 151148622cb4d9c8a0277d10f715de1b7c7cce59 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 22 Jan 2020 12:13:51 -0500 Subject: [PATCH] Exclude nested documents in LuceneChangesSnapshot (#51279) LuceneChangesSnapshot can be slow if nested documents are heavily used. Also, it estimates the number of operations to be recovered in peer recoveries inaccurately. With this change, we prefer excluding the nested non-root documents in a Lucene query instead. --- .../index/engine/CombinedDocValues.java | 10 ++++----- .../index/engine/InternalEngine.java | 14 +++++++----- .../index/engine/LuceneChangesSnapshot.java | 14 +++++++----- .../index/engine/InternalEngineTests.java | 22 +++++++++---------- .../engine/LuceneChangesSnapshotTests.java | 12 ++-------- .../index/engine/EngineTestCase.java | 2 +- 6 files changed, 36 insertions(+), 38 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDocValues.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDocValues.java index 009dc9799d52a..3be58ecb88170 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDocValues.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDocValues.java @@ -47,6 +47,7 @@ final class CombinedDocValues { long docVersion(int segmentDocId) throws IOException { assert versionDV.docID() < segmentDocId; if (versionDV.advanceExact(segmentDocId) == false) { + assert false : "DocValues for field [" + VersionFieldMapper.NAME + "] is not found"; throw new IllegalStateException("DocValues for field [" + VersionFieldMapper.NAME + "] is not found"); } return versionDV.longValue(); @@ -55,19 +56,18 @@ long docVersion(int segmentDocId) throws IOException { long docSeqNo(int segmentDocId) throws IOException { assert seqNoDV.docID() < segmentDocId; if (seqNoDV.advanceExact(segmentDocId) == false) { + assert false : "DocValues for field [" + SeqNoFieldMapper.NAME + "] is not found"; throw new IllegalStateException("DocValues for field [" + SeqNoFieldMapper.NAME + "] is not found"); } return seqNoDV.longValue(); } long docPrimaryTerm(int segmentDocId) throws IOException { - if (primaryTermDV == null) { - return -1L; - } + // We exclude non-root nested documents when querying changes, every returned document must have primary term. assert primaryTermDV.docID() < segmentDocId; - // Use -1 for docs which don't have primary term. The caller considers those docs as nested docs. if (primaryTermDV.advanceExact(segmentDocId) == false) { - return -1; + assert false : "DocValues for field [" + SeqNoFieldMapper.PRIMARY_TERM_NAME + "] is not found"; + throw new IllegalStateException("DocValues for field [" + SeqNoFieldMapper.PRIMARY_TERM_NAME + "] is not found"); } return primaryTermDV.longValue(); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index f6bde020b84d5..cf3182fd5a7c1 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -38,6 +38,8 @@ import org.apache.lucene.index.ShuffleForcedMergePolicy; import org.apache.lucene.index.SoftDeletesRetentionMergePolicy; import org.apache.lucene.index.Term; +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; @@ -63,6 +65,7 @@ import org.elasticsearch.common.lucene.LoggerInfoStream; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; +import org.elasticsearch.common.lucene.search.Queries; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver; import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo; @@ -2731,8 +2734,12 @@ private boolean assertMaxSeqNoOfUpdatesIsAdvanced(Term id, long seqNo, boolean a private void restoreVersionMapAndCheckpointTracker(DirectoryReader directoryReader) throws IOException { final IndexSearcher searcher = new IndexSearcher(directoryReader); searcher.setQueryCache(null); - final Query query = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getPersistedLocalCheckpoint() + 1, Long.MAX_VALUE); - final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f); + final Query query = new BooleanQuery.Builder() + .add(LongPoint.newRangeQuery( + SeqNoFieldMapper.NAME, getPersistedLocalCheckpoint() + 1, Long.MAX_VALUE), BooleanClause.Occur.MUST) + .add(Queries.newNonNestedFilter(), BooleanClause.Occur.MUST) // exclude non-root nested documents + .build(); + final Weight weight = searcher.createWeight(searcher.rewrite(query), ScoreMode.COMPLETE_NO_SCORES, 1.0f); for (LeafReaderContext leaf : directoryReader.leaves()) { final Scorer scorer = weight.scorer(leaf); if (scorer == null) { @@ -2744,9 +2751,6 @@ private void restoreVersionMapAndCheckpointTracker(DirectoryReader directoryRead int docId; while ((docId = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { final long primaryTerm = dv.docPrimaryTerm(docId); - if (primaryTerm == -1L) { - continue; // skip children docs which do not have primary term - } final long seqNo = dv.docSeqNo(docId); localCheckpointTracker.markSeqNoAsProcessed(seqNo); localCheckpointTracker.markSeqNoAsPersisted(seqNo); diff --git a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java index 7ae37965a2ffd..ef21c6a86741e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java @@ -24,6 +24,8 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.Term; +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; import org.apache.lucene.search.ScoreDoc; @@ -33,6 +35,7 @@ import org.apache.lucene.util.ArrayUtil; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.lucene.search.Queries; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.fieldvisitor.FieldsVisitor; import org.elasticsearch.index.mapper.IdFieldMapper; @@ -210,7 +213,10 @@ private void fillParallelArray(ScoreDoc[] scoreDocs, ParallelArray parallelArray } private TopDocs searchOperations(ScoreDoc after) throws IOException { - final Query rangeQuery = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo); + final Query rangeQuery = new BooleanQuery.Builder() + .add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo), BooleanClause.Occur.MUST) + .add(Queries.newNonNestedFilter(), BooleanClause.Occur.MUST) // exclude non-root nested documents + .build(); final Sort sortedBySeqNo = new Sort(new SortField(SeqNoFieldMapper.NAME, SortField.Type.LONG)); return indexSearcher.searchAfter(after, rangeQuery, searchBatchSize, sortedBySeqNo); } @@ -219,11 +225,7 @@ private Translog.Operation readDocAsOp(int docIndex) throws IOException { final LeafReaderContext leaf = parallelArray.leafReaderContexts[docIndex]; final int segmentDocID = scoreDocs[docIndex].doc - leaf.docBase; final long primaryTerm = parallelArray.primaryTerm[docIndex]; - // We don't have to read the nested child documents - those docs don't have primary terms. - if (primaryTerm == -1) { - skippedOperations++; - return null; - } + assert primaryTerm > 0 : "nested child document must be excluded"; final long seqNo = parallelArray.seqNo[docIndex]; // Only pick the first seen seq# if (seqNo == lastSeenSeqNo) { diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 64a3392d1f880..986017eafae59 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -1551,12 +1551,12 @@ public void run() { public void testVersioningCreateExistsException() throws IOException { ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null); - Engine.Index create = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, + Engine.Index create = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false, UNASSIGNED_SEQ_NO, 0); Engine.IndexResult indexResult = engine.index(create); assertThat(indexResult.getVersion(), equalTo(1L)); - create = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, + create = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false, UNASSIGNED_SEQ_NO, 0); indexResult = engine.index(create); assertThat(indexResult.getResultType(), equalTo(Engine.Result.Type.FAILURE)); @@ -2796,7 +2796,7 @@ public void testTranslogReplay() throws IOException { final int numDocs = randomIntBetween(1, 10); for (int i = 0; i < numDocs; i++) { ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); - Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, + Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0); Engine.IndexResult indexResult = engine.index(firstIndexRequest); assertThat(indexResult.getVersion(), equalTo(1L)); @@ -2823,7 +2823,7 @@ public void testTranslogReplay() throws IOException { final boolean flush = randomBoolean(); int randomId = randomIntBetween(numDocs + 1, numDocs + 10); ParsedDocument doc = testParsedDocument(Integer.toString(randomId), null, testDocument(), new BytesArray("{}"), null); - Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, 1, + Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, 1, VersionType.EXTERNAL, PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0); Engine.IndexResult indexResult = engine.index(firstIndexRequest); assertThat(indexResult.getVersion(), equalTo(1L)); @@ -2833,7 +2833,7 @@ public void testTranslogReplay() throws IOException { } doc = testParsedDocument(Integer.toString(randomId), null, testDocument(), new BytesArray("{}"), null); - Engine.Index idxRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, 2, + Engine.Index idxRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, 2, VersionType.EXTERNAL, PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0); Engine.IndexResult result = engine.index(idxRequest); engine.refresh("test"); @@ -2869,7 +2869,7 @@ public void testRecoverFromForeignTranslog() throws IOException { final int numDocs = randomIntBetween(1, 10); for (int i = 0; i < numDocs; i++) { ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); - Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, + Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0); Engine.IndexResult index = engine.index(firstIndexRequest); assertThat(index.getVersion(), equalTo(1L)); @@ -3402,7 +3402,7 @@ public void testRetryWithAutogeneratedIdWorksAndNoDuplicateDocs() throws IOExcep boolean isRetry = false; long autoGeneratedIdTimestamp = 0; - Engine.Index index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, + Engine.Index index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, randomBoolean() ? Versions.MATCH_DELETED : Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0); Engine.IndexResult indexResult = engine.index(index); @@ -3414,7 +3414,7 @@ public void testRetryWithAutogeneratedIdWorksAndNoDuplicateDocs() throws IOExcep assertThat(indexResult.getVersion(), equalTo(1L)); isRetry = true; - index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY, VersionType.INTERNAL, + index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0); indexResult = engine.index(index); assertThat(indexResult.getVersion(), equalTo(1L)); @@ -3443,7 +3443,7 @@ public void testRetryWithAutogeneratedIdsAndWrongOrderWorksAndNoDuplicateDocs() boolean isRetry = true; long autoGeneratedIdTimestamp = 0; - Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, + Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, randomBoolean() ? Versions.MATCH_DELETED : Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0); Engine.IndexResult result = engine.index(firstIndexRequest); @@ -3455,7 +3455,7 @@ public void testRetryWithAutogeneratedIdsAndWrongOrderWorksAndNoDuplicateDocs() assertThat(indexReplicaResult.getVersion(), equalTo(1L)); isRetry = false; - Engine.Index secondIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, + Engine.Index secondIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0); Engine.IndexResult indexResult = engine.index(secondIndexRequest); @@ -3485,7 +3485,7 @@ public Engine.Index randomAppendOnly(ParsedDocument doc, boolean retry, final lo } public Engine.Index appendOnlyPrimary(ParsedDocument doc, boolean retry, final long autoGeneratedIdTimestamp, boolean create) { - return new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, create ? Versions.MATCH_DELETED : Versions.MATCH_ANY, + return new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, create ? Versions.MATCH_DELETED : Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, retry, UNASSIGNED_SEQ_NO, 0); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java index e03662b7d5539..0bdc89a38e8ec 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java @@ -157,15 +157,9 @@ searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), f public void testSkipNonRootOfNestedDocuments() throws Exception { Map seqNoToTerm = new HashMap<>(); List operations = generateHistoryOnReplica(between(1, 100), randomBoolean(), randomBoolean(), randomBoolean()); - int totalOps = 0; for (Engine.Operation op : operations) { if (engine.getLocalCheckpointTracker().hasProcessed(op.seqNo()) == false) { seqNoToTerm.put(op.seqNo(), op.primaryTerm()); - if (op instanceof Engine.Index) { - totalOps += ((Engine.Index) op).docs().size(); - } else { - totalOps++; - } } applyOperation(engine, op); if (rarely()) { @@ -182,14 +176,12 @@ public void testSkipNonRootOfNestedDocuments() throws Exception { engine.refresh("test"); Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, between(1, 100), 0, maxSeqNo, false)) { - searcher = null; + assertThat(snapshot.totalOperations(), equalTo(seqNoToTerm.size())); Translog.Operation op; while ((op = snapshot.next()) != null) { assertThat(op.toString(), op.primaryTerm(), equalTo(seqNoToTerm.get(op.seqNo()))); } - assertThat(snapshot.skippedOperations(), equalTo(totalOps - seqNoToTerm.size())); - } finally { - IOUtils.close(searcher); + assertThat(snapshot.skippedOperations(), equalTo(0)); } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index f2a44b766e73e..dac00626f4420 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -159,7 +159,7 @@ public abstract class EngineTestCase extends ESTestCase { protected Path primaryTranslogDir; protected Path replicaTranslogDir; // A default primary term is used by engine instances created in this test. - protected final PrimaryTermSupplier primaryTerm = new PrimaryTermSupplier(0L); + protected final PrimaryTermSupplier primaryTerm = new PrimaryTermSupplier(1L); protected static void assertVisibleCount(Engine engine, int numDocs) throws IOException { assertVisibleCount(engine, numDocs, true);