diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDocValues.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDocValues.java index 009dc9799d52a..3be58ecb88170 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDocValues.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDocValues.java @@ -47,6 +47,7 @@ final class CombinedDocValues { long docVersion(int segmentDocId) throws IOException { assert versionDV.docID() < segmentDocId; if (versionDV.advanceExact(segmentDocId) == false) { + assert false : "DocValues for field [" + VersionFieldMapper.NAME + "] is not found"; throw new IllegalStateException("DocValues for field [" + VersionFieldMapper.NAME + "] is not found"); } return versionDV.longValue(); @@ -55,19 +56,18 @@ long docVersion(int segmentDocId) throws IOException { long docSeqNo(int segmentDocId) throws IOException { assert seqNoDV.docID() < segmentDocId; if (seqNoDV.advanceExact(segmentDocId) == false) { + assert false : "DocValues for field [" + SeqNoFieldMapper.NAME + "] is not found"; throw new IllegalStateException("DocValues for field [" + SeqNoFieldMapper.NAME + "] is not found"); } return seqNoDV.longValue(); } long docPrimaryTerm(int segmentDocId) throws IOException { - if (primaryTermDV == null) { - return -1L; - } + // We exclude non-root nested documents when querying changes, every returned document must have primary term. assert primaryTermDV.docID() < segmentDocId; - // Use -1 for docs which don't have primary term. The caller considers those docs as nested docs. if (primaryTermDV.advanceExact(segmentDocId) == false) { - return -1; + assert false : "DocValues for field [" + SeqNoFieldMapper.PRIMARY_TERM_NAME + "] is not found"; + throw new IllegalStateException("DocValues for field [" + SeqNoFieldMapper.PRIMARY_TERM_NAME + "] is not found"); } return primaryTermDV.longValue(); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index f6bde020b84d5..cf3182fd5a7c1 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -38,6 +38,8 @@ import org.apache.lucene.index.ShuffleForcedMergePolicy; import org.apache.lucene.index.SoftDeletesRetentionMergePolicy; import org.apache.lucene.index.Term; +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; @@ -63,6 +65,7 @@ import org.elasticsearch.common.lucene.LoggerInfoStream; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; +import org.elasticsearch.common.lucene.search.Queries; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver; import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo; @@ -2731,8 +2734,12 @@ private boolean assertMaxSeqNoOfUpdatesIsAdvanced(Term id, long seqNo, boolean a private void restoreVersionMapAndCheckpointTracker(DirectoryReader directoryReader) throws IOException { final IndexSearcher searcher = new IndexSearcher(directoryReader); searcher.setQueryCache(null); - final Query query = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getPersistedLocalCheckpoint() + 1, Long.MAX_VALUE); - final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f); + final Query query = new BooleanQuery.Builder() + .add(LongPoint.newRangeQuery( + SeqNoFieldMapper.NAME, getPersistedLocalCheckpoint() + 1, Long.MAX_VALUE), BooleanClause.Occur.MUST) + .add(Queries.newNonNestedFilter(), BooleanClause.Occur.MUST) // exclude non-root nested documents + .build(); + final Weight weight = searcher.createWeight(searcher.rewrite(query), ScoreMode.COMPLETE_NO_SCORES, 1.0f); for (LeafReaderContext leaf : directoryReader.leaves()) { final Scorer scorer = weight.scorer(leaf); if (scorer == null) { @@ -2744,9 +2751,6 @@ private void restoreVersionMapAndCheckpointTracker(DirectoryReader directoryRead int docId; while ((docId = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { final long primaryTerm = dv.docPrimaryTerm(docId); - if (primaryTerm == -1L) { - continue; // skip children docs which do not have primary term - } final long seqNo = dv.docSeqNo(docId); localCheckpointTracker.markSeqNoAsProcessed(seqNo); localCheckpointTracker.markSeqNoAsPersisted(seqNo); diff --git a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java index 7ae37965a2ffd..ef21c6a86741e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java @@ -24,6 +24,8 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.Term; +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; import org.apache.lucene.search.ScoreDoc; @@ -33,6 +35,7 @@ import org.apache.lucene.util.ArrayUtil; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.lucene.search.Queries; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.fieldvisitor.FieldsVisitor; import org.elasticsearch.index.mapper.IdFieldMapper; @@ -210,7 +213,10 @@ private void fillParallelArray(ScoreDoc[] scoreDocs, ParallelArray parallelArray } private TopDocs searchOperations(ScoreDoc after) throws IOException { - final Query rangeQuery = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo); + final Query rangeQuery = new BooleanQuery.Builder() + .add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo), BooleanClause.Occur.MUST) + .add(Queries.newNonNestedFilter(), BooleanClause.Occur.MUST) // exclude non-root nested documents + .build(); final Sort sortedBySeqNo = new Sort(new SortField(SeqNoFieldMapper.NAME, SortField.Type.LONG)); return indexSearcher.searchAfter(after, rangeQuery, searchBatchSize, sortedBySeqNo); } @@ -219,11 +225,7 @@ private Translog.Operation readDocAsOp(int docIndex) throws IOException { final LeafReaderContext leaf = parallelArray.leafReaderContexts[docIndex]; final int segmentDocID = scoreDocs[docIndex].doc - leaf.docBase; final long primaryTerm = parallelArray.primaryTerm[docIndex]; - // We don't have to read the nested child documents - those docs don't have primary terms. - if (primaryTerm == -1) { - skippedOperations++; - return null; - } + assert primaryTerm > 0 : "nested child document must be excluded"; final long seqNo = parallelArray.seqNo[docIndex]; // Only pick the first seen seq# if (seqNo == lastSeenSeqNo) { diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 64a3392d1f880..986017eafae59 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -1551,12 +1551,12 @@ public void run() { public void testVersioningCreateExistsException() throws IOException { ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null); - Engine.Index create = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, + Engine.Index create = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false, UNASSIGNED_SEQ_NO, 0); Engine.IndexResult indexResult = engine.index(create); assertThat(indexResult.getVersion(), equalTo(1L)); - create = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, + create = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false, UNASSIGNED_SEQ_NO, 0); indexResult = engine.index(create); assertThat(indexResult.getResultType(), equalTo(Engine.Result.Type.FAILURE)); @@ -2796,7 +2796,7 @@ public void testTranslogReplay() throws IOException { final int numDocs = randomIntBetween(1, 10); for (int i = 0; i < numDocs; i++) { ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); - Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, + Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0); Engine.IndexResult indexResult = engine.index(firstIndexRequest); assertThat(indexResult.getVersion(), equalTo(1L)); @@ -2823,7 +2823,7 @@ public void testTranslogReplay() throws IOException { final boolean flush = randomBoolean(); int randomId = randomIntBetween(numDocs + 1, numDocs + 10); ParsedDocument doc = testParsedDocument(Integer.toString(randomId), null, testDocument(), new BytesArray("{}"), null); - Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, 1, + Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, 1, VersionType.EXTERNAL, PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0); Engine.IndexResult indexResult = engine.index(firstIndexRequest); assertThat(indexResult.getVersion(), equalTo(1L)); @@ -2833,7 +2833,7 @@ public void testTranslogReplay() throws IOException { } doc = testParsedDocument(Integer.toString(randomId), null, testDocument(), new BytesArray("{}"), null); - Engine.Index idxRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, 2, + Engine.Index idxRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, 2, VersionType.EXTERNAL, PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0); Engine.IndexResult result = engine.index(idxRequest); engine.refresh("test"); @@ -2869,7 +2869,7 @@ public void testRecoverFromForeignTranslog() throws IOException { final int numDocs = randomIntBetween(1, 10); for (int i = 0; i < numDocs; i++) { ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); - Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, + Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0); Engine.IndexResult index = engine.index(firstIndexRequest); assertThat(index.getVersion(), equalTo(1L)); @@ -3402,7 +3402,7 @@ public void testRetryWithAutogeneratedIdWorksAndNoDuplicateDocs() throws IOExcep boolean isRetry = false; long autoGeneratedIdTimestamp = 0; - Engine.Index index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, + Engine.Index index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, randomBoolean() ? Versions.MATCH_DELETED : Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0); Engine.IndexResult indexResult = engine.index(index); @@ -3414,7 +3414,7 @@ public void testRetryWithAutogeneratedIdWorksAndNoDuplicateDocs() throws IOExcep assertThat(indexResult.getVersion(), equalTo(1L)); isRetry = true; - index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY, VersionType.INTERNAL, + index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0); indexResult = engine.index(index); assertThat(indexResult.getVersion(), equalTo(1L)); @@ -3443,7 +3443,7 @@ public void testRetryWithAutogeneratedIdsAndWrongOrderWorksAndNoDuplicateDocs() boolean isRetry = true; long autoGeneratedIdTimestamp = 0; - Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, + Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, randomBoolean() ? Versions.MATCH_DELETED : Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0); Engine.IndexResult result = engine.index(firstIndexRequest); @@ -3455,7 +3455,7 @@ public void testRetryWithAutogeneratedIdsAndWrongOrderWorksAndNoDuplicateDocs() assertThat(indexReplicaResult.getVersion(), equalTo(1L)); isRetry = false; - Engine.Index secondIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, + Engine.Index secondIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0); Engine.IndexResult indexResult = engine.index(secondIndexRequest); @@ -3485,7 +3485,7 @@ public Engine.Index randomAppendOnly(ParsedDocument doc, boolean retry, final lo } public Engine.Index appendOnlyPrimary(ParsedDocument doc, boolean retry, final long autoGeneratedIdTimestamp, boolean create) { - return new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, create ? Versions.MATCH_DELETED : Versions.MATCH_ANY, + return new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, create ? Versions.MATCH_DELETED : Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, retry, UNASSIGNED_SEQ_NO, 0); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java index e03662b7d5539..0bdc89a38e8ec 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java @@ -157,15 +157,9 @@ searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), f public void testSkipNonRootOfNestedDocuments() throws Exception { Map seqNoToTerm = new HashMap<>(); List operations = generateHistoryOnReplica(between(1, 100), randomBoolean(), randomBoolean(), randomBoolean()); - int totalOps = 0; for (Engine.Operation op : operations) { if (engine.getLocalCheckpointTracker().hasProcessed(op.seqNo()) == false) { seqNoToTerm.put(op.seqNo(), op.primaryTerm()); - if (op instanceof Engine.Index) { - totalOps += ((Engine.Index) op).docs().size(); - } else { - totalOps++; - } } applyOperation(engine, op); if (rarely()) { @@ -182,14 +176,12 @@ public void testSkipNonRootOfNestedDocuments() throws Exception { engine.refresh("test"); Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, between(1, 100), 0, maxSeqNo, false)) { - searcher = null; + assertThat(snapshot.totalOperations(), equalTo(seqNoToTerm.size())); Translog.Operation op; while ((op = snapshot.next()) != null) { assertThat(op.toString(), op.primaryTerm(), equalTo(seqNoToTerm.get(op.seqNo()))); } - assertThat(snapshot.skippedOperations(), equalTo(totalOps - seqNoToTerm.size())); - } finally { - IOUtils.close(searcher); + assertThat(snapshot.skippedOperations(), equalTo(0)); } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index f2a44b766e73e..dac00626f4420 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -159,7 +159,7 @@ public abstract class EngineTestCase extends ESTestCase { protected Path primaryTranslogDir; protected Path replicaTranslogDir; // A default primary term is used by engine instances created in this test. - protected final PrimaryTermSupplier primaryTerm = new PrimaryTermSupplier(0L); + protected final PrimaryTermSupplier primaryTerm = new PrimaryTermSupplier(1L); protected static void assertVisibleCount(Engine engine, int numDocs) throws IOException { assertVisibleCount(engine, numDocs, true);