Lazily load soft-deletes for searchable snapshot shards (#69203)
Opening a Lucene index that supports soft-deletes currently creates the liveDocs bitset eagerly, which requires scanning
the soft-deletes doc values to materialize the bitset. For searchable snapshot shards to become searchable as quickly as
possible (i.e. on recovery, or, in the case of FrozenEngine, whenever a search comes in), they should read as little as
possible from the Lucene files.

This commit introduces LazySoftDeletesDirectoryReaderWrapper, a variant of Lucene's
SoftDeletesDirectoryReaderWrapper that loads the liveDocs bitset lazily on first access. It is tailored specifically to
ReadOnlyEngine / FrozenEngine, as it only operates on non-NRT readers.
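The change boils down to choosing the lazy wrapper when opening a commit. A minimal sketch, mirroring the ReadOnlyEngine change below (the helper name openLazily is illustrative, not part of the commit):

import java.io.IOException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.LazySoftDeletesDirectoryReaderWrapper;
import org.elasticsearch.common.lucene.Lucene;

static DirectoryReader openLazily(IndexCommit commit) throws IOException {
    // Nothing is read from the soft-deletes doc values at this point; the liveDocs
    // bitset is only materialized the first time a search consults it.
    return new LazySoftDeletesDirectoryReaderWrapper(
        DirectoryReader.open(commit), Lucene.SOFT_DELETES_FIELD);
}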
ywelsch authored Feb 22, 2021
1 parent c86d9e3 commit f2a1e02
Showing 18 changed files with 569 additions and 41 deletions.
@@ -172,7 +172,7 @@ private static class StatsBlockingEngine extends ReadOnlyEngine {
final Semaphore statsBlock = new Semaphore(1);

StatsBlockingEngine(EngineConfig config) {
- super(config, null, new TranslogStats(), true, Function.identity(), true);
+ super(config, null, new TranslogStats(), true, Function.identity(), true, false);
}

@Override
@@ -169,7 +169,7 @@ private static class SearcherBlockingEngine extends ReadOnlyEngine {
final Semaphore searcherBlock = new Semaphore(1);

SearcherBlockingEngine(EngineConfig config) {
- super(config, null, new TranslogStats(), true, Function.identity(), true);
+ super(config, null, new TranslogStats(), true, Function.identity(), true, false);
}

@Override
@@ -0,0 +1,259 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.apache.lucene.index;

import org.apache.lucene.document.Field;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.DocValuesFieldExistsQuery;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.FixedBitSet;
import org.elasticsearch.common.lucene.Lucene;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/**
* This is a modified version of {@link SoftDeletesDirectoryReaderWrapper} that materializes the liveDocs
* bitset lazily. In contrast to {@link SoftDeletesDirectoryReaderWrapper}, this wrapper can only be used
* for non-NRT readers.
*
* This reader filters out documents that have a doc values value in the given field, treating these
* documents as soft-deleted. Hard-deleted documents are also filtered out in the live docs of this reader.
* @see IndexWriterConfig#setSoftDeletesField(String)
* @see IndexWriter#softUpdateDocument(Term, Iterable, Field...)
* @see SoftDeletesRetentionMergePolicy
*/
public final class LazySoftDeletesDirectoryReaderWrapper extends FilterDirectoryReader {
private final CacheHelper readerCacheHelper;
/**
* Creates a new soft deletes wrapper.
* @param in the incoming directory reader
* @param field the soft deletes field
*/
public LazySoftDeletesDirectoryReaderWrapper(DirectoryReader in, String field) throws IOException {
super(in, new LazySoftDeletesSubReaderWrapper(field));
readerCacheHelper = in.getReaderCacheHelper() == null ? null : new DelegatingCacheHelper(in.getReaderCacheHelper());
}

@Override
protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException {
throw new UnsupportedOperationException();
}

@Override
public CacheHelper getReaderCacheHelper() {
return readerCacheHelper;
}

private static class LazySoftDeletesSubReaderWrapper extends SubReaderWrapper {
private final String field;

LazySoftDeletesSubReaderWrapper(String field) {
Objects.requireNonNull(field, "Field must not be null");
this.field = field;
}

@Override
protected LeafReader[] wrap(List<? extends LeafReader> readers) {
List<LeafReader> wrapped = new ArrayList<>(readers.size());
for (LeafReader reader : readers) {
LeafReader wrap = wrap(reader);
assert wrap != null;
if (wrap.numDocs() != 0) {
wrapped.add(wrap);
}
}
return wrapped.toArray(new LeafReader[0]);
}

@Override
public LeafReader wrap(LeafReader reader) {
return LazySoftDeletesDirectoryReaderWrapper.wrap(reader, field);
}
}

static LeafReader wrap(LeafReader reader, String field) {
final SegmentReader segmentReader = Lucene.segmentReader(reader);
assert segmentReader.isNRT == false : "expected non-NRT reader";
final SegmentCommitInfo segmentInfo = segmentReader.getSegmentInfo();
final int numSoftDeletes = segmentInfo.getSoftDelCount();
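// Segments without soft deletes are returned as-is; their liveDocs need no wrapping.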
if (numSoftDeletes == 0) {
return reader;
}
final int maxDoc = reader.maxDoc();
final int numDocs = maxDoc - segmentInfo.getDelCount() - segmentInfo.getSoftDelCount();
final LazyBits lazyBits = new LazyBits(maxDoc, field, reader, numSoftDeletes, numDocs);
return reader instanceof CodecReader ? new LazySoftDeletesFilterCodecReader((CodecReader) reader, lazyBits, numDocs)
: new LazySoftDeletesFilterLeafReader(reader, lazyBits, numDocs);
}

public static final class LazySoftDeletesFilterLeafReader extends FilterLeafReader {
private final LeafReader reader;
private final LazyBits bits;
private final int numDocs;
private final CacheHelper readerCacheHelper;

public LazySoftDeletesFilterLeafReader(LeafReader reader, LazyBits bits, int numDocs) {
super(reader);
this.reader = reader;
this.bits = bits;
this.numDocs = numDocs;
this.readerCacheHelper = reader.getReaderCacheHelper() == null ? null :
new DelegatingCacheHelper(reader.getReaderCacheHelper());
}

@Override
public LazyBits getLiveDocs() {
return bits;
}

@Override
public int numDocs() {
return numDocs;
}

@Override
public CacheHelper getCoreCacheHelper() {
return reader.getCoreCacheHelper();
}

@Override
public CacheHelper getReaderCacheHelper() {
return readerCacheHelper;
}
}

public static final class LazySoftDeletesFilterCodecReader extends FilterCodecReader {
private final LeafReader reader;
private final LazyBits bits;
private final int numDocs;
private final CacheHelper readerCacheHelper;

public LazySoftDeletesFilterCodecReader(CodecReader reader, LazyBits bits, int numDocs) {
super(reader);
this.reader = reader;
this.bits = bits;
this.numDocs = numDocs;
this.readerCacheHelper = reader.getReaderCacheHelper() == null ? null :
new DelegatingCacheHelper(reader.getReaderCacheHelper());
}

@Override
public LazyBits getLiveDocs() {
return bits;
}

@Override
public int numDocs() {
return numDocs;
}

@Override
public CacheHelper getCoreCacheHelper() {
return reader.getCoreCacheHelper();
}

@Override
public CacheHelper getReaderCacheHelper() {
return readerCacheHelper;
}
}

private static class DelegatingCacheHelper implements CacheHelper {
private final CacheHelper delegate;
private final CacheKey cacheKey = new CacheKey();

DelegatingCacheHelper(CacheHelper delegate) {
this.delegate = delegate;
}

@Override
public CacheKey getKey() {
return cacheKey;
}

@Override
public void addClosedListener(ClosedListener listener) {
// Wrap the listener and call it with our own cache key. This is important since this key
// is what the reader is cached under; otherwise cached entries would never be freed.
delegate.addClosedListener(unused -> listener.onClose(cacheKey));
}
}

public static class LazyBits implements Bits {

private final int maxDoc;
private final String field;
private final LeafReader reader;
private final int numSoftDeletes;
private final int numDocs;
volatile Bits materializedBits;

public LazyBits(int maxDoc, String field, LeafReader reader, int numSoftDeletes, int numDocs) {
this.maxDoc = maxDoc;
this.field = field;
this.reader = reader;
this.numSoftDeletes = numSoftDeletes;
this.numDocs = numDocs;
materializedBits = null;
assert numSoftDeletes > 0;
}

@Override
public boolean get(int index) {
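// Double-checked locking: the bitset is materialized at most once, on first access.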
if (materializedBits == null) {
synchronized (this) {
try {
if (materializedBits == null) {
materializedBits = init();
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}
return materializedBits.get(index);
}

@Override
public int length() {
return maxDoc;
}

private Bits init() throws IOException {
assert Thread.holdsLock(this);

DocIdSetIterator iterator = DocValuesFieldExistsQuery.getDocValuesDocIdSetIterator(field, reader);
assert iterator != null;
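// Copy the hard-deletes liveDocs (or start all-live if there are no hard deletes),
// then clear the bit of every document that has a value in the soft-deletes field.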
Bits liveDocs = reader.getLiveDocs();
final FixedBitSet bits;
if (liveDocs != null) {
bits = FixedBitSet.copyOf(liveDocs);
} else {
bits = new FixedBitSet(maxDoc);
bits.set(0, maxDoc);
}
int numComputedSoftDeletes = PendingSoftDeletes.applySoftDeletes(iterator, bits);
assert numComputedSoftDeletes == numSoftDeletes :
"numComputedSoftDeletes: " + numComputedSoftDeletes + " expected: " + numSoftDeletes;

int numDeletes = reader.numDeletedDocs() + numComputedSoftDeletes;
int computedNumDocs = reader.maxDoc() - numDeletes;
assert computedNumDocs == numDocs : "computedNumDocs: " + computedNumDocs + " expected: " + numDocs;
return bits;
}

public boolean initialized() {
return materializedBits != null;
}
}
}
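For illustration (not part of the commit), a sketch of how the lazy materialization behaves, assuming a Directory dir whose index was written with soft deletes under Elasticsearch's default soft-deletes field:

import java.io.IOException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.LazySoftDeletesDirectoryReaderWrapper;
import org.apache.lucene.index.LazySoftDeletesDirectoryReaderWrapper.LazyBits;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Bits;
import org.elasticsearch.common.lucene.Lucene;

static void demonstrateLaziness(Directory dir) throws IOException {
    DirectoryReader wrapped = new LazySoftDeletesDirectoryReaderWrapper(
        DirectoryReader.open(dir), Lucene.SOFT_DELETES_FIELD);
    for (LeafReaderContext ctx : wrapped.leaves()) {
        Bits liveDocs = ctx.reader().getLiveDocs();
        // Only segments that actually contain soft deletes are wrapped with LazyBits.
        if (liveDocs instanceof LazyBits) {
            LazyBits lazy = (LazyBits) liveDocs;
            assert lazy.initialized() == false; // no doc values have been read yet
            lazy.get(0);                        // first access materializes the bitset
            assert lazy.initialized();
        }
    }
    wrapped.close();
}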
@@ -43,7 +43,7 @@ public final class NoOpEngine extends ReadOnlyEngine {
private final DocsStats docsStats;

public NoOpEngine(EngineConfig config) {
- super(config, null, null, true, Function.identity(), true);
+ super(config, null, null, true, Function.identity(), true, true);
this.segmentsStats = new SegmentsStats();
Directory directory = store.directory();
try (DirectoryReader reader = openDirectory(directory)) {
@@ -11,6 +11,7 @@
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
+ import org.apache.lucene.index.LazySoftDeletesDirectoryReaderWrapper;
import org.apache.lucene.index.PointValues;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
@@ -52,7 +53,7 @@
* Note: this engine can be opened side-by-side with a read-write engine but will not reflect any changes made to the read-write
* engine.
*
- * @see #ReadOnlyEngine(EngineConfig, SeqNoStats, TranslogStats, boolean, Function, boolean)
+ * @see #ReadOnlyEngine(EngineConfig, SeqNoStats, TranslogStats, boolean, Function, boolean, boolean)
*/
public class ReadOnlyEngine extends Engine {

@@ -69,6 +70,7 @@ public class ReadOnlyEngine extends Engine {
private final SafeCommitInfo safeCommitInfo;
private final CompletionStatsCache completionStatsCache;
private final boolean requireCompleteHistory;
+ final boolean lazilyLoadSoftDeletes;

protected volatile TranslogStats translogStats;
protected final String commitId;
@@ -85,9 +87,11 @@ public class ReadOnlyEngine extends Engine {
* the lock won't be obtained
* @param readerWrapperFunction allows to wrap the index-reader for this engine.
* @param requireCompleteHistory indicates whether this engine requires a complete history (i.e. fails if LCP &lt; MSN)
+ * @param lazilyLoadSoftDeletes indicates whether this engine should load the soft-deletes-based liveDocs lazily, on first access, rather than eagerly
*/
public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats translogStats, boolean obtainLock,
- Function<DirectoryReader, DirectoryReader> readerWrapperFunction, boolean requireCompleteHistory) {
+ Function<DirectoryReader, DirectoryReader> readerWrapperFunction, boolean requireCompleteHistory,
+                       boolean lazilyLoadSoftDeletes) {
super(config);
this.refreshListener = new RamAccountingRefreshListener(engineConfig.getCircuitBreakerService());
this.requireCompleteHistory = requireCompleteHistory;
@@ -110,6 +114,7 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
}
this.seqNoStats = seqNoStats;
this.indexCommit = Lucene.getIndexCommit(lastCommittedSegmentInfos, directory);
+ this.lazilyLoadSoftDeletes = lazilyLoadSoftDeletes;
reader = wrapReader(open(indexCommit), readerWrapperFunction);
readerManager = new ElasticsearchReaderManager(reader, refreshListener);
assert translogStats != null || obtainLock : "multiple translog instances should not be opened at the same time";
@@ -194,7 +199,11 @@ protected final ElasticsearchDirectoryReader wrapReader(DirectoryReader reader,

protected DirectoryReader open(IndexCommit commit) throws IOException {
assert Transports.assertNotTransportThread("opening index commit of a read-only engine");
-        return new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(commit), Lucene.SOFT_DELETES_FIELD);
+        if (lazilyLoadSoftDeletes) {
+            return new LazySoftDeletesDirectoryReaderWrapper(DirectoryReader.open(commit), Lucene.SOFT_DELETES_FIELD);
+        } else {
+            return new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(commit), Lucene.SOFT_DELETES_FIELD);
+        }
}

@Override
@@ -512,10 +521,14 @@ public void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary) {
maxSeqNoOfUpdatesOnPrimary + ">" + getMaxSeqNoOfUpdatesOrDeletes();
}

- protected static DirectoryReader openDirectory(Directory directory) throws IOException {
+ protected DirectoryReader openDirectory(Directory directory) throws IOException {
assert Transports.assertNotTransportThread("opening directory reader of a read-only engine");
final DirectoryReader reader = DirectoryReader.open(directory);
-        return new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD);
+        if (lazilyLoadSoftDeletes) {
+            return new LazySoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD);
+        } else {
+            return new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD);
+        }
}

@Override
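Subclasses opt in through the new trailing constructor flag. A minimal sketch with a hypothetical engine class (compare NoOpEngine above, which passes true, and the IndexShard change below, which passes false):

import java.util.function.Function;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.ReadOnlyEngine;
import org.elasticsearch.index.translog.TranslogStats;

// Hypothetical engine class, for illustration only.
public class LazyReadOnlyEngine extends ReadOnlyEngine {
    public LazyReadOnlyEngine(EngineConfig config) {
        // args: config, seqNoStats, translogStats, obtainLock, readerWrapperFunction,
        //       requireCompleteHistory, lazilyLoadSoftDeletes
        super(config, null, new TranslogStats(), true, Function.identity(), true, true);
    }
}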
@@ -3541,7 +3541,8 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED
// we must create both new read-only engine and new read-write engine under engineMutex to ensure snapshotStoreMetadata,
// acquireXXXCommit and close works.
final Engine readOnlyEngine =
-            new ReadOnlyEngine(newEngineConfig(replicationTracker), seqNoStats, translogStats, false, Function.identity(), true) {
+            new ReadOnlyEngine(newEngineConfig(replicationTracker), seqNoStats, translogStats, false, Function.identity(), true,
+                false) {
@Override
public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) {
synchronized (engineMutex) {