-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CCR] Read changes from Lucene instead of translog #30120
Merged
Merged
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit
Hold shift + click to select a range
f3a8f95
[CCR] use IndexSearcher to read operations from Lucene index instead
martijnvg 341eb39
moved CCRIndexReader to Lucene.java and added a simple test
martijnvg df85c61
Merge branch 'ccr' into ccr_from_translog_to_lucene
dnhatn 59b69e3
use existing Lucene
dnhatn 1b69093
Move to lucene snapshot
dnhatn 98ab2ea
Merge branch 'ccr' into ccr_from_translog_to_lucene
dnhatn f86dc1d
Use the changes snapshot
dnhatn 1fe57c0
More test
dnhatn ce6d8da
backout mapping changes
dnhatn 974c44c
harden tests
dnhatn 23b8c51
Simulate rollback in test
dnhatn f2415e7
Remove onClose callback
dnhatn 29a145e
Boaz’s feedbacks
dnhatn 8d8c6b1
Merge branch 'ccr' into ccr_from_translog_to_lucene
dnhatn 2b559b5
Capture and set checkpoint
dnhatn 2a23d31
Merge branch 'ccr' into ccr_from_translog_to_lucene
dnhatn 3abe88e
range check
dnhatn b357a54
nested docs
dnhatn f34c0d0
Merge branch 'ccr' into ccr_from_translog_to_lucene
dnhatn 09c48ea
index.soft_deletes -> index.soft_deletes.enabled
dnhatn f8b74fa
Merge branch 'ccr' into ccr_from_translog_to_lucene
dnhatn aa1f1c0
Cache DocValues
dnhatn c3b0e7a
Let caller release searcher when failed to open snapshot
dnhatn 3b8c63b
Load DocValues eagerly
dnhatn File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -46,6 +46,7 @@ | |
import java.io.Closeable; | ||
import java.io.IOException; | ||
import java.util.List; | ||
import java.util.Objects; | ||
|
||
/** | ||
* A {@link Translog.Snapshot} from changes in a Lucene index | ||
|
@@ -62,7 +63,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot { | |
private final TopDocs topDocs; | ||
|
||
private final Closeable onClose; | ||
private final CombinedDocValues[] docValues; // cache of docvalues | ||
private final CombinedDocValues[] docValues; // Cache of DocValues | ||
|
||
/** | ||
* Creates a new "translog" snapshot from Lucene for reading operations whose seq# in the specified range. | ||
|
@@ -209,11 +210,11 @@ private Translog.Operation readDocAsOp(int docID) throws IOException { | |
} | ||
|
||
private boolean assertDocSoftDeleted(LeafReader leafReader, int segmentDocId) throws IOException { | ||
final NumericDocValues dv = leafReader.getNumericDocValues(Lucene.SOFT_DELETE_FIELD); | ||
if (dv == null || dv.advanceExact(segmentDocId) == false) { | ||
final NumericDocValues ndv = leafReader.getNumericDocValues(Lucene.SOFT_DELETE_FIELD); | ||
if (ndv == null || ndv.advanceExact(segmentDocId) == false) { | ||
throw new IllegalStateException("DocValues for field [" + Lucene.SOFT_DELETE_FIELD + "] is not found"); | ||
} | ||
return dv.longValue() == 1; | ||
return ndv.longValue() == 1; | ||
} | ||
|
||
private static final class CombinedDocValues { | ||
|
@@ -223,76 +224,57 @@ private static final class CombinedDocValues { | |
private NumericDocValues primaryTermDV; | ||
private NumericDocValues tombstoneDV; | ||
|
||
CombinedDocValues(LeafReader leafReader) { | ||
CombinedDocValues(LeafReader leafReader) throws IOException { | ||
this.leafReader = leafReader; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please don't load stuff lazily. go and load it all in the ctor. they are in memory anyways. |
||
this.versionDV = Objects.requireNonNull(leafReader.getNumericDocValues(VersionFieldMapper.NAME), "VersionDV is missing"); | ||
this.seqNoDV = Objects.requireNonNull(leafReader.getNumericDocValues(SeqNoFieldMapper.NAME), "SeqNoDV is missing"); | ||
this.primaryTermDV = Objects.requireNonNull( | ||
leafReader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME), "PrimaryTermDV is missing"); | ||
this.tombstoneDV = leafReader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME); | ||
} | ||
|
||
NumericDocValues reloadIfNeed(NumericDocValues dv, String field, int segmentDocId) throws IOException { | ||
if (dv == null || dv.docID() > segmentDocId) { | ||
dv = leafReader.getNumericDocValues(field); | ||
long docVersion(int segmentDocId) throws IOException { | ||
if (versionDV.docID() > segmentDocId) { | ||
versionDV = Objects.requireNonNull(leafReader.getNumericDocValues(VersionFieldMapper.NAME), "VersionDV is missing"); | ||
} | ||
if (dv == null || dv.advanceExact(segmentDocId) == false) { | ||
throw new IllegalStateException("DocValues for field [" + field + "] is not found"); | ||
if (versionDV.advanceExact(segmentDocId) == false) { | ||
throw new IllegalStateException("DocValues for field [" + VersionFieldMapper.NAME + "] is not found"); | ||
} | ||
return dv; | ||
} | ||
|
||
long docVersion(int segmentDocId) throws IOException { | ||
versionDV = reloadIfNeed(versionDV, VersionFieldMapper.NAME, segmentDocId); | ||
return versionDV.longValue(); | ||
} | ||
|
||
long docSeqNo(int segmentDocID) throws IOException { | ||
seqNoDV = reloadIfNeed(seqNoDV, SeqNoFieldMapper.NAME, segmentDocID); | ||
long docSeqNo(int segmentDocId) throws IOException { | ||
if (seqNoDV.docID() > segmentDocId) { | ||
seqNoDV = Objects.requireNonNull(leafReader.getNumericDocValues(SeqNoFieldMapper.NAME), "SeqNoDV is missing"); | ||
} | ||
if (seqNoDV.advanceExact(segmentDocId) == false) { | ||
throw new IllegalStateException("DocValues for field [" + SeqNoFieldMapper.NAME + "] is not found"); | ||
} | ||
return seqNoDV.longValue(); | ||
} | ||
|
||
long docPrimaryTerm(int segmentDocId) throws IOException { | ||
if (primaryTermDV == null || primaryTermDV.docID() > segmentDocId) { | ||
if (primaryTermDV == null) { | ||
return -1L; | ||
} | ||
if (primaryTermDV.docID() > segmentDocId) { | ||
primaryTermDV = leafReader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME); | ||
} | ||
// Use -1 for docs which don't have primary term. The caller considers those docs as nested docs. | ||
if (primaryTermDV == null || primaryTermDV.advanceExact(segmentDocId) == false) { | ||
if (primaryTermDV.advanceExact(segmentDocId) == false) { | ||
return -1; | ||
} | ||
return primaryTermDV.longValue(); | ||
} | ||
|
||
boolean isTombstone(int segmentDocId) throws IOException { | ||
if (tombstoneDV == null || tombstoneDV.docID() > segmentDocId) { | ||
if (tombstoneDV == null) { | ||
return false; | ||
} | ||
if (tombstoneDV.docID() > segmentDocId) { | ||
tombstoneDV = leafReader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME); | ||
if (tombstoneDV == null) { | ||
tombstoneDV = EMPTY_DOC_VALUES; // tombstones are rare - use dummy so that we won't have to reload many times. | ||
} | ||
} | ||
return tombstoneDV.advanceExact(segmentDocId) && tombstoneDV.longValue() > 0; | ||
} | ||
} | ||
|
||
private static final NumericDocValues EMPTY_DOC_VALUES = new NumericDocValues() { | ||
@Override | ||
public long longValue() { | ||
return 0; | ||
} | ||
@Override | ||
public boolean advanceExact(int target) { | ||
return false; | ||
} | ||
@Override | ||
public int docID() { | ||
return 0; | ||
} | ||
@Override | ||
public int nextDoc() { | ||
throw new UnsupportedOperationException(); | ||
} | ||
@Override | ||
public int advance(int target) { | ||
throw new UnsupportedOperationException(); | ||
} | ||
@Override | ||
public long cost() { | ||
throw new UnsupportedOperationException(); | ||
} | ||
}; | ||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these can all be final?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@s1monw Sadly no. We sometimes need to reload these DocValues if the targeting docId is smaller than the current docId.
Do you have any other idea for this?