-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CCR] Read changes from Lucene instead of translog #30120
Changes from 12 commits
f3a8f95
341eb39
df85c61
59b69e3
1b69093
98ab2ea
f86dc1d
1fe57c0
ce6d8da
974c44c
23b8c51
f2415e7
29a145e
8d8c6b1
2b559b5
2a23d31
3abe88e
b357a54
f34c0d0
09c48ea
f8b74fa
aa1f1c0
c3b0e7a
3b8c63b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -68,6 +68,7 @@ | |
import org.elasticsearch.index.IndexSettings; | ||
import org.elasticsearch.index.VersionType; | ||
import org.elasticsearch.index.mapper.IdFieldMapper; | ||
import org.elasticsearch.index.mapper.MapperService; | ||
import org.elasticsearch.index.mapper.ParseContext; | ||
import org.elasticsearch.index.mapper.ParsedDocument; | ||
import org.elasticsearch.index.mapper.SeqNoFieldMapper; | ||
|
@@ -148,6 +149,7 @@ public class InternalEngine extends Engine { | |
private final CounterMetric numDocUpdates = new CounterMetric(); | ||
private final NumericDocValuesField softDeleteField = Lucene.newSoftDeleteField(); | ||
private final boolean softDeleteEnabled; | ||
private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener; | ||
|
||
/** | ||
* How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this | ||
|
@@ -224,6 +226,8 @@ public InternalEngine(EngineConfig engineConfig) { | |
for (ReferenceManager.RefreshListener listener: engineConfig.getInternalRefreshListener()) { | ||
this.internalSearcherManager.addListener(listener); | ||
} | ||
this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getCheckpoint()); | ||
this.internalSearcherManager.addListener(lastRefreshedCheckpointListener); | ||
success = true; | ||
} finally { | ||
if (success == false) { | ||
|
@@ -2342,6 +2346,18 @@ long getNumDocUpdates() { | |
return numDocUpdates.count(); | ||
} | ||
|
||
public Translog.Snapshot newLuceneChangesSnapshot(String source, MapperService mapperService, | ||
long minSeqNo, long maxSeqNo, boolean requiredFullRange) throws IOException { | ||
// TODO: Should we defer the refresh until we really need it? | ||
ensureOpen(); | ||
if (lastRefreshedCheckpoint() < maxSeqNo) { | ||
refresh(source, SearcherScope.INTERNAL); | ||
} | ||
refresh(source, SearcherScope.INTERNAL); | ||
return new LuceneChangesSnapshot(() -> acquireSearcher(source, SearcherScope.INTERNAL), mapperService, | ||
minSeqNo, maxSeqNo, requiredFullRange); | ||
} | ||
|
||
@Override | ||
public boolean isRecovering() { | ||
return pendingTranslogRecovery.get(); | ||
|
@@ -2388,4 +2404,28 @@ public long softUpdateDocuments(Term term, Iterable<? extends Iterable<? extends | |
return super.softUpdateDocuments(term, docs, softDeletes); | ||
} | ||
} | ||
|
||
/** | ||
* Returned the maximum local checkpoint value has been refreshed internally. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: the last local checkpoint |
||
*/ | ||
final long lastRefreshedCheckpoint() { | ||
return lastRefreshedCheckpointListener.refreshedCheckpoint.get(); | ||
} | ||
    /**
     * A refresh listener that tracks the highest local checkpoint known to be reflected in the
     * internally refreshed reader. The checkpoint is captured in {@link #beforeRefresh()} and only
     * published in {@link #afterRefresh(boolean)} once the refresh actually happened, so readers of
     * {@link #lastRefreshedCheckpoint()} never see a checkpoint ahead of the reader's contents.
     * NOTE(review): this relies on refresh listeners being invoked serially and on seq#s being marked
     * completed only after the operation was added to Lucene - confirm before changing either invariant.
     */
    private final class LastRefreshedCheckpointListener implements ReferenceManager.RefreshListener {
        // highest checkpoint known to be visible after the last completed internal refresh
        final AtomicLong refreshedCheckpoint;
        // checkpoint captured just before the in-flight refresh; published in afterRefresh
        private long pendingCheckpoint;

        LastRefreshedCheckpointListener(long initialLocalCheckpoint) {
            this.refreshedCheckpoint = new AtomicLong(initialLocalCheckpoint);
        }

        @Override
        public void beforeRefresh() {
            pendingCheckpoint = localCheckpointTracker.getCheckpoint(); // All changes until this point should be visible after refresh
        }

        @Override
        public void afterRefresh(boolean didRefresh) {
            if (didRefresh) {
                // max() guards against a concurrent listener invocation having already published a newer value
                refreshedCheckpoint.getAndUpdate(prev -> Math.max(prev, pendingCheckpoint));
            }
        }
    }
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,213 @@ | ||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.elasticsearch.index.engine; | ||
|
||
import org.apache.lucene.document.LongPoint; | ||
import org.apache.lucene.index.LeafReaderContext; | ||
import org.apache.lucene.index.NumericDocValues; | ||
import org.apache.lucene.index.ReaderUtil; | ||
import org.apache.lucene.index.Term; | ||
import org.apache.lucene.search.DocIdSetIterator; | ||
import org.apache.lucene.search.IndexSearcher; | ||
import org.apache.lucene.search.Query; | ||
import org.apache.lucene.search.ScoreDoc; | ||
import org.apache.lucene.search.Sort; | ||
import org.apache.lucene.search.SortField; | ||
import org.apache.lucene.search.SortedNumericSortField; | ||
import org.apache.lucene.search.TopDocs; | ||
import org.elasticsearch.common.bytes.BytesReference; | ||
import org.elasticsearch.common.lucene.Lucene; | ||
import org.elasticsearch.core.internal.io.IOUtils; | ||
import org.elasticsearch.index.VersionType; | ||
import org.elasticsearch.index.fieldvisitor.FieldsVisitor; | ||
import org.elasticsearch.index.mapper.IdFieldMapper; | ||
import org.elasticsearch.index.mapper.MapperService; | ||
import org.elasticsearch.index.mapper.SeqNoFieldMapper; | ||
import org.elasticsearch.index.mapper.Uid; | ||
import org.elasticsearch.index.mapper.VersionFieldMapper; | ||
import org.elasticsearch.index.translog.Translog; | ||
|
||
import java.io.Closeable; | ||
import java.io.IOException; | ||
import java.util.List; | ||
import java.util.function.Supplier; | ||
|
||
/** | ||
* A {@link Translog.Snapshot} from changes in a Lucene index | ||
*/ | ||
/**
 * A {@link Translog.Snapshot} from changes in a Lucene index.
 *
 * <p>Operations are returned in ascending seq# order. When the same seq# occurs more than once
 * (possible after a primary fail-over while indexing was ongoing), the sort order puts the copy
 * with the highest primary term first and the remaining duplicates are skipped and counted in
 * {@link #overriddenOperations()}.
 */
final class LuceneChangesSnapshot implements Translog.Snapshot {
    // requested seq# range - both ends inclusive
    private final long fromSeqNo, toSeqNo;
    // seq# of the last operation returned from next(); drives both duplicate skipping and the full-range check
    private long lastSeenSeqNo;
    // number of duplicate (overridden) operations skipped so far
    private int skippedOperations;
    private final boolean requiredFullRange;

    private final IndexSearcher searcher;
    private final MapperService mapperService;
    // cursor into topDocs.scoreDocs
    private int docIndex;
    private final TopDocs topDocs;

    // releases the underlying engine searcher when this snapshot is closed
    private final Closeable onClose;

    /**
     * Creates a new "translog" snapshot from Lucene for reading operations whose seq# is in the specified range.
     *
     * @param searcherFactory   the engine searcher factory (prefer the internal searcher)
     * @param mapperService     the mapper service which will be mainly used to resolve the document's type and uid
     * @param fromSeqNo         the min requesting seq# - inclusive
     * @param toSeqNo           the maximum requesting seq# - inclusive
     * @param requiredFullRange if true, the snapshot will strictly check for the existence of operations between fromSeqNo and toSeqNo
     * @throws IllegalArgumentException if the seq# range is negative or inverted
     * @throws IOException              if searching the index fails
     */
    LuceneChangesSnapshot(Supplier<Engine.Searcher> searcherFactory, MapperService mapperService,
                          long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
        if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) {
            throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]");
        }
        this.mapperService = mapperService;
        this.fromSeqNo = fromSeqNo;
        this.toSeqNo = toSeqNo;
        this.lastSeenSeqNo = fromSeqNo - 1;
        this.requiredFullRange = requiredFullRange;
        boolean success = false;
        final Engine.Searcher engineSearcher = searcherFactory.get();
        try {
            // wrapAllDocsLive keeps soft-deleted documents visible so deletes/updates can be read back as operations
            this.searcher = new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()));
            this.searcher.setQueryCache(null);
            // eagerly run the search; next() then just walks the pre-sorted hits
            this.topDocs = searchOperations(searcher);
            success = true;
            this.onClose = engineSearcher;
        } finally {
            if (success == false) {
                // constructor failed after acquiring the searcher - release it here so we don't leak a reader reference
                IOUtils.close(engineSearcher);
            }
        }
    }

    @Override
    public void close() throws IOException {
        onClose.close();
    }

    @Override
    public int totalOperations() {
        // NOTE(review): this counts hits including duplicates that will later be skipped - confirm callers expect that
        return Math.toIntExact(topDocs.totalHits);
    }

    @Override
    public int overriddenOperations() {
        return skippedOperations;
    }

    /**
     * Returns the next operation in seq# order, or null when the snapshot is exhausted.
     *
     * @throws IllegalStateException if requiredFullRange is set and a seq# in [fromSeqNo, toSeqNo] is missing
     */
    @Override
    public Translog.Operation next() throws IOException {
        final Translog.Operation op = nextOp();
        // While the full range has not been covered yet, every returned op must be exactly lastSeenSeqNo + 1;
        // a null op here means the range is incomplete, which is also an error in requiredFullRange mode.
        if (requiredFullRange && lastSeenSeqNo < toSeqNo) {
            final long expectedSeqNo = lastSeenSeqNo + 1;
            if (op == null || op.seqNo() != expectedSeqNo) {
                throw new IllegalStateException("Not all operations between min_seqno [" + fromSeqNo + "] " +
                    "and max_seqno [" + toSeqNo + "] found; expected seqno [" + expectedSeqNo + "]; found [" + op + "]");
            }
        }
        if (op != null) {
            lastSeenSeqNo = op.seqNo();
        }
        return op;
    }

    // Advances the cursor past skipped duplicates until a real operation (or the end) is reached.
    private Translog.Operation nextOp() throws IOException {
        final ScoreDoc[] scoreDocs = topDocs.scoreDocs;
        for (; docIndex < scoreDocs.length; docIndex++) {
            if (scoreDocs[docIndex].doc == DocIdSetIterator.NO_MORE_DOCS) {
                return null;
            }
            // readDocAsOp returns null for duplicates of the last seen seq# (counted as skipped)
            final Translog.Operation op = readDocAsOp(scoreDocs[docIndex].doc);
            if (op != null) {
                return op;
            }
        }
        return null;
    }

    // Finds all docs in the requested seq# range, sorted by seq# ascending then primary term descending,
    // so the authoritative copy (highest term) of each seq# comes first.
    private TopDocs searchOperations(IndexSearcher searcher) throws IOException {
        final Query rangeQuery = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo);
        final Sort sortedBySeqNoThenByTerm = new Sort(
            new SortedNumericSortField(SeqNoFieldMapper.NAME, SortField.Type.LONG),
            new SortedNumericSortField(SeqNoFieldMapper.PRIMARY_TERM_NAME, SortField.Type.LONG, true)
        );
        return searcher.search(rangeQuery, Integer.MAX_VALUE, sortedBySeqNoThenByTerm);
    }

    /**
     * Materializes a single Lucene document back into a translog operation
     * (Index, Delete, or NoOp), or returns null when the doc duplicates lastSeenSeqNo.
     * NOTE: mutates skippedOperations as a side effect when skipping a duplicate.
     */
    private Translog.Operation readDocAsOp(int docID) throws IOException {
        final List<LeafReaderContext> leaves = searcher.getIndexReader().leaves();
        final LeafReaderContext leaf = leaves.get(ReaderUtil.subIndex(docID, leaves));
        final int segmentDocID = docID - leaf.docBase;
        final long seqNo = readNumericDV(leaf, SeqNoFieldMapper.NAME, segmentDocID);
        // This operation has been seen already and will be skipped anyway - do not visit other fields.
        if (seqNo == lastSeenSeqNo) {
            skippedOperations++;
            return null;
        }

        final long primaryTerm = readNumericDV(leaf, SeqNoFieldMapper.PRIMARY_TERM_NAME, segmentDocID);
        final FieldsVisitor fields = new FieldsVisitor(true);
        searcher.doc(docID, fields);
        fields.postProcess(mapperService);

        final Translog.Operation op;
        final boolean isTombstone = isTombstoneOperation(leaf, segmentDocID);
        // A tombstone without a uid is a no-op marker rather than a document deletion.
        if (isTombstone && fields.uid() == null) {
            op = new Translog.NoOp(seqNo, primaryTerm, ""); // TODO: store reason in ignored fields?
            assert readNumericDV(leaf, Lucene.SOFT_DELETE_FIELD, segmentDocID) == 1
                : "Noop operation but soft_deletes field is not set [" + op + "]";
        } else {
            final String id = fields.uid().id();
            final String type = fields.uid().type();
            final Term uid = new Term(IdFieldMapper.NAME, Uid.encodeId(id));
            final long version = readNumericDV(leaf, VersionFieldMapper.NAME, segmentDocID);
            if (isTombstone) {
                op = new Translog.Delete(type, id, uid, seqNo, primaryTerm, version, VersionType.INTERNAL);
                assert readNumericDV(leaf, Lucene.SOFT_DELETE_FIELD, segmentDocID) == 1
                    : "Delete operation but soft_deletes field is not set [" + op + "]";
            } else {
                final BytesReference source = fields.source();
                // TODO: pass the latest timestamp from engine.
                final long autoGeneratedIdTimestamp = -1;
                op = new Translog.Index(type, id, seqNo, primaryTerm, version, VersionType.INTERNAL,
                    source.toBytesRef().bytes, fields.routing(), autoGeneratedIdTimestamp);
            }
        }
        return op;
    }

    // True when the doc carries the tombstone doc-value marker (delete or no-op); absence of the field means false.
    private boolean isTombstoneOperation(LeafReaderContext leaf, int segmentDocID) throws IOException {
        final NumericDocValues tombstoneDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME);
        if (tombstoneDV != null && tombstoneDV.advanceExact(segmentDocID)) {
            return tombstoneDV.longValue() == 1;
        }
        return false;
    }

    // Reads a required numeric doc-value; unlike the tombstone field, a missing value here is a hard error.
    // NOTE(review): this re-acquires the NumericDocValues iterator per field per doc; callers debate caching
    // these per-leaf in the constructor - iterators are forward-only, so any cache must handle resets.
    private long readNumericDV(LeafReaderContext leaf, String field, int segmentDocID) throws IOException {
        final NumericDocValues dv = leaf.reader().getNumericDocValues(field);
        if (dv == null || dv.advanceExact(segmentDocID) == false) {
            throw new IllegalStateException("DocValues for field [" + field + "] is not found");
        }
        return dv.longValue();
    }
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems like a mistake?