Use IndexInput#prefetch for terms dictionary lookups. #13359

Merged: 17 commits, Jul 10, 2024
Changes from 6 commits
@@ -31,6 +31,8 @@
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.RamUsageEstimator;
+ import org.apache.lucene.util.fst.BytesRefFSTEnum;
+ import org.apache.lucene.util.fst.BytesRefFSTEnum.InputOutput;
import org.apache.lucene.util.fst.FST;
import org.apache.lucene.util.fst.Util;

@@ -307,6 +309,28 @@ private boolean setEOF() {
return true;
}

+ @Override
+ public void prepareSeekExact(BytesRef target) throws IOException {
+ if (fr.index == null) {
+ throw new IllegalStateException("terms index was not loaded");
+ }
+
+ if (fr.size() == 0 || target.compareTo(fr.getMin()) < 0 || target.compareTo(fr.getMax()) > 0) {
+ return;
+ }
+
+ // TODO: should we try to reuse the current state of this terms enum when applicable?
+ BytesRefFSTEnum<BytesRef> indexEnum = new BytesRefFSTEnum<>(fr.index);
+ InputOutput<BytesRef> output = indexEnum.seekFloor(target);
+ final long code =
+ fr.readVLongOutput(
+ new ByteArrayDataInput(
+ output.output.bytes, output.output.offset, output.output.length));
+ final long fpSeek = code >>> Lucene90BlockTreeTermsReader.OUTPUT_FLAGS_NUM_BITS;
+ initIndexInput();
+ in.prefetch(fpSeek, 1); // TODO: could we know the length of the block?
+ }

@Override
public boolean seekExact(BytesRef target) throws IOException {

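The hunk above wires IndexInput#prefetch into the block-tree terms dictionary: seekFloor on the terms index FST finds the entry for the on-disk block that may hold the target term, the block's file pointer is decoded from the FST output, and that position is prefetched before the blocking seek. The following is a minimal sketch (not code from this PR) of the two-phase lookup pattern this enables; the field names and the term are made up, and both fields are assumed to exist in the reader.

import java.io.IOException;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.util.BytesRef;

class TwoPhaseSeekExample {
  // Phase 1 announces the upcoming lookups so the matching terms-dictionary blocks
  // can be prefetched; phase 2 performs the blocking seeks, which should then hit
  // pages that are already cached or at least already being read.
  static void seekBoth(LeafReader reader) throws IOException {
    BytesRef term = new BytesRef("lucene"); // hypothetical term
    Terms titleTerms = reader.terms("title"); // hypothetical fields, assumed non-null here
    Terms bodyTerms = reader.terms("body");
    TermsEnum titleEnum = titleTerms.iterator();
    TermsEnum bodyEnum = bodyTerms.iterator();

    // Phase 1: schedule I/O for both lookups up front.
    titleEnum.prepareSeekExact(term);
    bodyEnum.prepareSeekExact(term);

    // Phase 2: block on the actual lookups.
    boolean inTitle = titleEnum.seekExact(term);
    boolean inBody = bodyEnum.seekExact(term);
    System.out.println("title: " + inTitle + ", body: " + inBody);
  }
}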
138 changes: 61 additions & 77 deletions lucene/core/src/java/org/apache/lucene/index/TermStates.java
@@ -17,12 +17,11 @@
package org.apache.lucene.index;

import java.io.IOException;
- import java.util.ArrayList;
+ import java.io.UncheckedIOException;
import java.util.Arrays;
- import java.util.List;
- import java.util.concurrent.Callable;
+ import java.util.function.Supplier;
import org.apache.lucene.search.IndexSearcher;
- import org.apache.lucene.search.TaskExecutor;
+ import org.apache.lucene.util.ArrayUtil;

/**
* Maintains a {@link IndexReader} {@link TermState} view over {@link IndexReader} instances
@@ -97,42 +96,26 @@ public static TermStates build(IndexSearcher indexSearcher, Term term, boolean n
assert context != null;
final TermStates perReaderTermState = new TermStates(needsStats ? null : term, context);
if (needsStats) {
- TaskExecutor taskExecutor = indexSearcher.getTaskExecutor();
- // build the term states concurrently
- List<Callable<TermStateInfo>> tasks = new ArrayList<>(context.leaves().size());
+ TermsEnum[] termsEnums = new TermsEnum[0];
for (LeafReaderContext ctx : context.leaves()) {
- tasks.add(
- () -> {
- TermsEnum termsEnum = loadTermsEnum(ctx, term);
- return termsEnum == null
- ? null
- : new TermStateInfo(
- termsEnum.termState(),
- ctx.ord,
- termsEnum.docFreq(),
- termsEnum.totalTermFreq());
- });
+ Terms terms = Terms.getTerms(ctx.reader(), term.field());
+ TermsEnum termsEnum = terms.iterator();
+ // Schedule the I/O in the terms dictionary in the background.
+ termsEnum.prepareSeekExact(term.bytes());
+ termsEnums = ArrayUtil.grow(termsEnums, ctx.ord + 1);
+ termsEnums[ctx.ord] = termsEnum;
}
- List<TermStateInfo> resultInfos = taskExecutor.invokeAll(tasks);
- for (TermStateInfo info : resultInfos) {
- if (info != null) {
+ for (int ord = 0; ord < termsEnums.length; ++ord) {
+ TermsEnum termsEnum = termsEnums[ord];
+ if (termsEnum != null && termsEnum.seekExact(term.bytes())) {
perReaderTermState.register(
- info.getState(), info.getOrdinal(), info.getDocFreq(), info.getTotalTermFreq());
+ termsEnum.termState(), ord, termsEnum.docFreq(), termsEnum.totalTermFreq());
}
}
}
return perReaderTermState;
}

- private static TermsEnum loadTermsEnum(LeafReaderContext ctx, Term term) throws IOException {
- final Terms terms = Terms.getTerms(ctx.reader(), term.field());
- final TermsEnum termsEnum = terms.iterator();
- if (termsEnum.seekExact(term.bytes())) {
- return termsEnum;
- }
- return null;
- }

/** Clears the {@link TermStates} internal state and removes all registered {@link TermState}s */
public void clear() {
docFreq = 0;
@@ -172,22 +155,59 @@ public void accumulateStatistics(final int docFreq, final long totalTermFreq) {
}

/**
- Returns the {@link TermState} for a leaf reader context or <code>null</code> if no {@link
- TermState} for the context was registered.
+ Returns a {@link Supplier} for a {@link TermState} for the given {@link LeafReaderContext}.
+ This may return {@code null} if some cheap checks help figure out that this term doesn't exist
+ in this leaf. The {@link Supplier} may then also return {@code null} if the term doesn't exist.
*
+ <p>Calling this method typically schedules some I/O in the background, so it is recommended to
+ retrieve {@link Supplier}s across all required terms first before calling {@link Supplier#get}
+ on all {@link Supplier}s so that the I/O for these terms can be performed in parallel.
+ *
* @param ctx the {@link LeafReaderContext} to get the {@link TermState} for.
- @return the {@link TermState} for the given readers ord or <code>null</code> if no {@link
- TermState} for the reader was registered
+ @return a Supplier for a TermState.
*/
- public TermState get(LeafReaderContext ctx) throws IOException {
+ public Supplier<TermState> get(LeafReaderContext ctx) throws IOException {
Member comment:

This sort of reminds me of two-phase commit, except at read-time not write-time: we now break up these I/O-heavy read APIs into two phases, where step 1 is the intention to get X soon (allowing prefetch to happen, especially concurrently, not just in the background of the calling thread but across the N different Xs we want to retrieve), and step 2 is to then go and block on the I/O to retrieve each of the N Xs. Two-phased reads!

assert ctx.ord >= 0 && ctx.ord < states.length;
- if (term == null) return states[ctx.ord];
+ if (term == null) {
+ if (states[ctx.ord] == null) {
+ return null;
+ } else {
+ return () -> states[ctx.ord];
+ }
+ }
if (this.states[ctx.ord] == null) {
- TermsEnum te = loadTermsEnum(ctx, term);
- this.states[ctx.ord] = te == null ? EMPTY_TERMSTATE : te.termState();
+ final Terms terms = ctx.reader().terms(term.field());
+ if (terms == null) {
+ return null;
+ }
+ final TermsEnum termsEnum = terms.iterator();
+ termsEnum.prepareSeekExact(term.bytes());
+ return () -> {
+ if (this.states[ctx.ord] == null) {
+ try {
+ TermState state = null;
+ if (termsEnum.seekExact(term.bytes())) {
+ state = termsEnum.termState();
+ }
+ this.states[ctx.ord] = state == null ? EMPTY_TERMSTATE : state;
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ TermState state = this.states[ctx.ord];
+ if (state == EMPTY_TERMSTATE) {
+ return null;
+ }
+ return state;
+ };
}
- if (this.states[ctx.ord] == EMPTY_TERMSTATE) return null;
- return this.states[ctx.ord];
+ return () -> {
+ TermState state = this.states[ctx.ord];
+ if (state == EMPTY_TERMSTATE) {
+ return null;
+ }
+ return state;
+ };
}

/**
@@ -230,40 +250,4 @@ public String toString() {

return sb.toString();
}

- /** Wrapper over TermState, ordinal value, term doc frequency and total term frequency */
- private static final class TermStateInfo {
- private final TermState state;
- private final int ordinal;
- private final int docFreq;
- private final long totalTermFreq;
-
- /** Initialize TermStateInfo */
- public TermStateInfo(TermState state, int ordinal, int docFreq, long totalTermFreq) {
- this.state = state;
- this.ordinal = ordinal;
- this.docFreq = docFreq;
- this.totalTermFreq = totalTermFreq;
- }
-
- /** Get term state */
- public TermState getState() {
- return state;
- }
-
- /** Get ordinal value */
- public int getOrdinal() {
- return ordinal;
- }
-
- /** Get term doc frequency */
- public int getDocFreq() {
- return docFreq;
- }
-
- /** Get total term frequency */
- public long getTotalTermFreq() {
- return totalTermFreq;
- }
- }
}
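The new contract of TermStates#get is easiest to see from the calling side: first ask for the Supplier of every term you will need (each call may schedule a terms-dictionary prefetch), and only then resolve the Suppliers. Below is a small sketch of that pattern for a single leaf; it is not code from this PR, it assumes the leaf belongs to the searcher's reader, and it uses needsStats=false so that the per-leaf lookup is deferred to the Supplier.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TermState;
import org.apache.lucene.index.TermStates;
import org.apache.lucene.search.IndexSearcher;

class TermStatesSupplierExample {
  // Resolve TermStates for several terms against one leaf. Phase 1 collects the
  // Suppliers, which schedules the terms-dictionary prefetch for every term;
  // phase 2 blocks on each Supplier, so the I/O for the different terms can overlap.
  static List<TermState> resolveAll(IndexSearcher searcher, LeafReaderContext leaf, List<Term> terms)
      throws IOException {
    List<Supplier<TermState>> suppliers = new ArrayList<>(terms.size());
    for (Term term : terms) {
      TermStates states = TermStates.build(searcher, term, false);
      suppliers.add(states.get(leaf)); // may be null if the term cannot match this leaf
    }
    List<TermState> resolved = new ArrayList<>(terms.size());
    for (Supplier<TermState> supplier : suppliers) {
      // The Supplier itself may also return null if the term does not exist in the leaf.
      resolved.add(supplier == null ? null : supplier.get());
    }
    return resolved;
  }
}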
10 changes: 10 additions & 0 deletions lucene/core/src/java/org/apache/lucene/index/TermsEnum.java
@@ -17,6 +17,7 @@
package org.apache.lucene.index;

import java.io.IOException;
+ import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.AttributeSource;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;
@@ -61,6 +62,15 @@ public enum SeekStatus {
*/
public abstract boolean seekExact(BytesRef text) throws IOException;

+ /**
+ * Prepare a future call to {@link #seekExact}. This typically calls {@link IndexInput#prefetch}
+ * on the right range of bytes under the hood so that the next call to {@link #seekExact} is
+ * faster. This can be used to parallelize I/O across multiple terms by calling {@link
+ * #prepareSeekExact} on multiple terms enums before calling {@link #seekExact(BytesRef)} on the
+ * same {@link TermsEnum}s.
+ */
+ public void prepareSeekExact(BytesRef text) throws IOException {}
Member comment:

Can we look into subclasses such as FilterLeafReader.FilterTermsEnum to make sure this new method behaves correctly?


/**
* Seeks to the specified term, if it exists, or to the next (ceiling) term. Returns SeekStatus to
* indicate whether exact term was found, a different term was found, or EOF was hit. The target
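The review comment above points at a real subtlety: prepareSeekExact has a no-op default, so a delegating wrapper like FilterLeafReader.FilterTermsEnum silently drops the hint unless it forwards the call to the wrapped enum. Whether this PR updates those wrappers is not visible in this diff; the class below is only a hypothetical sketch of the kind of forwarding override the comment asks about.

import java.io.IOException;
import org.apache.lucene.index.FilterLeafReader;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.util.BytesRef;

// Hypothetical wrapper: forwards the new hint to the delegate so that wrapping a
// block-tree TermsEnum does not lose the prefetch (the inherited default is a no-op).
class ForwardingTermsEnum extends FilterLeafReader.FilterTermsEnum {
  ForwardingTermsEnum(TermsEnum in) {
    super(in);
  }

  @Override
  public void prepareSeekExact(BytesRef text) throws IOException {
    in.prepareSeekExact(text);
  }
}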
@@ -19,6 +19,7 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
+ import java.util.function.Supplier;
Member comment:

Should we open a spinoff issue to maybe add prefetch to TermInSetQuery too?

Contributor (author) reply:

Well, only TermQuery and SynonymQuery are handled here, so we could open an issue for every other query; I'm not sure we should open these issues. But indeed, let's think about how to take advantage of prefetching in PointRangeQuery, TermInSetQuery, FeatureQuery, etc.

import org.apache.lucene.index.IndexReaderContext;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.Term;
@@ -316,7 +317,11 @@ private static TermStates adjustFrequencies(
List<LeafReaderContext> leaves = readerContext.leaves();
TermStates newCtx = new TermStates(readerContext);
for (int i = 0; i < leaves.size(); ++i) {
- TermState termState = ctx.get(leaves.get(i));
+ Supplier<TermState> supplier = ctx.get(leaves.get(i));
+ if (supplier == null) {
+ continue;
+ }
+ TermState termState = supplier.get();
if (termState == null) {
continue;
}
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+ import java.util.function.Supplier;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PostingsEnum;
@@ -277,7 +278,8 @@ protected PhraseMatcher getPhraseMatcher(
List<PostingsEnum> postings = new ArrayList<>();

for (Term term : terms) {
- TermState termState = termStates.get(term).get(context);
+ Supplier<TermState> supplier = termStates.get(term).get(context);
+ TermState termState = supplier == null ? null : supplier.get();
if (termState != null) {
termsEnum.seekExact(term.bytes(), termState);
postings.add(
@@ -21,6 +21,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
+ import java.util.function.Supplier;
import org.apache.lucene.codecs.lucene99.Lucene99PostingsFormat;
import org.apache.lucene.codecs.lucene99.Lucene99PostingsReader;
import org.apache.lucene.index.ImpactsEnum;
@@ -501,7 +502,8 @@ protected PhraseMatcher getPhraseMatcher(

for (int i = 0; i < terms.length; i++) {
final Term t = terms[i];
- final TermState state = states[i].get(context);
+ final Supplier<TermState> supplier = states[i].get(context);
+ final TermState state = supplier == null ? null : supplier.get();
if (state == null) {
/* term doesnt exist in this segment */
assert termNotInReader(reader, t) : "no termstate found but term exists in reader";