Exclude nested documents in LuceneChangesSnapshot #51279

Merged: 5 commits, merged on Jan 22, 2020. The diff below shows the changes from 4 commits.
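Context for the hunks that follow (editor's summary, not part of the diff): before this change, the Lucene query behind LuceneChangesSnapshot and the checkpoint-tracker restore matched every Lucene document in the _seq_no range, including the child documents that an indexing operation with nested fields produces, and callers skipped children by checking for a primary term of -1. The change pushes that exclusion into the query itself by AND-ing the range with a "non-nested" filter, so only root documents come back. A minimal sketch of that combination, assuming Lucene 8.x and the Elasticsearch helpers named in the diff (SeqNoFieldMapper.NAME, Queries.newNonNestedFilter()):

```java
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.Query;
import org.elasticsearch.common.lucene.search.Queries;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;

final class ChangesQuerySketch {
    // Builds the "operations in [fromSeqNo, toSeqNo], root documents only" query used in the hunks below.
    static Query changesQuery(long fromSeqNo, long toSeqNo) {
        Query seqNoRange = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo);
        return new BooleanQuery.Builder()
            .add(seqNoRange, BooleanClause.Occur.MUST)                   // operations in the requested range
            .add(Queries.newNonNestedFilter(), BooleanClause.Occur.MUST) // drop nested child documents
            .build();
    }
}
```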
@@ -47,6 +47,7 @@ final class CombinedDocValues {
     long docVersion(int segmentDocId) throws IOException {
         assert versionDV.docID() < segmentDocId;
         if (versionDV.advanceExact(segmentDocId) == false) {
+            assert false : "DocValues for field [" + VersionFieldMapper.NAME + "] is not found";
             throw new IllegalStateException("DocValues for field [" + VersionFieldMapper.NAME + "] is not found");
         }
         return versionDV.longValue();
@@ -55,19 +56,18 @@ long docVersion(int segmentDocId) throws IOException {
     long docSeqNo(int segmentDocId) throws IOException {
         assert seqNoDV.docID() < segmentDocId;
         if (seqNoDV.advanceExact(segmentDocId) == false) {
+            assert false : "DocValues for field [" + SeqNoFieldMapper.NAME + "] is not found";
             throw new IllegalStateException("DocValues for field [" + SeqNoFieldMapper.NAME + "] is not found");
         }
         return seqNoDV.longValue();
     }

     long docPrimaryTerm(int segmentDocId) throws IOException {
         if (primaryTermDV == null) {
             return -1L;
         }
+        // We exclude non-root nested documents when querying changes, every returned document must have primary term.
         assert primaryTermDV.docID() < segmentDocId;
-        // Use -1 for docs which don't have primary term. The caller considers those docs as nested docs.
         if (primaryTermDV.advanceExact(segmentDocId) == false) {
-            return -1;
+            assert false : "DocValues for field [" + SeqNoFieldMapper.PRIMARY_TERM_NAME + "] is not found";
+            throw new IllegalStateException("DocValues for field [" + SeqNoFieldMapper.PRIMARY_TERM_NAME + "] is not found");
         }
         return primaryTermDV.longValue();
     }
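A note on the pattern added in all three accessors above (my reading, hedged): with nested children excluded at query time, a missing _version, _seq_no, or _primary_term doc value is no longer an expected case but a bug, so the code both trips an assertion (visible in tests and CI, which run with -ea) and throws, so that production, where assertions are typically disabled, still fails fast instead of silently returning -1. A condensed sketch of the same pattern, not the actual helper:

```java
import java.io.IOException;
import org.apache.lucene.index.NumericDocValues;

final class RequiredDocValueSketch {
    // Sketch: read a required numeric doc value, failing loudly if it is missing.
    static long read(NumericDocValues dv, int segmentDocId, String field) throws IOException {
        if (dv.advanceExact(segmentDocId) == false) {
            assert false : "DocValues for field [" + field + "] is not found"; // trips in tests run with -ea
            throw new IllegalStateException("DocValues for field [" + field + "] is not found"); // still fails fast in production
        }
        return dv.longValue();
    }
}
```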
@@ -38,6 +38,8 @@
 import org.apache.lucene.index.ShuffleForcedMergePolicy;
 import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
 import org.apache.lucene.index.Term;
+import org.apache.lucene.search.BooleanClause;
+import org.apache.lucene.search.BooleanQuery;
 import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.Query;
@@ -63,6 +65,7 @@
 import org.elasticsearch.common.lucene.LoggerInfoStream;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
+import org.elasticsearch.common.lucene.search.Queries;
 import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
 import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
@@ -2731,7 +2734,11 @@ private boolean assertMaxSeqNoOfUpdatesIsAdvanced(Term id, long seqNo, boolean a
     private void restoreVersionMapAndCheckpointTracker(DirectoryReader directoryReader) throws IOException {
         final IndexSearcher searcher = new IndexSearcher(directoryReader);
         searcher.setQueryCache(null);
-        final Query query = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getPersistedLocalCheckpoint() + 1, Long.MAX_VALUE);
+        final Query query = new BooleanQuery.Builder()
+            .add(LongPoint.newRangeQuery(
+                SeqNoFieldMapper.NAME, getPersistedLocalCheckpoint() + 1, Long.MAX_VALUE), BooleanClause.Occur.MUST)
+            .add(Queries.newNonNestedFilter(), BooleanClause.Occur.MUST) // exclude non-root nested documents
+            .build();
         final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f);
         for (LeafReaderContext leaf : directoryReader.leaves()) {
             final Scorer scorer = weight.scorer(leaf);
@@ -2744,9 +2751,6 @@ private void restoreVersionMapAndCheckpointTracker(DirectoryReader directoryRead
             int docId;
             while ((docId = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
                 final long primaryTerm = dv.docPrimaryTerm(docId);
-                if (primaryTerm == -1L) {
-                    continue; // skip children docs which do not have primary term
-                }
                 final long seqNo = dv.docSeqNo(docId);
                 localCheckpointTracker.markSeqNoAsProcessed(seqNo);
                 localCheckpointTracker.markSeqNoAsPersisted(seqNo);
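The restore path above now relies on Queries.newNonNestedFilter() to keep only root documents. The diff does not show that helper; my understanding, which is an assumption worth checking against the 7.x sources, is that it boils down to "documents that carry _primary_term doc values", since only root documents are indexed with a primary term. Roughly:

```java
import org.apache.lucene.search.DocValuesFieldExistsQuery;
import org.apache.lucene.search.Query;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;

final class NonNestedFilterSketch {
    // Assumed equivalent of Queries.newNonNestedFilter(): match documents that have a
    // _primary_term doc value. Nested child documents are indexed without one, so they drop out.
    static Query nonNestedFilter() {
        return new DocValuesFieldExistsQuery(SeqNoFieldMapper.PRIMARY_TERM_NAME);
    }
}
```

If that assumption holds, the filter needs no per-document type information and is cheap to evaluate per segment, which fits both call sites in this PR.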
@@ -24,6 +24,8 @@
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.Term;
+import org.apache.lucene.search.BooleanClause;
+import org.apache.lucene.search.BooleanQuery;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.ScoreDoc;
@@ -33,6 +35,7 @@
 import org.apache.lucene.util.ArrayUtil;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.lucene.Lucene;
+import org.elasticsearch.common.lucene.search.Queries;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.index.fieldvisitor.FieldsVisitor;
 import org.elasticsearch.index.mapper.IdFieldMapper;
@@ -210,7 +213,10 @@ private void fillParallelArray(ScoreDoc[] scoreDocs, ParallelArray parallelArray
     }

     private TopDocs searchOperations(ScoreDoc after) throws IOException {
-        final Query rangeQuery = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo);
+        final Query rangeQuery = new BooleanQuery.Builder()
+            .add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo), BooleanClause.Occur.MUST)
+            .add(Queries.newNonNestedFilter(), BooleanClause.Occur.MUST) // exclude non-root nested documents
+            .build();
         final Sort sortedBySeqNo = new Sort(new SortField(SeqNoFieldMapper.NAME, SortField.Type.LONG));
         return indexSearcher.searchAfter(after, rangeQuery, searchBatchSize, sortedBySeqNo);
     }
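For readers unfamiliar with searchAfter: the snapshot pages through matching operations in _seq_no order, passing the last hit of one batch as the cursor for the next. A rough sketch of that loop, with illustrative names rather than the class's actual fields:

```java
import java.io.IOException;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TopDocs;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;

final class SeqNoPagingSketch {
    // Visit all hits of `query` in _seq_no order, `batchSize` at a time.
    static void forEachBatch(IndexSearcher searcher, Query query, int batchSize) throws IOException {
        Sort bySeqNo = new Sort(new SortField(SeqNoFieldMapper.NAME, SortField.Type.LONG));
        ScoreDoc after = null;
        TopDocs batch;
        do {
            batch = searcher.searchAfter(after, query, batchSize, bySeqNo);
            for (ScoreDoc hit : batch.scoreDocs) {
                // resolve hit.doc to a leaf reader and read the _seq_no / _primary_term doc values here
            }
            after = batch.scoreDocs.length == 0 ? null : batch.scoreDocs[batch.scoreDocs.length - 1];
        } while (batch.scoreDocs.length > 0);
    }
}
```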
@@ -219,11 +225,7 @@ private Translog.Operation readDocAsOp(int docIndex) throws IOException {
         final LeafReaderContext leaf = parallelArray.leafReaderContexts[docIndex];
         final int segmentDocID = scoreDocs[docIndex].doc - leaf.docBase;
         final long primaryTerm = parallelArray.primaryTerm[docIndex];
-        // We don't have to read the nested child documents - those docs don't have primary terms.
-        if (primaryTerm == -1) {
-            skippedOperations++;
-            return null;
-        }
+        assert primaryTerm > 0 : "nested child document must be excluded";
         final long seqNo = parallelArray.seqNo[docIndex];
         // Only pick the first seen seq#
         if (seqNo == lastSeenSeqNo) {
@@ -1551,12 +1551,12 @@ public void run() {

     public void testVersioningCreateExistsException() throws IOException {
         ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null);
-        Engine.Index create = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0,
+        Engine.Index create = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1,
             Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false, UNASSIGNED_SEQ_NO, 0);
         Engine.IndexResult indexResult = engine.index(create);
         assertThat(indexResult.getVersion(), equalTo(1L));

-        create = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED,
+        create = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, Versions.MATCH_DELETED,
             VersionType.INTERNAL, PRIMARY, 0, -1, false, UNASSIGNED_SEQ_NO, 0);
         indexResult = engine.index(create);
         assertThat(indexResult.getResultType(), equalTo(Engine.Result.Type.FAILURE));
@@ -2796,7 +2796,7 @@ public void testTranslogReplay() throws IOException {
         final int numDocs = randomIntBetween(1, 10);
         for (int i = 0; i < numDocs; i++) {
             ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
-            Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0,
+            Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1,
                 Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0);
             Engine.IndexResult indexResult = engine.index(firstIndexRequest);
             assertThat(indexResult.getVersion(), equalTo(1L));
@@ -2823,7 +2823,7 @@ public void testTranslogReplay() throws IOException {
         final boolean flush = randomBoolean();
         int randomId = randomIntBetween(numDocs + 1, numDocs + 10);
         ParsedDocument doc = testParsedDocument(Integer.toString(randomId), null, testDocument(), new BytesArray("{}"), null);
-        Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, 1,
+        Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, 1,
             VersionType.EXTERNAL, PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0);
         Engine.IndexResult indexResult = engine.index(firstIndexRequest);
         assertThat(indexResult.getVersion(), equalTo(1L));
@@ -2833,7 +2833,7 @@
         }

         doc = testParsedDocument(Integer.toString(randomId), null, testDocument(), new BytesArray("{}"), null);
-        Engine.Index idxRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, 2,
+        Engine.Index idxRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, 2,
             VersionType.EXTERNAL, PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0);
         Engine.IndexResult result = engine.index(idxRequest);
         engine.refresh("test");
@@ -2869,7 +2869,7 @@ public void testRecoverFromForeignTranslog() throws IOException {
         final int numDocs = randomIntBetween(1, 10);
         for (int i = 0; i < numDocs; i++) {
             ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
-            Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0,
+            Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1,
                 Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0);
             Engine.IndexResult index = engine.index(firstIndexRequest);
             assertThat(index.getVersion(), equalTo(1L));
@@ -3402,7 +3402,7 @@ public void testRetryWithAutogeneratedIdWorksAndNoDuplicateDocs() throws IOExcep
         boolean isRetry = false;
         long autoGeneratedIdTimestamp = 0;

-        Engine.Index index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0,
+        Engine.Index index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1,
             randomBoolean() ? Versions.MATCH_DELETED : Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(),
             autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0);
         Engine.IndexResult indexResult = engine.index(index);
@@ -3414,7 +3414,7 @@ public void testRetryWithAutogeneratedIdWorksAndNoDuplicateDocs() throws IOExcep
         assertThat(indexResult.getVersion(), equalTo(1L));

         isRetry = true;
-        index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY, VersionType.INTERNAL,
+        index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, Versions.MATCH_ANY, VersionType.INTERNAL,
             PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0);
         indexResult = engine.index(index);
         assertThat(indexResult.getVersion(), equalTo(1L));
@@ -3443,7 +3443,7 @@ public void testRetryWithAutogeneratedIdsAndWrongOrderWorksAndNoDuplicateDocs()
         boolean isRetry = true;
         long autoGeneratedIdTimestamp = 0;

-        Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0,
+        Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1,
             randomBoolean() ? Versions.MATCH_DELETED : Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(),
             autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0);
         Engine.IndexResult result = engine.index(firstIndexRequest);
@@ -3455,7 +3455,7 @@ public void testRetryWithAutogeneratedIdsAndWrongOrderWorksAndNoDuplicateDocs()
         assertThat(indexReplicaResult.getVersion(), equalTo(1L));

         isRetry = false;
-        Engine.Index secondIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0,
+        Engine.Index secondIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1,
             Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO,
             0);
         Engine.IndexResult indexResult = engine.index(secondIndexRequest);
@@ -3485,7 +3485,7 @@ public Engine.Index randomAppendOnly(ParsedDocument doc, boolean retry, final lo
     }

     public Engine.Index appendOnlyPrimary(ParsedDocument doc, boolean retry, final long autoGeneratedIdTimestamp, boolean create) {
-        return new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, create ? Versions.MATCH_DELETED : Versions.MATCH_ANY,
+        return new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, create ? Versions.MATCH_DELETED : Versions.MATCH_ANY,
             VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, retry,
             UNASSIGNED_SEQ_NO, 0);
     }
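Every edit in this test file changes the primary term passed to Engine.Index from 0 to 1. A hedged reading of why: now that "has a primary term" is what separates root documents from nested children, code downstream asserts that every operation read back carries a positive primary term, so fixtures indexed with term 0 would trip those assertions; 1 is the smallest legal term. Annotated excerpt of the first fixture above (the argument names in the comment are my reading of the constructor, not taken from this diff):

```java
// Hedged reading of the signature:
// new Engine.Index(uid, doc, seqNo, primaryTerm, version, versionType, origin, startTime,
//                  autoGeneratedIdTimestamp, isRetry, ifSeqNo, ifPrimaryTerm)
Engine.Index create = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1 /* primary term, was 0 */,
    Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false, UNASSIGNED_SEQ_NO, 0);
```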
@@ -157,15 +157,9 @@ searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), f
     public void testSkipNonRootOfNestedDocuments() throws Exception {
         Map<Long, Long> seqNoToTerm = new HashMap<>();
         List<Engine.Operation> operations = generateHistoryOnReplica(between(1, 100), randomBoolean(), randomBoolean(), randomBoolean());
-        int totalOps = 0;
         for (Engine.Operation op : operations) {
             if (engine.getLocalCheckpointTracker().hasProcessed(op.seqNo()) == false) {
                 seqNoToTerm.put(op.seqNo(), op.primaryTerm());
-                if (op instanceof Engine.Index) {
-                    totalOps += ((Engine.Index) op).docs().size();
-                } else {
-                    totalOps++;
-                }
             }
             applyOperation(engine, op);
             if (rarely()) {
@@ -182,14 +176,12 @@ public void testSkipNonRootOfNestedDocuments() throws Exception {
         engine.refresh("test");
         Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
         try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, between(1, 100), 0, maxSeqNo, false)) {
             searcher = null;
             assertThat(snapshot.totalOperations(), equalTo(seqNoToTerm.size()));
             Translog.Operation op;
             while ((op = snapshot.next()) != null) {
                 assertThat(op.toString(), op.primaryTerm(), equalTo(seqNoToTerm.get(op.seqNo())));
             }
-            assertThat(snapshot.skippedOperations(), equalTo(totalOps - seqNoToTerm.size()));
-        } finally {
-            IOUtils.close(searcher);
+            assertThat(snapshot.skippedOperations(), equalTo(0));
         }
     }
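The test change reads naturally once you recall that one indexing operation with nested fields expands into several Lucene documents (children first, root last). The deleted bookkeeping counted those extra documents because the old query surfaced them and the snapshot had to skip them one by one; with the non-nested filter in the query, only roots match, so totalOperations already equals the number of distinct operations and skippedOperations is expected to be 0. A small illustration of the counting that was removed, kept only to show what it measured:

```java
// Old-style counting (removed by this PR): an Engine.Index operation contributes one Lucene
// document per nested child plus one for the root, all of which used to match the _seq_no range query.
static int luceneDocsForOp(Engine.Operation op) {
    return op instanceof Engine.Index ? ((Engine.Index) op).docs().size() : 1;
}
```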
@@ -159,7 +159,7 @@ public abstract class EngineTestCase extends ESTestCase {
     protected Path primaryTranslogDir;
     protected Path replicaTranslogDir;
     // A default primary term is used by engine instances created in this test.
-    protected final PrimaryTermSupplier primaryTerm = new PrimaryTermSupplier(0L);
+    protected final PrimaryTermSupplier primaryTerm = new PrimaryTermSupplier(1L);

     protected static void assertVisibleCount(Engine engine, int numDocs) throws IOException {
         assertVisibleCount(engine, numDocs, true);