From bb2f51f2a7419e7a200bda91d8fa864c72e257d0 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 22 Jan 2020 12:13:51 -0500 Subject: [PATCH] Exclude nested documents in LuceneChangesSnapshot (#51279) LuceneChangesSnapshot can be slow if nested documents are heavily used. Also, it estimates the number of operations to be recovered in peer recoveries inaccurately. With this change, we prefer excluding the nested non-root documents in a Lucene query instead. --- .../index/engine/CombinedDocValues.java | 10 ++++----- .../index/engine/InternalEngine.java | 15 ++++++++----- .../index/engine/LuceneChangesSnapshot.java | 15 ++++++++----- .../index/engine/InternalEngineTests.java | 22 +++++++++---------- .../engine/LuceneChangesSnapshotTests.java | 12 ++-------- .../index/engine/EngineTestCase.java | 2 +- 6 files changed, 38 insertions(+), 38 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDocValues.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDocValues.java index 009dc9799d52a..3be58ecb88170 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDocValues.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDocValues.java @@ -47,6 +47,7 @@ final class CombinedDocValues { long docVersion(int segmentDocId) throws IOException { assert versionDV.docID() < segmentDocId; if (versionDV.advanceExact(segmentDocId) == false) { + assert false : "DocValues for field [" + VersionFieldMapper.NAME + "] is not found"; throw new IllegalStateException("DocValues for field [" + VersionFieldMapper.NAME + "] is not found"); } return versionDV.longValue(); @@ -55,19 +56,18 @@ long docVersion(int segmentDocId) throws IOException { long docSeqNo(int segmentDocId) throws IOException { assert seqNoDV.docID() < segmentDocId; if (seqNoDV.advanceExact(segmentDocId) == false) { + assert false : "DocValues for field [" + SeqNoFieldMapper.NAME + "] is not found"; throw new IllegalStateException("DocValues 
for field [" + SeqNoFieldMapper.NAME + "] is not found"); } return seqNoDV.longValue(); } long docPrimaryTerm(int segmentDocId) throws IOException { - if (primaryTermDV == null) { - return -1L; - } + // We exclude non-root nested documents when querying changes, so every returned document must have a primary term. assert primaryTermDV.docID() < segmentDocId; - // Use -1 for docs which don't have primary term. The caller considers those docs as nested docs. if (primaryTermDV.advanceExact(segmentDocId) == false) { - return -1; + assert false : "DocValues for field [" + SeqNoFieldMapper.PRIMARY_TERM_NAME + "] is not found"; + throw new IllegalStateException("DocValues for field [" + SeqNoFieldMapper.PRIMARY_TERM_NAME + "] is not found"); } return primaryTermDV.longValue(); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 352fa4dda09d4..379642febcf07 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -39,7 +39,10 @@ import org.apache.lucene.index.ShuffleForcedMergePolicy; import org.apache.lucene.index.SoftDeletesRetentionMergePolicy; import org.apache.lucene.index.Term; +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.DocValuesFieldExistsQuery; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; import org.apache.lucene.search.ReferenceManager; @@ -2850,8 +2853,13 @@ private static void trimUnsafeCommits(EngineConfig engineConfig) throws IOExcept private void restoreVersionMapAndCheckpointTracker(DirectoryReader directoryReader) throws IOException { final IndexSearcher searcher = new IndexSearcher(directoryReader); searcher.setQueryCache(null); - final Query query = 
LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getPersistedLocalCheckpoint() + 1, Long.MAX_VALUE); - final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f); + final Query query = new BooleanQuery.Builder() + .add(LongPoint.newRangeQuery( + SeqNoFieldMapper.NAME, getPersistedLocalCheckpoint() + 1, Long.MAX_VALUE), BooleanClause.Occur.MUST) + // exclude non-root nested documents + .add(new DocValuesFieldExistsQuery(SeqNoFieldMapper.PRIMARY_TERM_NAME), BooleanClause.Occur.MUST) + .build(); + final Weight weight = searcher.createWeight(searcher.rewrite(query), ScoreMode.COMPLETE_NO_SCORES, 1.0f); for (LeafReaderContext leaf : directoryReader.leaves()) { final Scorer scorer = weight.scorer(leaf); if (scorer == null) { @@ -2863,9 +2871,6 @@ private void restoreVersionMapAndCheckpointTracker(DirectoryReader directoryRead int docId; while ((docId = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { final long primaryTerm = dv.docPrimaryTerm(docId); - if (primaryTerm == -1L) { - continue; // skip children docs which do not have primary term - } final long seqNo = dv.docSeqNo(docId); localCheckpointTracker.markSeqNoAsProcessed(seqNo); localCheckpointTracker.markSeqNoAsPersisted(seqNo); diff --git a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java index 926bdddee0388..485c1e1510a4b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java @@ -24,6 +24,9 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.Term; +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanQuery; +import org.apache.lucene.search.DocValuesFieldExistsQuery; import org.apache.lucene.search.IndexSearcher; import 
org.apache.lucene.search.Query; import org.apache.lucene.search.ScoreDoc; @@ -210,7 +213,11 @@ private void fillParallelArray(ScoreDoc[] scoreDocs, ParallelArray parallelArray } private TopDocs searchOperations(ScoreDoc after) throws IOException { - final Query rangeQuery = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo); + final Query rangeQuery = new BooleanQuery.Builder() + .add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo), BooleanClause.Occur.MUST) + // exclude non-root nested documents + .add(new DocValuesFieldExistsQuery(SeqNoFieldMapper.PRIMARY_TERM_NAME), BooleanClause.Occur.MUST) + .build(); final Sort sortedBySeqNo = new Sort(new SortField(SeqNoFieldMapper.NAME, SortField.Type.LONG)); return indexSearcher.searchAfter(after, rangeQuery, searchBatchSize, sortedBySeqNo); } @@ -219,11 +226,7 @@ private Translog.Operation readDocAsOp(int docIndex) throws IOException { final LeafReaderContext leaf = parallelArray.leafReaderContexts[docIndex]; final int segmentDocID = scoreDocs[docIndex].doc - leaf.docBase; final long primaryTerm = parallelArray.primaryTerm[docIndex]; - // We don't have to read the nested child documents - those docs don't have primary terms. 
- if (primaryTerm == -1) { - skippedOperations++; - return null; - } + assert primaryTerm > 0 : "nested child document must be excluded"; final long seqNo = parallelArray.seqNo[docIndex]; // Only pick the first seen seq# if (seqNo == lastSeenSeqNo) { diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 49d93d3da0fd3..0fb3e46c5aefb 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -1811,12 +1811,12 @@ public void run() { public void testVersioningCreateExistsException() throws IOException { ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null); - Engine.Index create = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, + Engine.Index create = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false, UNASSIGNED_SEQ_NO, 0); Engine.IndexResult indexResult = engine.index(create); assertThat(indexResult.getVersion(), equalTo(1L)); - create = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, + create = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false, UNASSIGNED_SEQ_NO, 0); indexResult = engine.index(create); assertThat(indexResult.getResultType(), equalTo(Engine.Result.Type.FAILURE)); @@ -3063,7 +3063,7 @@ public void testTranslogReplay() throws IOException { final int numDocs = randomIntBetween(1, 10); for (int i = 0; i < numDocs; i++) { ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); - Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, + Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 
1, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0); Engine.IndexResult indexResult = engine.index(firstIndexRequest); assertThat(indexResult.getVersion(), equalTo(1L)); @@ -3097,7 +3097,7 @@ public void testTranslogReplay() throws IOException { final boolean flush = randomBoolean(); int randomId = randomIntBetween(numDocs + 1, numDocs + 10); ParsedDocument doc = testParsedDocument(Integer.toString(randomId), null, testDocument(), new BytesArray("{}"), null); - Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, 1, + Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, 1, VersionType.EXTERNAL, PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0); Engine.IndexResult indexResult = engine.index(firstIndexRequest); assertThat(indexResult.getVersion(), equalTo(1L)); @@ -3107,7 +3107,7 @@ public void testTranslogReplay() throws IOException { } doc = testParsedDocument(Integer.toString(randomId), null, testDocument(), new BytesArray("{}"), null); - Engine.Index idxRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, 2, + Engine.Index idxRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, 2, VersionType.EXTERNAL, PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0); Engine.IndexResult result = engine.index(idxRequest); engine.refresh("test"); @@ -3143,7 +3143,7 @@ public void testRecoverFromForeignTranslog() throws IOException { final int numDocs = randomIntBetween(1, 10); for (int i = 0; i < numDocs; i++) { ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); - Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, + Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false, 
UNASSIGNED_SEQ_NO, 0); Engine.IndexResult index = engine.index(firstIndexRequest); assertThat(index.getVersion(), equalTo(1L)); @@ -3677,7 +3677,7 @@ public void testRetryWithAutogeneratedIdWorksAndNoDuplicateDocs() throws IOExcep boolean isRetry = false; long autoGeneratedIdTimestamp = 0; - Engine.Index index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, + Engine.Index index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, randomBoolean() ? Versions.MATCH_DELETED : Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0); Engine.IndexResult indexResult = engine.index(index); @@ -3689,7 +3689,7 @@ public void testRetryWithAutogeneratedIdWorksAndNoDuplicateDocs() throws IOExcep assertThat(indexResult.getVersion(), equalTo(1L)); isRetry = true; - index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY, VersionType.INTERNAL, + index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0); indexResult = engine.index(index); assertThat(indexResult.getVersion(), equalTo(1L)); @@ -3718,7 +3718,7 @@ public void testRetryWithAutogeneratedIdsAndWrongOrderWorksAndNoDuplicateDocs() boolean isRetry = true; long autoGeneratedIdTimestamp = 0; - Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, + Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, randomBoolean() ? 
Versions.MATCH_DELETED : Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0); Engine.IndexResult result = engine.index(firstIndexRequest); @@ -3730,7 +3730,7 @@ public void testRetryWithAutogeneratedIdsAndWrongOrderWorksAndNoDuplicateDocs() assertThat(indexReplicaResult.getVersion(), equalTo(1L)); isRetry = false; - Engine.Index secondIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, + Engine.Index secondIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0); Engine.IndexResult indexResult = engine.index(secondIndexRequest); @@ -3760,7 +3760,7 @@ public Engine.Index randomAppendOnly(ParsedDocument doc, boolean retry, final lo } public Engine.Index appendOnlyPrimary(ParsedDocument doc, boolean retry, final long autoGeneratedIdTimestamp, boolean create) { - return new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, create ? Versions.MATCH_DELETED : Versions.MATCH_ANY, + return new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, create ? 
Versions.MATCH_DELETED : Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, retry, UNASSIGNED_SEQ_NO, 0); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java index 30b4db9c90368..0b7c278f81fa9 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java @@ -157,15 +157,9 @@ searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), f public void testSkipNonRootOfNestedDocuments() throws Exception { Map seqNoToTerm = new HashMap<>(); List operations = generateHistoryOnReplica(between(1, 100), randomBoolean(), randomBoolean(), randomBoolean()); - int totalOps = 0; for (Engine.Operation op : operations) { if (engine.getLocalCheckpointTracker().hasProcessed(op.seqNo()) == false) { seqNoToTerm.put(op.seqNo(), op.primaryTerm()); - if (op instanceof Engine.Index) { - totalOps += ((Engine.Index) op).docs().size(); - } else { - totalOps++; - } } applyOperation(engine, op); if (rarely()) { @@ -182,14 +176,12 @@ public void testSkipNonRootOfNestedDocuments() throws Exception { engine.refresh("test"); Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, between(1, 100), 0, maxSeqNo, false)) { - searcher = null; + assertThat(snapshot.totalOperations(), equalTo(seqNoToTerm.size())); Translog.Operation op; while ((op = snapshot.next()) != null) { assertThat(op.toString(), op.primaryTerm(), equalTo(seqNoToTerm.get(op.seqNo()))); } - assertThat(snapshot.skippedOperations(), equalTo(totalOps - seqNoToTerm.size())); - } finally { - IOUtils.close(searcher); + assertThat(snapshot.skippedOperations(), equalTo(0)); } } diff 
--git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index c7bb11f4716bb..6feeaabf5878d 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -160,7 +160,7 @@ public abstract class EngineTestCase extends ESTestCase { protected Path primaryTranslogDir; protected Path replicaTranslogDir; // A default primary term is used by engine instances created in this test. - protected final PrimaryTermSupplier primaryTerm = new PrimaryTermSupplier(0L); + protected final PrimaryTermSupplier primaryTerm = new PrimaryTermSupplier(1L); protected static void assertVisibleCount(Engine engine, int numDocs) throws IOException { assertVisibleCount(engine, numDocs, true);