Integrates soft-deletes into Elasticsearch
This PR integrates Lucene soft-deletes (LUCENE-8200) into Elasticsearch.
Highlights of this PR include:

1. Replace hard-deletes with soft-deletes in InternalEngine (see the Lucene API sketch after this list)
2. Use _recovery_source if _source is disabled or modified (elastic#31106)
3. Soft-deletes retention policy based on the global checkpoint (elastic#30335)
4. Read operation history from Lucene instead of translog (elastic#30120)
5. Use Lucene history in peer-recovery (elastic#30522)
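
For background, a soft delete marks a document as deleted through a numeric doc-values field rather than removing it, so the document stays readable until merges discard it. Below is a minimal sketch of the Lucene API this PR builds on; the field and document names are illustrative, not taken from this PR:

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;

public class SoftDeleteSketch {
    public static void main(String[] args) throws Exception {
        Directory dir = new RAMDirectory();
        IndexWriterConfig config = new IndexWriterConfig(new StandardAnalyzer())
            .setSoftDeletesField("__soft_deletes"); // must match on every writer of this index
        try (IndexWriter writer = new IndexWriter(dir, config)) {
            Document v1 = new Document();
            v1.add(new StringField("_id", "1", Field.Store.NO));
            writer.addDocument(v1);

            Document v2 = new Document();
            v2.add(new StringField("_id", "1", Field.Store.NO));
            // v1 is marked deleted via doc values instead of being removed,
            // so the operation history stays recoverable until a merge drops it
            writer.softUpdateDocument(new Term("_id", "1"), v2,
                new NumericDocValuesField("__soft_deletes", 1));
            writer.commit();
        }
    }
}

Points 3-5 build on this behavior: because deleted and updated documents linger in the index, the engine can serve operation history from Lucene instead of the translog.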

This work was done by the whole team; however, the following individuals (in lexical order) made significant contributions in coding and reviewing:

Co-authored-by: Adrien Grand <[email protected]>
Co-authored-by: Boaz Leskes <[email protected]>
Co-authored-by: Jason Tedor <[email protected]>
Co-authored-by: Martijn van Groningen <[email protected]>
Co-authored-by: Nhat Nguyen <[email protected]>
Co-authored-by: Simon Willnauer <[email protected]>
6 people committed Aug 29, 2018
1 parent 1be3dd5 commit 5ad569a
Showing 63 changed files with 3,376 additions and 495 deletions.
3 changes: 2 additions & 1 deletion docs/reference/indices/flush.asciidoc
@@ -101,7 +101,8 @@ which returns something similar to:
"translog_generation" : "2",
"max_seq_no" : "-1",
"sync_id" : "AVvFY-071siAOuFGEO9P", <1>
"max_unsafe_auto_id_timestamp" : "-1"
"max_unsafe_auto_id_timestamp" : "-1",
"min_retained_seq_no": "0"
},
"num_docs" : 0
}
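
The new min_retained_seq_no entry lives in the Lucene commit user data, alongside max_seq_no and the translog keys above. A minimal sketch of reading it back, assuming directory points at a shard's index (the key string mirrors the doc snippet; this is not an official API):

import java.io.IOException;
import java.util.Map;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;

final class CommitUserDataSketch {
    // sketch: reads the soft-deletes retention floor from the latest commit
    static String readMinRetainedSeqNo(Directory directory) throws IOException {
        Map<String, String> userData = SegmentInfos.readLatestCommit(directory).getUserData();
        // operations with a lower sequence number may already have been merged away
        return userData.get("min_retained_seq_no");
    }
}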
@@ -77,6 +77,7 @@
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.Version;
+ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
@@ -87,6 +88,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.IndexService;
+ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MapperService;
@@ -1109,7 +1111,11 @@ private void duelRun(PercolateQuery.QueryStore queryStore, MemoryIndex memoryInd
}

private void addQuery(Query query, List<ParseContext.Document> docs) {
- ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(Settings.EMPTY,
+ IndexMetaData build = IndexMetaData.builder("")
+ .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
+ .numberOfShards(1).numberOfReplicas(0).build();
+ IndexSettings settings = new IndexSettings(build, Settings.EMPTY);
+ ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(settings,
mapperService.documentMapperParser(), documentMapper, null, null);
fieldMapper.processQuery(query, parseContext);
ParseContext.Document queryDocument = parseContext.doc();
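
This IndexMetaData/IndexSettings boilerplate recurs in each percolator test below because InternalParseContext now takes an IndexSettings instead of a bare Settings object. A sketch of a helper that would capture the repeated pattern (the helper name is illustrative; the PR inlines the code instead):

import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;

final class ParseContextTestHelper {
    // illustrative helper mirroring the pattern inlined in these tests
    static IndexSettings dummyIndexSettings() {
        IndexMetaData meta = IndexMetaData.builder("")
            .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
            .numberOfShards(1)
            .numberOfReplicas(0)
            .build();
        return new IndexSettings(meta, Settings.EMPTY);
    }
}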
@@ -42,6 +42,7 @@
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.PlainActionFuture;
+ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
@@ -58,6 +59,7 @@
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexService;
+ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.DocumentMapperParser;
import org.elasticsearch.index.mapper.MapperParsingException;
@@ -182,7 +184,11 @@ public void testExtractTerms() throws Exception {

DocumentMapper documentMapper = mapperService.documentMapper("doc");
PercolatorFieldMapper fieldMapper = (PercolatorFieldMapper) documentMapper.mappers().getMapper(fieldName);
- ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(Settings.EMPTY,
+ IndexMetaData build = IndexMetaData.builder("")
+ .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
+ .numberOfShards(1).numberOfReplicas(0).build();
+ IndexSettings settings = new IndexSettings(build, Settings.EMPTY);
+ ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(settings,
mapperService.documentMapperParser(), documentMapper, null, null);
fieldMapper.processQuery(bq.build(), parseContext);
ParseContext.Document document = parseContext.doc();
@@ -204,7 +210,7 @@ public void testExtractTerms() throws Exception {
bq.add(termQuery1, Occur.MUST);
bq.add(termQuery2, Occur.MUST);

- parseContext = new ParseContext.InternalParseContext(Settings.EMPTY, mapperService.documentMapperParser(),
+ parseContext = new ParseContext.InternalParseContext(settings, mapperService.documentMapperParser(),
documentMapper, null, null);
fieldMapper.processQuery(bq.build(), parseContext);
document = parseContext.doc();
@@ -232,8 +238,12 @@ public void testExtractRanges() throws Exception {
bq.add(rangeQuery2, Occur.MUST);

DocumentMapper documentMapper = mapperService.documentMapper("doc");
+ IndexMetaData build = IndexMetaData.builder("")
+ .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
+ .numberOfShards(1).numberOfReplicas(0).build();
+ IndexSettings settings = new IndexSettings(build, Settings.EMPTY);
PercolatorFieldMapper fieldMapper = (PercolatorFieldMapper) documentMapper.mappers().getMapper(fieldName);
- ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(Settings.EMPTY,
+ ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(settings,
mapperService.documentMapperParser(), documentMapper, null, null);
fieldMapper.processQuery(bq.build(), parseContext);
ParseContext.Document document = parseContext.doc();
@@ -259,7 +269,7 @@
.rangeQuery(15, 20, true, true, null, null, null, null);
bq.add(rangeQuery2, Occur.MUST);

- parseContext = new ParseContext.InternalParseContext(Settings.EMPTY,
+ parseContext = new ParseContext.InternalParseContext(settings,
mapperService.documentMapperParser(), documentMapper, null, null);
fieldMapper.processQuery(bq.build(), parseContext);
document = parseContext.doc();
@@ -283,7 +293,11 @@ public void testExtractTermsAndRanges_failed() throws Exception {
TermRangeQuery query = new TermRangeQuery("field1", new BytesRef("a"), new BytesRef("z"), true, true);
DocumentMapper documentMapper = mapperService.documentMapper("doc");
PercolatorFieldMapper fieldMapper = (PercolatorFieldMapper) documentMapper.mappers().getMapper(fieldName);
- ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(Settings.EMPTY,
+ IndexMetaData build = IndexMetaData.builder("")
+ .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
+ .numberOfShards(1).numberOfReplicas(0).build();
+ IndexSettings settings = new IndexSettings(build, Settings.EMPTY);
+ ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(settings,
mapperService.documentMapperParser(), documentMapper, null, null);
fieldMapper.processQuery(query, parseContext);
ParseContext.Document document = parseContext.doc();
@@ -298,7 +312,11 @@ public void testExtractTermsAndRanges_partial() throws Exception {
PhraseQuery phraseQuery = new PhraseQuery("field", "term");
DocumentMapper documentMapper = mapperService.documentMapper("doc");
PercolatorFieldMapper fieldMapper = (PercolatorFieldMapper) documentMapper.mappers().getMapper(fieldName);
- ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(Settings.EMPTY,
+ IndexMetaData build = IndexMetaData.builder("")
+ .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
+ .numberOfShards(1).numberOfReplicas(0).build();
+ IndexSettings settings = new IndexSettings(build, Settings.EMPTY);
+ ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(settings,
mapperService.documentMapperParser(), documentMapper, null, null);
fieldMapper.processQuery(phraseQuery, parseContext);
ParseContext.Document document = parseContext.doc();
86 changes: 85 additions & 1 deletion server/src/main/java/org/elasticsearch/common/lucene/Lucene.java
@@ -27,8 +27,10 @@
import org.apache.lucene.codecs.DocValuesFormat;
import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.document.LatLonDocValuesField;
+ import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
+ import org.apache.lucene.index.FilterDirectoryReader;
import org.apache.lucene.index.FilterLeafReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFileNames;
@@ -96,6 +98,8 @@ public class Lucene {
assert annotation == null : "DocValuesFormat " + LATEST_DOC_VALUES_FORMAT + " is deprecated" ;
}

+ public static final String SOFT_DELETES_FIELD = "__soft_deletes";

public static final NamedAnalyzer STANDARD_ANALYZER = new NamedAnalyzer("_standard", AnalyzerScope.GLOBAL, new StandardAnalyzer());
public static final NamedAnalyzer KEYWORD_ANALYZER = new NamedAnalyzer("_keyword", AnalyzerScope.GLOBAL, new KeywordAnalyzer());

@@ -140,7 +144,7 @@ public static Iterable<String> files(SegmentInfos infos) throws IOException {
public static int getNumDocs(SegmentInfos info) {
int numDocs = 0;
for (SegmentCommitInfo si : info) {
- numDocs += si.info.maxDoc() - si.getDelCount();
+ numDocs += si.info.maxDoc() - si.getDelCount() - si.getSoftDelCount();
}
return numDocs;
}
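
A segment now tracks hard and soft delete counts separately, and both must be subtracted to get the live-document count. With illustrative numbers:

public class NumDocsExample {
    public static void main(String[] args) {
        // illustrative: a segment with 10 documents, 2 hard-deleted, 3 soft-deleted
        int maxDoc = 10, delCount = 2, softDelCount = 3;
        // mirrors the updated getNumDocs(): soft-deleted docs are not live
        System.out.println(maxDoc - delCount - softDelCount); // prints 5
    }
}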
@@ -197,6 +201,7 @@ public static SegmentInfos pruneUnreferencedFiles(String segmentsFileName, Direc
}
final CommitPoint cp = new CommitPoint(si, directory);
try (IndexWriter writer = new IndexWriter(directory, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)
+ .setSoftDeletesField(Lucene.SOFT_DELETES_FIELD)
.setIndexCommit(cp)
.setCommitOnClose(false)
.setMergePolicy(NoMergePolicy.INSTANCE)
@@ -220,6 +225,7 @@ public static void cleanLuceneIndex(Directory directory) throws IOException {
}
}
try (IndexWriter writer = new IndexWriter(directory, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)
+ .setSoftDeletesField(Lucene.SOFT_DELETES_FIELD)
.setMergePolicy(NoMergePolicy.INSTANCE) // no merges
.setCommitOnClose(false) // no commits
.setOpenMode(IndexWriterConfig.OpenMode.CREATE))) // force creation - don't append...
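
Both utility writers above now set the soft-deletes field, and that consistency is the point: the marker is an ordinary doc-values field, so a writer opened without it would treat soft-deleted documents as live again. A sketch of the shared configuration:

import org.apache.lucene.index.IndexWriterConfig;
import org.elasticsearch.common.lucene.Lucene;

final class WriterConfigSketch {
    // sketch: every IndexWriter that touches a shard's index must agree on this field
    static IndexWriterConfig softDeletesAwareConfig() {
        return new IndexWriterConfig(Lucene.STANDARD_ANALYZER)
            .setSoftDeletesField(Lucene.SOFT_DELETES_FIELD);
    }
}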
@@ -829,4 +835,82 @@ public int length() {
}
};
}

/**
* Wraps a directory reader to make all documents live except those that were rolled back
* or hard-deleted due to non-aborting exceptions during indexing.
* The wrapped reader can be used to query all documents.
*
* @param in the input directory reader
* @return the wrapped reader
*/
public static DirectoryReader wrapAllDocsLive(DirectoryReader in) throws IOException {
return new DirectoryReaderWithAllLiveDocs(in);
}

private static final class DirectoryReaderWithAllLiveDocs extends FilterDirectoryReader {
static final class LeafReaderWithLiveDocs extends FilterLeafReader {
final Bits liveDocs;
final int numDocs;
LeafReaderWithLiveDocs(LeafReader in, Bits liveDocs, int numDocs) {
super(in);
this.liveDocs = liveDocs;
this.numDocs = numDocs;
}
@Override
public Bits getLiveDocs() {
return liveDocs;
}
@Override
public int numDocs() {
return numDocs;
}
@Override
public CacheHelper getCoreCacheHelper() {
return in.getCoreCacheHelper();
}
@Override
public CacheHelper getReaderCacheHelper() {
return null; // Modifying liveDocs
}
}

DirectoryReaderWithAllLiveDocs(DirectoryReader in) throws IOException {
super(in, new SubReaderWrapper() {
@Override
public LeafReader wrap(LeafReader leaf) {
SegmentReader segmentReader = segmentReader(leaf);
Bits hardLiveDocs = segmentReader.getHardLiveDocs();
if (hardLiveDocs == null) {
return new LeafReaderWithLiveDocs(leaf, null, leaf.maxDoc());
}
// TODO: avoid recalculating numDocs every time.
int numDocs = 0;
for (int i = 0; i < hardLiveDocs.length(); i++) {
if (hardLiveDocs.get(i)) {
numDocs++;
}
}
return new LeafReaderWithLiveDocs(segmentReader, hardLiveDocs, numDocs);
}
});
}

@Override
protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException {
return wrapAllDocsLive(in);
}

@Override
public CacheHelper getReaderCacheHelper() {
return null; // Modifying liveDocs
}
}

/**
* Returns a numeric doc-values field which can be used to soft-delete documents.
*/
public static NumericDocValuesField newSoftDeletesField() {
return new NumericDocValuesField(SOFT_DELETES_FIELD, 1);
}
}
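
A usage sketch for the two helpers added above: soft-delete the previous copy of a document, then open a reader that exposes soft-deleted documents again (the _id field and the pre-populated directory are assumptions for illustration):

import java.io.IOException;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.lucene.Lucene;

final class AllDocsLiveSketch {
    static int countIncludingSoftDeletes(Directory dir) throws IOException {
        IndexWriterConfig config = new IndexWriterConfig(Lucene.STANDARD_ANALYZER)
            .setSoftDeletesField(Lucene.SOFT_DELETES_FIELD);
        try (IndexWriter writer = new IndexWriter(dir, config)) {
            // the old copy of doc "1" is marked deleted via doc values only
            writer.softUpdateDocument(new Term("_id", "1"), new Document(),
                Lucene.newSoftDeletesField());
            // an NRT reader from this writer hides soft-deleted documents;
            // the wrapper makes them visible again while hard deletes stay hidden
            try (DirectoryReader allDocs = Lucene.wrapAllDocsLive(DirectoryReader.open(writer))) {
                return allDocs.numDocs();
            }
        }
    }
}

This is the mechanism behind reading operation history from Lucene: a history reader must see updated-away and deleted documents, not just the live ones.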
@@ -28,6 +28,7 @@
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
+ import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
@@ -66,15 +67,22 @@ final class PerThreadIDVersionAndSeqNoLookup {
*/
PerThreadIDVersionAndSeqNoLookup(LeafReader reader, String uidField) throws IOException {
this.uidField = uidField;
- Terms terms = reader.terms(uidField);
+ final Terms terms = reader.terms(uidField);
if (terms == null) {
throw new IllegalArgumentException("reader misses the [" + uidField + "] field");
// If a segment contains only no-ops, it does not have _uid but has both _soft_deletes and _tombstone fields.
final NumericDocValues softDeletesDV = reader.getNumericDocValues(Lucene.SOFT_DELETES_FIELD);
final NumericDocValues tombstoneDV = reader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME);
if (softDeletesDV == null || tombstoneDV == null) {
throw new IllegalArgumentException("reader does not have _uid terms but not a no-op segment; " +
"_soft_deletes [" + softDeletesDV + "], _tombstone [" + tombstoneDV + "]");
}
termsEnum = null;
} else {
termsEnum = terms.iterator();
}
termsEnum = terms.iterator();
if (reader.getNumericDocValues(VersionFieldMapper.NAME) == null) {
throw new IllegalArgumentException("reader misses the [" + VersionFieldMapper.NAME + "] field");
throw new IllegalArgumentException("reader misses the [" + VersionFieldMapper.NAME + "] field; _uid terms [" + terms + "]");
}

Object readerKey = null;
assert (readerKey = reader.getCoreCacheHelper().getKey()) != null;
this.readerKey = readerKey;
@@ -111,7 +119,8 @@ public DocIdAndVersion lookupVersion(BytesRef id, LeafReaderContext context)
* {@link DocIdSetIterator#NO_MORE_DOCS} is returned if not found
* */
private int getDocID(BytesRef id, Bits liveDocs) throws IOException {
- if (termsEnum.seekExact(id)) {
+ // termsEnum can possibly be null here if this leaf contains only no-ops.
+ if (termsEnum != null && termsEnum.seekExact(id)) {
int docID = DocIdSetIterator.NO_MORE_DOCS;
// there may be more than one matching docID, in the case of nested docs, so we want the last one:
docsEnum = termsEnum.postings(docsEnum, 0);
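
For context, the segment-without-_uid case arises from no-op tombstones: documents that carry only doc-values fields, so reader.terms(uidField) returns null for a segment made entirely of them. A rough sketch of such a document; the field names follow the checks above plus Elasticsearch's seq-no and version mappers, and the exact construction in InternalEngine differs:

import org.apache.lucene.document.Document;
import org.apache.lucene.document.NumericDocValuesField;
import org.elasticsearch.common.lucene.Lucene;

final class NoOpTombstoneSketch {
    // rough sketch: a no-op document has doc values but no _uid term
    static Document noOpTombstone(long seqNo, long primaryTerm) {
        Document doc = new Document();
        doc.add(Lucene.newSoftDeletesField());                 // __soft_deletes = 1
        doc.add(new NumericDocValuesField("_tombstone", 1));   // SeqNoFieldMapper.TOMBSTONE_NAME
        doc.add(new NumericDocValuesField("_seq_no", seqNo));
        doc.add(new NumericDocValuesField("_primary_term", primaryTerm));
        doc.add(new NumericDocValuesField("_version", 1));     // satisfies the constructor check above
        return doc;
    }
}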
@@ -129,6 +129,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.MAX_REGEX_LENGTH_SETTING,
ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING,
IndexSettings.INDEX_GC_DELETES_SETTING,
+ IndexSettings.INDEX_SOFT_DELETES_SETTING,
+ IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING,
IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING,
UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING,
EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING,
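
Registering these settings makes the feature configurable per index. A sketch of enabling it, using the keys these IndexSettings constants are registered under (values illustrative; the enabled flag applies at index creation):

import org.elasticsearch.common.settings.Settings;

final class SoftDeletesSettingsSketch {
    // illustrative: enable soft deletes and retain at least 1024 soft-deleted
    // operations so replicas can be caught up from Lucene history
    static Settings softDeletesEnabled() {
        return Settings.builder()
            .put("index.soft_deletes.enabled", true)              // INDEX_SOFT_DELETES_SETTING
            .put("index.soft_deletes.retention.operations", 1024) // INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING
            .build();
    }
}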
The remaining changed files are not shown.
