Use soft-deleted docs to resolve strategy for engine operation (#35230)
A CCR test failure shows that the approach in #34474 is flawed.
Restoring the LocalCheckpointTracker from an index commit can cause both
FollowingEngine and InternalEngine to incorrectly ignore some deletes.

Here is a small scenario illustrating the problem:

1. Delete a doc with seq_no=1 => the engine adds a delete tombstone to Lucene

2. Flush a commit consisting of only that delete tombstone

3. Index a doc with seq_no=0 => the engine adds that doc to Lucene, but soft-deleted

4. Restart the engine from the commit of step 2; the engine fills its
LocalCheckpointTracker from the delete tombstone in the commit

5. Replay the local translog in reverse order: index#0 then delete#1

6. When processing index#0, the engine adds it to Lucene as a live doc
and advances the local checkpoint to 1 (seq_no=1 was restored from the
commit in step 4).

7. When processing delete#1, the engine skips it because seq_no=1 is
less than or equal to the local checkpoint.

We should have zero documents after recovering from the translog, but here
we have one.
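
The skip in step 7 comes from the recovery-time rule that an operation at or
below the local checkpoint is assumed to already be in Lucene. A minimal sketch
of how the pre-filled tracker produces the wrong outcome (illustrative only,
not the engine's actual recovery code):

```java
// Illustrative sketch only, not the engine's actual recovery code.
// Step 4: the tracker is restored from the commit, which holds only the tombstone (seq_no=1).
LocalCheckpointTracker tracker = new LocalCheckpointTracker(1, SequenceNumbers.NO_OPS_PERFORMED);
tracker.markSeqNoAsCompleted(1);
assert tracker.getCheckpoint() == SequenceNumbers.NO_OPS_PERFORMED; // still -1; seq_no=0 is missing

// Step 6: index#0 replays first; 0 > checkpoint(-1), so it is applied and becomes a live doc.
tracker.markSeqNoAsCompleted(0);
assert tracker.getCheckpoint() == 1; // seq_no=1 was pre-filled, so the checkpoint jumps past it

// Step 7: delete#1 replays next; 1 <= checkpoint(1), so Lucene is skipped for it
// and the live doc from index#0 incorrectly survives.
```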

Since all operations after the local checkpoint of the safe commit are
retained, we can still find them if the lookup also considers soft-deleted
documents. This PR closes the gap between the version map and the local
checkpoint tracker by taking soft-deleted documents into account when
resolving the strategy for engine operations.
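
Applied to the scenario above, the fix means the lookup for index#0 now
surfaces the retained tombstone instead of finding nothing, so the operation
is resolved as stale. A rough sketch of the effect (the `indexOp` and
`searcher` names are assumptions for illustration):

```java
// Sketch only: the uid lookup now also sees tombstones and soft-deleted copies.
DocIdAndSeqNo latest = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), indexOp.uid());
// For index#0 this returns the delete tombstone (seq_no=1, isLive == false) rather than null,
// so seq_no 0 <= 1 resolves to OP_STALE_OR_EQUAL and the doc is indexed as soft-deleted, not live.
```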

Relates #34474
Relates #33656
dnhatn committed Nov 8, 2018
1 parent 900e711 commit 067e23c
Showing 4 changed files with 151 additions and 51 deletions.
@@ -140,16 +140,36 @@ private int getDocID(BytesRef id, Bits liveDocs) throws IOException {
DocIdAndSeqNo lookupSeqNo(BytesRef id, LeafReaderContext context) throws IOException {
assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
"context's reader is not the same as the reader class was initialized on.";
int docID = getDocID(id, context.reader().getLiveDocs());
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
NumericDocValues seqNos = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
long seqNo;
if (seqNos != null && seqNos.advanceExact(docID)) {
seqNo = seqNos.longValue();
} else {
seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
// termsEnum can possibly be null here if this leaf contains only no-ops.
if (termsEnum != null && termsEnum.seekExact(id)) {
docsEnum = termsEnum.postings(docsEnum, 0);
final Bits liveDocs = context.reader().getLiveDocs();
DocIdAndSeqNo result = null;
int docID = docsEnum.nextDoc();
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
final NumericDocValues seqNoDV = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
for (; docID != DocIdSetIterator.NO_MORE_DOCS; docID = docsEnum.nextDoc()) {
final long seqNo;
if (seqNoDV != null && seqNoDV.advanceExact(docID)) {
seqNo = seqNoDV.longValue();
} else {
seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
final boolean isLive = (liveDocs == null || liveDocs.get(docID));
if (isLive) {
// The live document must always be the latest copy, thus we can early terminate here.
// If a nested doc is live, we return the first doc, which doesn't have the term (only the last doc has the term).
// This should not be an issue since we no longer use the primary term as a tie breaker when comparing operations.
assert result == null || result.seqNo <= seqNo :
"the live doc does not have the highest seq_no; live_seq_no=" + seqNo + " < deleted_seq_no=" + result.seqNo;
return new DocIdAndSeqNo(docID, seqNo, context, isLive);
}
if (result == null || result.seqNo < seqNo) {
result = new DocIdAndSeqNo(docID, seqNo, context, isLive);
}
}
}
return new DocIdAndSeqNo(docID, seqNo, context);
return result;
} else {
return null;
}
@@ -22,11 +22,9 @@
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.Term;
import org.apache.lucene.util.CloseableThreadLocal;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;

import java.io.IOException;
import java.util.List;
@@ -114,11 +112,13 @@ public static class DocIdAndSeqNo {
public final int docId;
public final long seqNo;
public final LeafReaderContext context;
public final boolean isLive;

DocIdAndSeqNo(int docId, long seqNo, LeafReaderContext context) {
DocIdAndSeqNo(int docId, long seqNo, LeafReaderContext context, boolean isLive) {
this.docId = docId;
this.seqNo = seqNo;
this.context = context;
this.isLive = isLive;
}
}

@@ -146,41 +146,34 @@ public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term)
}

/**
* Load the internal doc ID and sequence number for the uid from the reader, returning<ul>
* <li>null if the uid wasn't found,
* <li>a doc ID and the associated seqNo otherwise
* </ul>
* Loads the internal docId and sequence number of the latest copy for a given uid from the provided reader.
* The flag {@link DocIdAndSeqNo#isLive} indicates whether the returned document is live or (soft)deleted.
* This returns {@code null} if no document in the reader matches the given uid term.
*/
public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) throws IOException {
PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, term.field());
List<LeafReaderContext> leaves = reader.leaves();
final PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, term.field());
final List<LeafReaderContext> leaves = reader.leaves();
DocIdAndSeqNo latest = null;
// iterate backwards to optimize for the frequently updated documents
// which are likely to be in the last segments
for (int i = leaves.size() - 1; i >= 0; i--) {
final LeafReaderContext leaf = leaves.get(i);
PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaf.ord];
DocIdAndSeqNo result = lookup.lookupSeqNo(term.bytes(), leaf);
if (result != null) {
final PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaf.ord];
final DocIdAndSeqNo result = lookup.lookupSeqNo(term.bytes(), leaf);
if (result == null) {
continue;
}
if (result.isLive) {
// The live document must always be the latest copy, thus we can early terminate here.
assert latest == null || latest.seqNo <= result.seqNo :
"the live doc does not have the highest seq_no; live_seq_no=" + result.seqNo + " < deleted_seq_no=" + latest.seqNo;
return result;
}
if (latest == null || latest.seqNo < result.seqNo) {
latest = result;
}
}
return null;
}

/**
* Load the primaryTerm associated with the given {@link DocIdAndSeqNo}
*/
public static long loadPrimaryTerm(DocIdAndSeqNo docIdAndSeqNo, String uidField) throws IOException {
NumericDocValues primaryTerms = docIdAndSeqNo.context.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
long result;
if (primaryTerms != null && primaryTerms.advanceExact(docIdAndSeqNo.docId)) {
result = primaryTerms.longValue();
} else {
result = 0;
}
assert result > 0 : "should always resolve a primary term for a resolved sequence number. primary_term [" + result + "]"
+ " docId [" + docIdAndSeqNo.docId + "] seqNo [" + docIdAndSeqNo.seqNo + "]";
return result;
return latest;
}

/**
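For context, a hedged usage sketch of the updated lookup (the `engine` handle and the document id are assumptions for illustration, not part of this change):

```java
// Sketch: resolve the latest copy of a uid, live or soft-deleted, and branch on isLive.
try (Engine.Searcher searcher = engine.acquireSearcher("lookup", Engine.SearcherScope.INTERNAL)) {
    Term uid = new Term(IdFieldMapper.NAME, Uid.encodeId("doc-1"));
    DocIdAndSeqNo latest = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), uid);
    if (latest == null) {
        // the uid was never written to this index
    } else if (latest.isLive) {
        // the latest copy is a live document with sequence number latest.seqNo
    } else {
        // the latest copy is a delete tombstone or a stale, soft-deleted copy
    }
}
```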
@@ -679,31 +679,32 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op)
VersionValue versionValue = getVersionFromMap(op.uid().bytes());
assert incrementVersionLookup();
if (versionValue != null) {
if (op.seqNo() > versionValue.seqNo ||
(op.seqNo() == versionValue.seqNo && op.primaryTerm() > versionValue.term))
if (op.seqNo() > versionValue.seqNo) {
status = OpVsLuceneDocStatus.OP_NEWER;
else {
} else if (op.seqNo() == versionValue.seqNo) {
assert versionValue.term == op.primaryTerm() : "primary term not matched; id=" + op.id() + " seq_no=" + op.seqNo()
+ " op_term=" + op.primaryTerm() + " existing_term=" + versionValue.term;
status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
} else {
status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
}
} else {
// load from index
assert incrementIndexVersionLookup();
try (Searcher searcher = acquireSearcher("load_seq_no", SearcherScope.INTERNAL)) {
DocIdAndSeqNo docAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), op.uid());
final DocIdAndSeqNo docAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), op.uid());
if (docAndSeqNo == null) {
status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
} else if (op.seqNo() > docAndSeqNo.seqNo) {
status = OpVsLuceneDocStatus.OP_NEWER;
} else if (op.seqNo() == docAndSeqNo.seqNo) {
assert localCheckpointTracker.contains(op.seqNo()) || softDeleteEnabled == false :
"local checkpoint tracker is not updated seq_no=" + op.seqNo() + " id=" + op.id();
// load term to tie break
final long existingTerm = VersionsAndSeqNoResolver.loadPrimaryTerm(docAndSeqNo, op.uid().field());
if (op.primaryTerm() > existingTerm) {
if (docAndSeqNo.isLive) {
status = OpVsLuceneDocStatus.OP_NEWER;
} else {
status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
}
} else if (op.seqNo() == docAndSeqNo.seqNo) {
assert localCheckpointTracker.contains(op.seqNo()) || softDeleteEnabled == false :
"local checkpoint tracker is not updated seq_no=" + op.seqNo() + " id=" + op.id();
status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
} else {
status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
}
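Because the hunk above interleaves removed and added lines, here is a compact paraphrase of the new seq_no-based resolution (a simplified sketch, not the committed method verbatim):

```java
// Simplified paraphrase of the new resolution against the latest copy found in Lucene.
enum OpVsLuceneDocStatus { OP_NEWER, OP_STALE_OR_EQUAL, LUCENE_DOC_NOT_FOUND }

static OpVsLuceneDocStatus resolve(long opSeqNo, DocIdAndSeqNo latest) {
    if (latest == null) {
        return OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;   // the uid was never written
    } else if (opSeqNo > latest.seqNo) {
        // The op supersedes every copy; if the latest copy is a tombstone or soft-deleted doc
        // there is no live doc to replace, so report it as not found.
        return latest.isLive ? OpVsLuceneDocStatus.OP_NEWER : OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
    } else {
        return OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;       // duplicate or stale copy
    }
}
```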
@@ -77,6 +77,7 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
@@ -140,6 +141,7 @@
import java.util.Base64;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -3712,6 +3714,81 @@ public void testSequenceIDs() throws Exception {
searchResult.close();
}

public void testLookupSeqNoByIdInLucene() throws Exception {
int numOps = between(10, 100);
long seqNo = 0;
List<Engine.Operation> operations = new ArrayList<>(numOps);
for (int i = 0; i < numOps; i++) {
String id = Integer.toString(between(1, 50));
boolean isIndexing = randomBoolean();
int copies = frequently() ? 1 : between(2, 4);
for (int c = 0; c < copies; c++) {
final ParsedDocument doc = EngineTestCase.createParsedDoc(id, null);
if (isIndexing) {
operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, seqNo, primaryTerm.get(),
i, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(), -1, true));
} else {
operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), seqNo, primaryTerm.get(),
i, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis()));
}
}
seqNo++;
if (rarely()) {
seqNo++;
}
}
Randomness.shuffle(operations);
Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData);
Map<String, Engine.Operation> latestOps = new HashMap<>(); // id -> latest seq_no
try (Store store = createStore();
InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null))) {
CheckedRunnable<IOException> lookupAndCheck = () -> {
try (Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
for (String id : latestOps.keySet()) {
String msg = "latestOps=" + latestOps + " op=" + id;
DocIdAndSeqNo docIdAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), newUid(id));
assertThat(msg, docIdAndSeqNo.seqNo, equalTo(latestOps.get(id).seqNo()));
assertThat(msg, docIdAndSeqNo.isLive, equalTo(latestOps.get(id).operationType() == Engine.Operation.TYPE.INDEX));
}
assertThat(VersionsAndSeqNoResolver.loadDocIdAndVersion(
searcher.reader(), newUid("any-" + between(1, 10))), nullValue());
Map<String, Long> liveOps = latestOps.entrySet().stream()
.filter(e -> e.getValue().operationType() == Engine.Operation.TYPE.INDEX)
.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().seqNo()));
assertThat(getDocIds(engine, true).stream().collect(Collectors.toMap(e -> e.getId(), e -> e.getSeqNo())),
equalTo(liveOps));
}
};
for (Engine.Operation op : operations) {
if (op instanceof Engine.Index) {
engine.index((Engine.Index) op);
if (latestOps.containsKey(op.id()) == false || latestOps.get(op.id()).seqNo() < op.seqNo()) {
latestOps.put(op.id(), op);
}
} else if (op instanceof Engine.Delete) {
engine.delete((Engine.Delete) op);
if (latestOps.containsKey(op.id()) == false || latestOps.get(op.id()).seqNo() < op.seqNo()) {
latestOps.put(op.id(), op);
}
}
if (randomInt(100) < 10) {
engine.refresh("test");
lookupAndCheck.run();
}
if (rarely()) {
engine.flush();
lookupAndCheck.run();
}
}
engine.refresh("test");
lookupAndCheck.run();
}
}

/**
* A sequence number generator that will generate a sequence number and if {@code stall} is set to true will wait on the barrier and the
* referenced latch before returning. If the local checkpoint should advance (because {@code stall} is false, then the value of
@@ -4089,7 +4166,11 @@ private Tuple<Long, Long> getSequenceID(Engine engine, Engine.Get get) throws En
seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
} else {
seqNo = docIdAndSeqNo.seqNo;
primaryTerm = VersionsAndSeqNoResolver.loadPrimaryTerm(docIdAndSeqNo, get.uid().field());
NumericDocValues primaryTerms = docIdAndSeqNo.context.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
if (primaryTerms == null || primaryTerms.advanceExact(docIdAndSeqNo.docId) == false) {
throw new AssertionError("document does not have primary term [" + docIdAndSeqNo.docId + "]");
}
primaryTerm = primaryTerms.longValue();
}
return new Tuple<>(seqNo, primaryTerm);
} catch (Exception e) {
@@ -5194,6 +5275,7 @@ public void testRebuildLocalCheckpointTracker() throws Exception {
commits.add(new ArrayList<>());
try (Store store = createStore()) {
EngineConfig config = config(indexSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpoint::get);
final List<DocIdSeqNoAndTerm> docs;
try (InternalEngine engine = createEngine(config)) {
List<Engine.Operation> flushedOperations = new ArrayList<>();
for (Engine.Operation op : operations) {
@@ -5216,6 +5298,7 @@ }
}
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
engine.syncTranslog();
docs = getDocIds(engine, true);
}
trimUnsafeCommits(config);
List<Engine.Operation> safeCommit = null;
@@ -5232,6 +5315,9 @@
assertThat("seq_no=" + op.seqNo() + " max_seq_no=" + tracker.getMaxSeqNo() + " checkpoint=" + tracker.getCheckpoint(),
tracker.contains(op.seqNo()), equalTo(safeCommit.contains(op)));
}
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertThat(getDocIds(engine, true), equalTo(docs));
}
}
}
