Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lazily load soft-deletes for searchable snapshot shards #69203

Merged
merged 11 commits into from
Feb 22, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ private static class StatsBlockingEngine extends ReadOnlyEngine {
final Semaphore statsBlock = new Semaphore(1);

StatsBlockingEngine(EngineConfig config) {
super(config, null, new TranslogStats(), true, Function.identity(), true);
super(config, null, new TranslogStats(), true, Function.identity(), true, false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ private static class SearcherBlockingEngine extends ReadOnlyEngine {
final Semaphore searcherBlock = new Semaphore(1);

SearcherBlockingEngine(EngineConfig config) {
super(config, null, new TranslogStats(), true, Function.identity(), true);
super(config, null, new TranslogStats(), true, Function.identity(), true, false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,287 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.apache.lucene.index;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import org.apache.lucene.document.Field;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.DocValuesFieldExistsQuery;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.FixedBitSet;
import org.elasticsearch.common.lucene.Lucene;

/**
* This is a modified version of {@link SoftDeletesDirectoryReaderWrapper} that materializes the liveDocs
* bitset lazily. In contrast to {@link SoftDeletesDirectoryReaderWrapper}, this wrapper can only be used
* for non-NRT readers.
*
This reader filters out documents that have a doc values value in the given field and treats these
* documents as soft deleted. Hard deleted documents will also be filtered out in the live docs of this reader.
* @see IndexWriterConfig#setSoftDeletesField(String)
* @see IndexWriter#softUpdateDocument(Term, Iterable, Field...)
* @see SoftDeletesRetentionMergePolicy
*/
public final class LazySoftDeletesDirectoryReaderWrapper extends FilterDirectoryReader {
private final String field;
private final CacheHelper readerCacheHelper;
/**
 * Creates a new lazy soft deletes wrapper around the given reader.
 * @param in the incoming directory reader; must be a non-NRT reader (see class javadoc)
 * @param field the soft deletes field, see {@link IndexWriterConfig#setSoftDeletesField(String)}
 * @throws IOException if wrapping the sub readers fails
 */
public LazySoftDeletesDirectoryReaderWrapper(DirectoryReader in, String field) throws IOException {
this(in, new LazySoftDeletesSubReaderWrapper(Collections.emptyMap(), field));
}

private LazySoftDeletesDirectoryReaderWrapper(DirectoryReader in, LazySoftDeletesSubReaderWrapper wrapper) throws IOException {
super(in, wrapper);
this.field = wrapper.field;
readerCacheHelper = in.getReaderCacheHelper() == null ? null : new DelegatingCacheHelper(in.getReaderCacheHelper());
}

@Override
protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException {
Map<CacheKey, LeafReader> readerCache = new HashMap<>();
ywelsch marked this conversation as resolved.
Show resolved Hide resolved
for (LeafReader reader : getSequentialSubReaders()) {
// we try to reuse the live docs instances here if the reader cache key didn't change
if (reader instanceof LazySoftDeletesFilterCodecReader && reader.getReaderCacheHelper() != null) {
readerCache.put(((LazySoftDeletesFilterCodecReader) reader).reader.getReaderCacheHelper().getKey(), reader);
}

}
return new LazySoftDeletesDirectoryReaderWrapper(in, new LazySoftDeletesSubReaderWrapper(readerCache, field));
}

/**
 * Returns a cache helper that delegates close notifications to the wrapped reader's
 * helper, or {@code null} if the wrapped reader exposes none (see constructor).
 */
@Override
public CacheHelper getReaderCacheHelper() {
return readerCacheHelper;
}

private static class LazySoftDeletesSubReaderWrapper extends SubReaderWrapper {
private final Map<CacheKey, LeafReader> mapping;
ywelsch marked this conversation as resolved.
Show resolved Hide resolved
private final String field;

LazySoftDeletesSubReaderWrapper(Map<CacheKey, LeafReader> oldReadersCache, String field) {
Objects.requireNonNull(field, "Field must not be null");
assert oldReadersCache != null;
this.mapping = oldReadersCache;
this.field = field;
}

protected LeafReader[] wrap(List<? extends LeafReader> readers) {
ywelsch marked this conversation as resolved.
Show resolved Hide resolved
List<LeafReader> wrapped = new ArrayList<>(readers.size());
for (LeafReader reader : readers) {
LeafReader wrap = wrap(reader);
assert wrap != null;
if (wrap.numDocs() != 0) {
wrapped.add(wrap);
}
}
return wrapped.toArray(new LeafReader[0]);
}

@Override
public LeafReader wrap(LeafReader reader) {
CacheHelper readerCacheHelper = reader.getReaderCacheHelper();
if (readerCacheHelper != null && mapping.containsKey(readerCacheHelper.getKey())) {
// if the reader cache helper didn't change and we have it in the cache don't bother creating a new one
return mapping.get(readerCacheHelper.getKey());
}
try {
return LazySoftDeletesDirectoryReaderWrapper.wrap(reader, field);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}

/**
 * Wraps a single leaf reader so that documents soft-deleted via {@code field} are hidden
 * from its live docs. When the segment carries no soft deletes the reader is returned
 * unchanged. The live-docs bitset itself is materialized lazily on first access.
 */
static LeafReader wrap(LeafReader reader, String field) throws IOException {
    final SegmentReader segmentReader = Lucene.segmentReader(reader);
    assert segmentReader.isNRT == false : "expected non-NRT reader";
    final SegmentCommitInfo segmentInfo = segmentReader.getSegmentInfo();
    final int numSoftDeletes = segmentInfo.getSoftDelCount();
    if (numSoftDeletes == 0) {
        // nothing to hide — expose the reader as-is
        return reader;
    }
    final int maxDoc = reader.maxDoc();
    final int numDocs = maxDoc - segmentInfo.getDelCount() - numSoftDeletes;
    final LazyBits lazyBits = new LazyBits(maxDoc, field, reader, numSoftDeletes, numDocs);
    if (reader instanceof CodecReader) {
        return new LazySoftDeletesFilterCodecReader((CodecReader) reader, lazyBits, numDocs);
    }
    return new LazySoftDeletesFilterLeafReader(reader, lazyBits, numDocs);
}

/**
 * A {@link FilterLeafReader} whose live docs are a {@link LazyBits} instance, i.e. the
 * soft-deletes bitset is only computed on first access. The document count is fixed up
 * front so stats do not force materialization.
 */
public static final class LazySoftDeletesFilterLeafReader extends FilterLeafReader {
    private final LeafReader reader;
    private final LazyBits bits;
    private final int numDocs;
    private final CacheHelper readerCacheHelper;

    public LazySoftDeletesFilterLeafReader(LeafReader reader, LazyBits bits, int numDocs) {
        super(reader);
        this.reader = reader;
        this.bits = bits;
        this.numDocs = numDocs;
        final CacheHelper helper = reader.getReaderCacheHelper();
        this.readerCacheHelper = helper == null ? null : new DelegatingCacheHelper(helper);
    }

    @Override
    public LazyBits getLiveDocs() {
        // lazy: callers only pay the materialization cost if they actually read the bits
        return bits;
    }

    @Override
    public int numDocs() {
        // precomputed from segment metadata; does not touch the lazy bitset
        return numDocs;
    }

    @Override
    public CacheHelper getCoreCacheHelper() {
        return reader.getCoreCacheHelper();
    }

    @Override
    public CacheHelper getReaderCacheHelper() {
        return readerCacheHelper;
    }
}

/**
 * A {@link FilterCodecReader} counterpart of {@link LazySoftDeletesFilterLeafReader}:
 * live docs are a {@link LazyBits}, materialized only on first access, with the doc
 * count fixed up front from segment metadata.
 */
public static final class LazySoftDeletesFilterCodecReader extends FilterCodecReader {
    private final LeafReader reader;
    private final LazyBits bits;
    private final int numDocs;
    private final CacheHelper readerCacheHelper;

    public LazySoftDeletesFilterCodecReader(CodecReader reader, LazyBits bits, int numDocs) {
        super(reader);
        this.reader = reader;
        this.bits = bits;
        this.numDocs = numDocs;
        final CacheHelper helper = reader.getReaderCacheHelper();
        this.readerCacheHelper = helper == null ? null : new DelegatingCacheHelper(helper);
    }

    @Override
    public LazyBits getLiveDocs() {
        // lazy: callers only pay the materialization cost if they actually read the bits
        return bits;
    }

    @Override
    public int numDocs() {
        // precomputed from segment metadata; does not touch the lazy bitset
        return numDocs;
    }

    @Override
    public CacheHelper getCoreCacheHelper() {
        return reader.getCoreCacheHelper();
    }

    @Override
    public CacheHelper getReaderCacheHelper() {
        return readerCacheHelper;
    }
}

private static class DelegatingCacheHelper implements CacheHelper {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that this bit is the only reason why we need to have this class in the oal.index package. Can we find a way to avoid doing cross-JAR package-protected access? (Is there another option than making the CacheKey ctor public?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only other reason except for CacheKey is the call to the PendingSoftDeletes.applySoftDeletes method. That method can easily be reimplemented, however. I don't see a good way to work around the CacheKey constructor not being accessible (safe for introducing even more outrageous hacks), and FWIW we already have some classes like ShuffleForcedMergePolicy and OneMergeHelper in this package.

private final CacheHelper delegate;
private final CacheKey cacheKey = new CacheKey();

DelegatingCacheHelper(CacheHelper delegate) {
this.delegate = delegate;
}

@Override
public CacheKey getKey() {
return cacheKey;
}

@Override
public void addClosedListener(ClosedListener listener) {
// here we wrap the listener and call it with our cache key
// this is important since this key will be used to cache the reader and otherwise we won't free caches etc.
delegate.addClosedListener(unused -> listener.onClose(cacheKey));
}
}

/**
 * A {@link Bits} implementation over the live docs of a segment that materializes the
 * underlying {@link FixedBitSet} lazily, on first call to {@link #get(int)}. The counts
 * passed in at construction time (taken from segment metadata) are asserted against the
 * computed bitset once it is built. Thread-safe via double-checked locking on the
 * volatile {@link #materializedBits} field.
 */
public static class LazyBits implements Bits {

    private final int maxDoc;          // length of the bitset
    private final String field;        // soft deletes doc-values field
    private final LeafReader reader;   // reader the bits are computed from
    private final int numSoftDeletes;  // expected soft-delete count from segment metadata
    private final int numDocs;         // expected live-doc count from segment metadata
    volatile Bits materializedBits;

    public LazyBits(int maxDoc, String field, LeafReader reader, int numSoftDeletes, int numDocs) {
        this.maxDoc = maxDoc;
        this.field = field;
        this.reader = reader;
        this.numSoftDeletes = numSoftDeletes;
        this.numDocs = numDocs;
        materializedBits = null;
        // callers only create a LazyBits when the segment actually has soft deletes
        assert numSoftDeletes > 0;
    }

    @Override
    public boolean get(int index) {
        // Double-checked locking with a local variable so the volatile field is read at
        // most twice on the fast path and exactly once after materialization
        // (Effective Java, Item 83).
        Bits bits = materializedBits;
        if (bits == null) {
            synchronized (this) {
                bits = materializedBits;
                if (bits == null) {
                    try {
                        bits = init();
                    } catch (IOException e) {
                        // Bits.get cannot throw checked exceptions; surface as unchecked
                        throw new UncheckedIOException(e);
                    }
                    materializedBits = bits;
                }
            }
        }
        return bits.get(index);
    }

    @Override
    public int length() {
        return maxDoc;
    }

    /**
     * Builds the live-docs bitset: start from the hard-deletes live docs (or all-live)
     * and clear every document that has a value in the soft deletes field.
     */
    private Bits init() throws IOException {
        assert Thread.holdsLock(this);

        DocIdSetIterator iterator = DocValuesFieldExistsQuery.getDocValuesDocIdSetIterator(field, reader);
        assert iterator != null;
        Bits liveDocs = reader.getLiveDocs();
        final FixedBitSet bits;
        if (liveDocs != null) {
            bits = FixedBitSet.copyOf(liveDocs);
        } else {
            bits = new FixedBitSet(maxDoc);
            bits.set(0, maxDoc);
        }
        int numComputedSoftDeletes = PendingSoftDeletes.applySoftDeletes(iterator, bits);
        // the computed counts must agree with the segment metadata we were given
        assert numComputedSoftDeletes == numSoftDeletes :
            "numComputedSoftDeletes: " + numComputedSoftDeletes + " expected: " + numSoftDeletes;

        int numDeletes = reader.numDeletedDocs() + numComputedSoftDeletes;
        int computedNumDocs = reader.maxDoc() - numDeletes;
        assert computedNumDocs == numDocs : "computedNumDocs: " + computedNumDocs + " expected: " + numDocs;
        return bits;
    }

    /** Returns whether the bitset has been materialized yet (used by tests/stats). */
    public boolean initialized() {
        return materializedBits != null;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public final class NoOpEngine extends ReadOnlyEngine {
private final DocsStats docsStats;

public NoOpEngine(EngineConfig config) {
super(config, null, null, true, Function.identity(), true);
super(config, null, null, true, Function.identity(), true, true);
this.segmentsStats = new SegmentsStats();
Directory directory = store.directory();
try (DirectoryReader reader = openDirectory(directory)) {
Expand Down
Loading