-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CCR] Read changes from Lucene instead of translog #30120
Conversation
Pinging @elastic/es-distributed |
return versionDvField.longValue(); | ||
} | ||
|
||
private static boolean isDeleteOperation(LeafReaderContext leafReaderContext, int segmentDocId) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is not correct I think we are blocked here unitl we can identify tombstones
@martijnvg Discussed with @jasontedor, I will continue your work here. |
@martijnvg I've backed out the mapping logic to make this PR as a cut-over from translog to Lucene ops. We can make that in a follow-up. Thank you. |
@dnhatn No problem, I can make that change in a different pr. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this. I left some initial comments and questions
if (lastRefreshedCheckpoint() < maxSeqNo) { | ||
refresh(source, SearcherScope.INTERNAL); | ||
} | ||
refresh(source, SearcherScope.INTERNAL); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems like a mistake?
@@ -2388,4 +2404,28 @@ public long softUpdateDocuments(Term term, Iterable<? extends Iterable<? extends | |||
return super.softUpdateDocuments(term, docs, softDeletes); | |||
} | |||
} | |||
|
|||
/** | |||
* Returned the maximum local checkpoint value has been refreshed internally. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: the last local checkpoint
@Override | ||
public void afterRefresh(boolean didRefresh) { | ||
if (didRefresh) { | ||
refreshedCheckpoint.getAndUpdate(prev -> Math.max(prev, pendingCheckpoint)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is unsafe? you make capture things that didn't make it into the reader
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We only mark seq# as completed after adding its op to Lucene and RefreshListener is notified serially under lock. I think it's safe but we need to discuss to make sure that we won't add something unsafe here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are you sure this blocks ongoing indexing? I would definitely double check with @s1monw that we want to rely on this semantics (if it is the case). IMO we should keep it simple and just pre-capture the local checkpoint.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@bleskes Yeah, I think I made it too complicated. I replaced getAndUpdate
by set
.
this.lastSeenSeqNo = fromSeqNo - 1; | ||
this.requiredFullRange = requiredFullRange; | ||
boolean success = false; | ||
final Engine.Searcher engineSearcher = searcherFactory.get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need supplier? can we just give the searcher as a param?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I think I see why, it's for closing. I think it's still to pass in a search and close it on exception as you did now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1. I passed an engine searcher directly.
final Query rangeQuery = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo); | ||
final Sort sortedBySeqNoThenByTerm = new Sort( | ||
new SortedNumericSortField(SeqNoFieldMapper.NAME, SortField.Type.LONG), | ||
new SortedNumericSortField(SeqNoFieldMapper.PRIMARY_TERM_NAME, SortField.Type.LONG, true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed - this should be needed in the future. Maybe we should remove it and instead assert that we never have duplicate seq#
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I miss something here because I think we need it for now but not in the future after we have a Lucene rollback. I will reach out to discuss this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm sorry but I dropped a not in my comment. "this should not be needed in the future." . It's only relevant in cases where the primary dies while indexing is ongoing and we have more than 1 replica. In these cases this primary sort doesn't help because you also need some kind of a deduping mechanism to realy make it work. Such deduping is fairly easy to implement but I'm on the fence to whether we should.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have dedup in this PR already (line 161-163). The lastSeenSeqNo
is used for dedup and range check. I am fine to remove the primary sort and dedup mechanism.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. I missed it. I think it's surprising to put it in readDocAsOp
and shortcut. I'd prefer to do it in next
where do all our state updates and then everything together. it's rare anyway and doesn't require optimization imo. That said, it's all nits. If you prefer it otherwise I'm good. Thanks for clarifying.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, we should not mutate anything in readDocAsOp
. I will update this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@bleskes I moved this to next
but we also need to dudup for nested docs then I moved this to readDocAsOp
again. I think we should optimize for nested docs. I am open to suggestions here.
@Override | ||
public Translog.Operation next() throws IOException { | ||
final Translog.Operation op = nextOp(); | ||
if (requiredFullRange && lastSeenSeqNo < toSeqNo) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why we to check here that lastSeenSeqNo is < toSeqNo? shouldn't we stop reading before this happens?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we also want to assert that seqNo != lastSeeSeqNo?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The caller should continue consuming the snapshot until the next
method returns null. In the last call, lastSeenSeqNo equals to toSeqNo and op is null. This guard is added to avoid checking in this case. I am +1 on the assertion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm confused - you check for op==null later on? maybe just put the op!=null check on this outer if?
final Translog.Operation op; | ||
final boolean isTombstone = isTombstoneOperation(leaf, segmentDocID); | ||
if (isTombstone && fields.uid() == null) { | ||
op = new Translog.NoOp(seqNo, primaryTerm, ""); // TODO: store reason in ignored fields? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tend to say yes? It's very rare and it feels like a good debugging tool. I wonder what other people think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
++
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will make it in a follow-up.
return op; | ||
} | ||
|
||
private boolean isTombstoneOperation(LeafReaderContext leaf, int segmentDocID) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe this should just take a LeafReader?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also wonder if we want to pull the tombstoneDV
in the ctor next to List<LeafReaderContext> leaves
and a List<NumericDocValues>
for seqIds... I think this would be nice and prevent getting stuff from the reader over and over again.
return false; | ||
} | ||
|
||
private long readNumericDV(LeafReaderContext leaf, String field, int segmentDocID) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe this should just take a LeafReader?
@@ -610,7 +609,7 @@ protected static void assertVisibleCount(InternalEngine engine, int numDocs, boo | |||
default: | |||
throw new UnsupportedOperationException("unknown version type: " + versionType); | |||
} | |||
if (randomBoolean()) { | |||
if (true || randomBoolean()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks like a left over?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I removed it.
final Translog.Operation op; | ||
final boolean isTombstone = isTombstoneOperation(leaf, segmentDocID); | ||
if (isTombstone && fields.uid() == null) { | ||
op = new Translog.NoOp(seqNo, primaryTerm, ""); // TODO: store reason in ignored fields? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
++
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
very cool change! I left some ideas but LGTM overall
this.onClose = engineSearcher; | ||
success = true; | ||
} finally { | ||
if (success == false) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be handled on the caller side? We are not responsible for this reference to engine searcher unless fully constructued?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
} | ||
|
||
private boolean isTombstoneOperation(LeafReaderContext leaf, int segmentDocID) throws IOException { | ||
final NumericDocValues tombstoneDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we can pull all these in the constructor into an array that we can access by index of the leaf reader. this is how we do things in lucene for stuff we access frequently.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@s1monw
I tried but realized that NumericDocValues#advanceExact
method requires increasing docID values but it's not the case here. Do you have any suggestion for this?
/** Advance the iterator to exactly {@code target} and return whether
* {@code target} has a value.
* {@code target} must be greater than or equal to the current
* {@link #docID() doc ID} and must be a valid doc ID, ie. ≥ 0 and
* < {@code maxDoc}.
* After this method returns, {@link #docID()} retuns {@code target}. */
public abstract boolean advanceExact(int target) throws IOException;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I need to reset the DV :)
@s1monw I've updated the snapshot to cache/reload |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
left a comment but LGTM in general
private NumericDocValues tombstoneDV; | ||
|
||
CombinedDocValues(LeafReader leafReader) { | ||
this.leafReader = leafReader; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please don't load stuff lazily. go and load it all in the ctor. they are in memory anyways.
} | ||
} | ||
|
||
private static final NumericDocValues EMPTY_DOC_VALUES = new NumericDocValues() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is unnecessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor nit. No need for another review.
@@ -223,76 +224,57 @@ private boolean assertDocSoftDeleted(LeafReader leafReader, int segmentDocId) th | |||
private NumericDocValues primaryTermDV; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these can all be final?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@s1monw Sadly no. We sometimes need to reload these DocValues if the targeting docId is smaller than the current docId.
if (seqNoDV.docID() > segmentDocId) {
seqNoDV = leafReader.getNumericDocValues(SeqNoFieldMapper.NAME)
}
Do you have any other idea for this?
Thanks @martijnvg for the great initial work and @bleskes and @s1monw for helpful reviews. |
* es/ccr: (78 commits) Upgrade to Lucene-7.4-snapshot-6705632810 (elastic#30519) add version compatibility from 6.4.0 after backport, see elastic#30319 (elastic#30390) Security: Simplify security index listeners (elastic#30466) Add proper longitude validation in geo_polygon_query (elastic#30497) Remove Discovery.AckListener.onTimeout() (elastic#30514) Build: move generated-resources to build (elastic#30366) Reindex: Fold "with all deps" project into reindex (elastic#30154) Isolate REST client single host tests (elastic#30504) Solve Gradle deprecation warnings around shadowJar (elastic#30483) SAML: Process only signed data (elastic#30420) Remove BWC repository test (elastic#30500) Build: Remove xpack specific run task (elastic#30487) AwaitsFix IntegTestZipClientYamlTestSuiteIT#indices.split tests Enable soft-deletes in v6.4 LLClient: Add setJsonEntity (elastic#30447) [CCR] Read changes from Lucene instead of translog (elastic#30120) Expose CommonStatsFlags directly in IndicesStatsRequest. (elastic#30163) Silence IndexUpgradeIT test failures. (elastic#30430) Bump Gradle heap to 1792m (elastic#30484) [docs] add warning for read-write indices in force merge documentation (elastic#28869) ...
This PR integrates Lucene soft-deletes (LUCENE-8200) into Elasticsearch. Highlight works in this PR include: 1. Replace hard-deletes by soft-deletes in InternalEngine 2. Use _recovery_source if _source is disabled or modified (elastic#31106) 3. Soft-deletes retention policy based on the global checkpoint (elastic#30335) 4. Read operation history from Lucene instead of translog (elastic#30120) 5. Use Lucene history in peer-recovery (elastic#30522) These works have been done by the whole team; however, these individuals (lexical order) have significant contribution in coding and reviewing: Co-authored-by: Adrien Grand <[email protected]> Co-authored-by: Boaz Leskes <[email protected]> Co-authored-by: Jason Tedor <[email protected]> Co-authored-by: Martijn van Groningen <[email protected]> Co-authored-by: Nhat Nguyen <[email protected]> Co-authored-by: Simon Willnauer <[email protected]>
This PR integrates Lucene soft-deletes(LUCENE-8200) into Elasticsearch. Highlight works in this PR include: - Replace hard-deletes by soft-deletes in InternalEngine - Use _recovery_source if _source is disabled or modified (#31106) - Soft-deletes retention policy based on the global checkpoint (#30335) - Read operation history from Lucene instead of translog (#30120) - Use Lucene history in peer-recovery (#30522) Relates #30086 Closes #29530 --- These works have been done by the whole team; however, these individuals (lexical order) have significant contribution in coding and reviewing: Co-authored-by: Adrien Grand [email protected] Co-authored-by: Boaz Leskes [email protected] Co-authored-by: Jason Tedor [email protected] Co-authored-by: Martijn van Groningen [email protected] Co-authored-by: Nhat Nguyen [email protected] Co-authored-by: Simon Willnauer [email protected]
This PR integrates Lucene soft-deletes(LUCENE-8200) into Elasticsearch. Highlight works in this PR include: - Replace hard-deletes by soft-deletes in InternalEngine - Use _recovery_source if _source is disabled or modified (elastic#31106) - Soft-deletes retention policy based on the global checkpoint (elastic#30335) - Read operation history from Lucene instead of translog (elastic#30120) - Use Lucene history in peer-recovery (elastic#30522) Relates elastic#30086 Closes elastic#29530 --- These works have been done by the whole team; however, these individuals (lexical order) have significant contribution in coding and reviewing: Co-authored-by: Adrien Grand <[email protected]> Co-authored-by: Boaz Leskes <[email protected]> Co-authored-by: Jason Tedor <[email protected]> Co-authored-by: Martijn van Groningen <[email protected]> Co-authored-by: Nhat Nguyen <[email protected]> Co-authored-by: Simon Willnauer <[email protected]>
This PR integrates Lucene soft-deletes(LUCENE-8200) into Elasticsearch. Highlight works in this PR include: - Replace hard-deletes by soft-deletes in InternalEngine - Use _recovery_source if _source is disabled or modified (#31106) - Soft-deletes retention policy based on the global checkpoint (#30335) - Read operation history from Lucene instead of translog (#30120) - Use Lucene history in peer-recovery (#30522) Relates #30086 Closes #29530 --- These works have been done by the whole team; however, these individuals (lexical order) have significant contribution in coding and reviewing: Co-authored-by: Adrien Grand <[email protected]> Co-authored-by: Boaz Leskes <[email protected]> Co-authored-by: Jason Tedor <[email protected]> Co-authored-by: Martijn van Groningen <[email protected]> Co-authored-by: Nhat Nguyen <[email protected]> Co-authored-by: Simon Willnauer <[email protected]>
@martijnvg @dnhatn Excuse me, What is the reason of "Read changes from Lucene instead of translog"? |
@weizijun The read patterns of CCR effectively require random access to the translog, which it doesn’t support. Without random access, reading changes was too slow. We could bolt random access on top of the translog, but then it over-complicates the translog. |
This change cuts over from translog to Lucene changes history in CCR component.
The
autoGeneratedIdTimestamp
will be handled in a follow-up.