Use IndexInput#prefetch for terms dictionary lookups. #13359
Conversation
This introduces `TermsEnum#prepareSeekExact`, which essentially calls `IndexInput#prefetch` at the right offset for the given term. Then it takes advantage of the fact that `BooleanQuery` already calls `Weight#scorerSupplier` on all clauses, before later calling `ScorerSupplier#get` on all clauses. So `TermQuery` now calls `TermsEnum#prepareSeekExact` in `Weight#scorerSupplier` (if scores are not needed), which in turn means that the I/O for terms dictionary lookups gets parallelized across all term queries of a `BooleanQuery` on a given segment (intra-segment parallelism).
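To make the call pattern concrete, here is a minimal sketch (simplified, not the actual `TermWeight` code; scorer construction is omitted and the `term`/`context` variables are assumed to be available as in `TermQuery`'s weight):

```java
void twoPhaseTermLookup(LeafReaderContext context, Term term) throws IOException {
  Terms terms = context.reader().terms(term.field());
  if (terms == null) {
    return; // field not indexed in this segment
  }
  TermsEnum termsEnum = terms.iterator();

  // Phase 1, at Weight#scorerSupplier time: announce the upcoming lookup so the
  // terms dictionary can issue IndexInput#prefetch at the right offset. No
  // blocking I/O is expected here.
  termsEnum.prepareSeekExact(term.bytes());

  // ... BooleanQuery calls Weight#scorerSupplier on its other clauses here, so
  // several prefetches can be in flight before any blocking read happens ...

  // Phase 2, at ScorerSupplier#get time: the actual seek, hopefully served from
  // pages that the earlier prefetch already pulled into the page cache.
  if (termsEnum.seekExact(term.bytes())) {
    PostingsEnum postings = termsEnum.postings(null, PostingsEnum.NONE);
    // build the scorer from `postings` (omitted in this sketch)
  }
}
```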
This is a draft as I need to do more work on tests and on making sure that this new method cannot corrupt the state of the terms enum. But I created a benchmark that starts looking like running a Lucene query, and the results are encouraging. It creates an index with many terms that have very short postings lists, so that running boolean queries on these terms is heavy on terms dictionary lookups rather than on reading postings. Then it manually runs a disjunction over 3 terms (some of these terms may not exist in the index as they are created randomly), computing how long it takes to evaluate all hits. To work properly when running a query, we'd need to move `#bulkScorer` from `Weight` to `ScorerSupplier`, which I intend to do as a follow-up.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.ScorerSupplier;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
public class TermsEnumPrefetchBench {
private static final int NUM_TERMS = 3;
public static int DUMMY;
public static void main(String[] args) throws Exception {
Path dirPath = Paths.get(args[0]);
Directory dir = FSDirectory.open(dirPath);
if (DirectoryReader.indexExists(dir) == false) {
TieredMergePolicy mp = new TieredMergePolicy();
mp.setSegmentsPerTier(100);
mp.setMaxMergeAtOnce(100);
mp.setMaxMergedSegmentMB(1024);
try (IndexWriter w = new IndexWriter(dir, new IndexWriterConfig()
.setMergePolicy(mp)
.setRAMBufferSizeMB(1024))) {
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
AtomicLong indexed = new AtomicLong(0);
for (int task = 0; task < 1000; ++task) {
executor.execute(() -> {
Random r = ThreadLocalRandom.current();
for (int i = 0; i < 1_000; ++i) {
Document doc = new Document();
for (int j = 0; j < 10_000; ++j) {
doc.add(new StringField("f", Long.toString(r.nextLong(20_000_000_000L)), Store.NO));
}
try {
w.addDocument(doc);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
final long actualIndexed = indexed.incrementAndGet();
if (actualIndexed % 10_000 == 0) {
System.out.println("Indexed: " + actualIndexed);
}
}
});
}
executor.shutdown();
executor.awaitTermination(1, TimeUnit.DAYS);
w.commit();
System.out.println("Start force merging");
w.forceMerge(1);
System.out.println("Done force merging");
w.commit();
}
}
List<Long> latencies = new ArrayList<>();
try (IndexReader reader = DirectoryReader.open(dir)) {
IndexSearcher searcher = new IndexSearcher(reader);
Random r = ThreadLocalRandom.current();
for (int i = 0; i < 10_000; ++i) {
long start = System.nanoTime();
BooleanQuery.Builder query = new BooleanQuery.Builder();
for (int t = 0; t < NUM_TERMS; ++t) {
query.add(new TermQuery(new Term("f", Long.toString(r.nextLong(20_000_000_000L)))), Occur.SHOULD);
}
Weight weight = searcher.createWeight(searcher.rewrite(query.build()), ScoreMode.COMPLETE_NO_SCORES, 1f);
ScorerSupplier ss = weight.scorerSupplier(reader.leaves().get(0));
if (ss != null) {
Scorer scorer = ss.get(Long.MAX_VALUE);
DocIdSetIterator iter = scorer.iterator();
for (int d = iter.nextDoc(); d != DocIdSetIterator.NO_MORE_DOCS; d = iter.nextDoc()) {
DUMMY++;
}
long end = System.nanoTime();
latencies.add((end - start) / 1000);
}
}
}
latencies.sort(null);
System.out.println("P50: " + latencies.get(latencies.size() / 2));
System.out.println("P90: " + latencies.get(latencies.size() * 9 / 10));
System.out.println("P99: " + latencies.get(latencies.size() * 99 / 100));
}
}
```

Without the change:
With the change:
```java
final long fp = in.getFilePointer();
in.seek(fpSeek);
in.prefetch(1); // TODO: could we know the length of the block?
in.seek(fp); // TODO: do we actually need to do this?
```
I really don't like these calls to `seek()` just to prefetch data. Since it is just prefetching, I'd prefer if this "dance" was an impl detail, if needed.

It would make the code simpler to just pass a parameter to prefetch rather than do this. Then it is clear that the default implementation won't cause harm (unnecessary I/O) for any directory subclasses.

So I think prefetch should take the location as an argument? It is just a hint and not real I/O by the thread. It's intentionally not sequential, and a sequential API for it only hurts.
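In other words, something like the following; the second form is what the updated diff further down in this thread ends up using (see #13363):

```java
// Current draft: reposition the input just to issue the hint, then restore it.
long fp = in.getFilePointer();
in.seek(fpSeek);
in.prefetch(1);
in.seek(fp);

// Suggested shape: the hint carries the location itself and never moves the
// file pointer, so no seek "dance" is needed and it stays a pure hint.
in.prefetch(fpSeek, 1);
```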
I opened #13363.
I iterated a bit on this change:
whew terms stuff is always heavy, i left one concern
```java
 * #prepareSeekExact} on multiple terms enums before calling {@link #seekExact(BytesRef)} on the
 * same {@link TermsEnum}s.
 */
public void prepareSeekExact(BytesRef text) throws IOException {}
```
Can we look into subclasses such as `FilterLeafReader.FilterTermsEnum` to make sure this new method behaves correctly?
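For instance, a delegating enum would presumably need to forward the hint so it reaches the real on-disk implementation; a sketch, assuming the usual `in` delegate field of `FilterTermsEnum`:

```java
@Override
public void prepareSeekExact(BytesRef text) throws IOException {
  // Without an override like this, the default no-op would silently drop the prefetch hint.
  in.prepareSeekExact(text);
}
```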
Was this with a forced-cold index?
It creates a 50GB terms dictionary while my machine only has ~28GB of RAM for the page cache, so many terms dictionary lookups result in page faults.
```java
// TODO: should we try to reuse the current state of this terms enum when applicable?
BytesRefFSTEnum<BytesRef> indexEnum = new BytesRefFSTEnum<>(fr.index);
InputOutput<BytesRef> output = indexEnum.seekFloor(target);
if (output != null) { // should never be null since we already checked against fr.getMin()?
```
Indeed -- maybe change to an `assert`?
```java
        output.output.bytes, output.output.offset, output.output.length));
final long fpSeek = code >>> Lucene90BlockTreeTermsReader.OUTPUT_FLAGS_NUM_BITS;
initIndexInput();
in.prefetch(fpSeek, 1); // TODO: could we know the length of the block?
```
I think you can do `indexEnum.next()` and if that is non-null (it will be null if you are on the very last block -- we could handle that case as well maybe) then get the fp for that next block and subtract the two?
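Roughly the following, though as discussed just below the terms index file pointers turn out not to be monotonic, so this estimate does not hold up; `decodeFp` here is a hypothetical stand-in for the existing `code >>> OUTPUT_FLAGS_NUM_BITS` decoding, and `output` is the result of `indexEnum.seekFloor(target)` from the diff above:

```java
long fpSeek = decodeFp(output.output);        // block that seekFloor(target) landed on
long length = 1;                              // fall back to the current 1-byte hint
BytesRefFSTEnum.InputOutput<BytesRef> next = indexEnum.next();
if (next != null) {                           // null when we are on the very last block
  // Only meaningful if block file pointers increased monotonically along the
  // terms index -- the rest of this thread explains why they do not.
  length = Math.max(1, decodeFp(next.output) - fpSeek);
}
in.prefetch(fpSeek, length);
```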
I tried this but this doesn't work as expected, it sometimes gives me blocks that have prior offsets and I'm not intimate enough with this terms dictionary to understand why, maybe you do?
Egads, that is really weird -- I would not expect those `fp` to go backwards on `.next()` -- I thought the FST index was a depth-first traversal of all on-disk (leaf) blocks. I will need to mull some more about this :)
Maybe @vsop-479 has some insight?
Anyway, I don't think we need to hold up this nice PR for this -- we can try to improve this later. PnP!
> it sometimes gives me blocks that have prior offsets
>
> I would not expect those fp to go backwards on .next()
I haven't looked into the whole context.
But this will happen when we finish a `Block` and go back to read its parent block's `term`.
e.g.
We have terms like: "regular", "request1", "request2", "rest". Set minTermBlockSize to 2, maxTermBlockSize to 3. We will get blocks: b1: ["re"](root), b2: ["gular", "quest", "st"], b3: ["1", "2"].
Then as we call `next`, we will get the (`term`: `fp`) pairs like:
"regular": b2,
"request1": b3,
"request2": b3,
"rest": b2.
> But this will happen when we finish a `Block` and go back to read its parent block's `term`.
Yeah I agree that the `rest` term will be back in the `b2` block, but the `FSTEnum` we are talking about is the in-memory terms index that holds the file pointer offset to the start of these blocks ... I would have expected the FST to have e.g. in your example:
reg -> b2
req -> b3
rez -> b4
Or so, with `b4 > b3 > b2` block file pointers ...
OK! This was nagging at me so I dug into it, printing the `FSTEnum` iteration output on a nightly Lucene benchy index ... I now understand why the pointers indeed go backwards. I think this was the point you were making above @vsop-479 -- sorry I misunderstood at first ;)
It's because when writing the blocks we write "bottoms up" on a depth-first traversal through the terms, and only write a node when it is finished / returned from. Leaf blocks will be written immediately / in order since they are started, terms come out, finished. But for non-leaf blocks, first all leaf blocks under them are written (in order), and THEN the non-leaf block is written only when we are done with all those recursions and writing any straggler terms that live in the non-leaf block.
But the prefixes are added to the index FST in the correct (term sorted) order. So this means the file pointer can indeed go backwards when iterating the terms index. I'll mull some more about whether we could (efficiently) know the term block length ...
Actually I was mistaken, I thought it was `SegmentTermsEnum` we are talking about -- sorry about that ;)
> But for non-leaf blocks, first all leaf blocks under them are written (in order), and THEN the non-leaf block is written only when we are done with all those recursions and writing any straggler terms that live in the non-leaf block.
This means if we subtract the `fp` of a non-leaf block and its next, we will get its sub blocks' total length?
> But for non-leaf blocks, first all leaf blocks under them are written (in order), and THEN the non-leaf block is written only when we are done with all those recursions and writing any straggler terms that live in the non-leaf block.
>
> This means if we subtract the `fp` of a non-leaf block and its next, we will get its sub blocks' total length?
It's tricky. I think if you do that, you'll get the total length of the next block's sub blocks? Because each non-leaf block is written at the end of the recursive (depth-first) visit of all of its sub blocks.
I'm still not sure how to cleanly/efficiently get the total byte length of a leaf block by looking solely at the FST terms index. So we should proceed with the hint as is (pre-fetch 1 byte from position X) -- "typically" the terms block will fit into a single IO page (512 or 4096 bytes), plus any further readahead the IO system does.
Thanks @jpountz -- this is a nice change -- two phased reads!
```java
BytesRef term;
while ((term = termsEnum.next()) != null) {
  // ...
  // This is the term vectors:
  postings = termsEnum.postings(postings, PostingsEnum.ALL);
  assert postings != null;
  // ...
  if ((seekExactCounter++ & 0xFF) == 0) {
    postingsTermsEnum.prepareSeekExact(term);
```
Nice -- this is to make sure we are exercising the API?
Correct.
```java
 * #prepareSeekExact} on multiple terms enums before calling {@link #seekExact(BytesRef)} on the
 * same {@link TermsEnum}s.
 *
 * <p><b>NOTE</b>: The terms enum is unpositioned after calling this method.
```
Hmm is it really that it is unpositioned, or, that this method does not alter the `TermsEnum`'s positioned state?
I.e. if I position it to some term, then call this method, won't it still be positioned on that same (prior) term?
Or are we trying to reserve the future right in the API to break the positioning, even though this first impl preserves it?
Indeed, I'm trying to reserve the right to update the state of the terms enum through this API in the future. See also `AssertingTermsEnum#prepareSeekExact`.
```java
 *
 * <p><b>NOTE</b>: It is not necessary to call this method before calling {@link
 * #seekExact(BytesRef, TermState)}. {@link TermsEnum} implementations are expected to implement
 * this method in an I/O-free fashion.
```
`this method` -> `that method`? Since I think you mean `seekExact(BytesRef, TermState)` when you say `this method` here (but the previous `this method` two lines up is referring to `prepareSeekExact`)? Pronouns are hard!
English as a whole is hard. :) I'll fix.
Hmm actually I mean `prepareSeekExact` when I say `this method`. I'll replace `this method` with `prepareSeekExact` to avoid ambiguity.
Oh, sorry I was confused, you meant the second occurrence of "this method"!
LOL!! Merely communicating about an English sentence, in English, is ESPECIALLY HARD!!
```diff
   */
-  public TermState get(LeafReaderContext ctx) throws IOException {
+  public Supplier<TermState> get(LeafReaderContext ctx) throws IOException {
```
This sort of reminds me of two-phase commit, except at read-time not write-time: we now break up these IO heavy read APIs into two phases, now, where step 1 is the intention to get X soon (allowing prefetch to happen, especially concurrently not just in the background of the calling thread, but, across the N different Xs we want to retrieve). Step 2 is to then go and block on the IO to retrieve each of the N Xs. Two phased reads!
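A sketch of that pattern from the caller's side (variable names like `termStatesPerClause` and `leafContext` are illustrative, not the actual `BooleanQuery` code):

```java
// Phase 1: ask every clause for its Supplier. Under the hood this can call
// TermsEnum#prepareSeekExact, which schedules IndexInput#prefetch, so the I/O
// for all clauses overlaps instead of happening one term at a time.
List<Supplier<TermState>> pending = new ArrayList<>();
for (TermStates termStates : termStatesPerClause) {
  pending.add(termStates.get(leafContext));
}

// Phase 2: resolve them. Each get() may still block on the terms dictionary
// seek, but the pages are hopefully already cached thanks to phase 1.
List<TermState> resolved = new ArrayList<>();
for (Supplier<TermState> supplier : pending) {
  resolved.add(supplier.get());
}
```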
```diff
@@ -19,6 +19,7 @@
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.function.Supplier;
```
Should we open a spinoff issue to maybe add prefetch to `TermInSetQuery` too?
Well, only `TermQuery` and `SynonymQuery` are handled so we could open an issue for every other query, I'm not sure we should open these issues? But indeed, let's think of how to take advantage of prefetching in `PointRangeQuery`, `TermInSetQuery`, `FeatureQuery`, etc.
```diff
@@ -150,7 +170,12 @@ public Scorer get(long leadCost) throws IOException {

   @Override
   public long cost() {
-    return docFreq;
+    try {
+      TermsEnum te = getTermsEnum();
```
Hmm this getter got more costly. It's too bad `TermState` is so opaque -- under the hood it (`BlockTermState`) is already storing `docFreq`.
Note that we were already getting a `TermsEnum` to be able to get the cost before, it just happened before creating the `ScorerSupplier`. So the additional cost here is the `if (termsEnum != null)` check under `getTermsEnum()`.
Agreed that it's a pity to pull a terms enum only to get a cost, which is already encapsulated in the term state. Though I don't expect it to be a major issue in practice.
This relates to #13359: we want to take advantage of the `Weight#scorerSupplier` call to start scheduling some I/O in the background, in parallel across clauses. For this to work properly with top-level disjunctions, we need to move `#bulkScorer()` from `Weight` to `ScorerSupplier` as well, so that the disjunctive `BooleanQuery` first performs a call to `Weight#scorerSupplier()` on all inner clauses, and then `ScorerSupplier#bulkScorer` on all inner clauses. `ScorerSupplier#get` and `ScorerSupplier#bulkScorer` only support being called once. This forced me to fix some inefficiencies in `bulkScorer()` implementations where we would pull scorers and then throw them away after realizing that the strategy we were planning on using was not optimal. This is why e.g. `ReqExclBulkScorer` now also supports prohibited clauses that produce a two-phase iterator.
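For illustration, the evaluation order this enables for a top-level disjunction looks roughly like this (a sketch of the driver logic, not the actual `BooleanQuery` code; `clauseWeights` and `leafContext` are assumed):

```java
// Pass 1: create a ScorerSupplier per clause. With the TermsEnum change above,
// this is where each TermQuery issues its terms-dictionary prefetch.
List<ScorerSupplier> suppliers = new ArrayList<>();
for (Weight clauseWeight : clauseWeights) {
  ScorerSupplier supplier = clauseWeight.scorerSupplier(leafContext);
  if (supplier != null) {
    suppliers.add(supplier);
  }
}

// Pass 2: only now materialize the bulk scorers, so the blocking terms
// dictionary reads can be served by the prefetches issued in pass 1.
List<BulkScorer> bulkScorers = new ArrayList<>();
for (ScorerSupplier supplier : suppliers) {
  bulkScorers.add(supplier.bulkScorer());
}
```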
Now that #13408 has been merged, I could update the benchmark to simply call `IndexSearcher#search`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
public class TermsEnumPrefetchBench {
private static final int NUM_TERMS = 3;
public static int DUMMY;
public static void main(String[] args) throws Exception {
Path dirPath = Paths.get(args[0]);
Directory dir = FSDirectory.open(dirPath);
if (DirectoryReader.indexExists(dir) == false) {
TieredMergePolicy mp = new TieredMergePolicy();
mp.setSegmentsPerTier(100);
mp.setMaxMergeAtOnce(100);
mp.setMaxMergedSegmentMB(1024);
try (IndexWriter w = new IndexWriter(dir, new IndexWriterConfig()
.setMergePolicy(mp)
.setRAMBufferSizeMB(1024))) {
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
AtomicLong indexed = new AtomicLong(0);
for (int task = 0; task < 1000; ++task) {
executor.execute(() -> {
Random r = ThreadLocalRandom.current();
for (int i = 0; i < 1_000; ++i) {
Document doc = new Document();
for (int j = 0; j < 10_000; ++j) {
doc.add(new StringField("f", Long.toString(r.nextLong(20_000_000_000L)), Store.NO));
}
try {
w.addDocument(doc);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
final long actualIndexed = indexed.incrementAndGet();
if (actualIndexed % 10_000 == 0) {
System.out.println("Indexed: " + actualIndexed);
}
}
});
}
executor.shutdown();
executor.awaitTermination(1, TimeUnit.DAYS);
w.commit();
System.out.println("Start force merging");
w.forceMerge(1);
System.out.println("Done force merging");
w.commit();
}
}
List<Long> latencies = new ArrayList<>();
try (IndexReader reader = DirectoryReader.open(dir)) {
IndexSearcher searcher = new IndexSearcher(reader);
Random r = ThreadLocalRandom.current();
for (int i = 0; i < 10_000; ++i) {
long start = System.nanoTime();
BooleanQuery.Builder query = new BooleanQuery.Builder();
for (int t = 0; t < NUM_TERMS; ++t) {
query.add(new TermQuery(new Term("f", Long.toString(r.nextLong(20_000_000_000L)))), Occur.SHOULD);
}
DUMMY += searcher.search(query.build(), 1, Sort.INDEXORDER).totalHits.value;
long end = System.nanoTime();
latencies.add((end - start) / 1000);
}
}
latencies.sort(null);
System.out.println("P50: " + latencies.get(latencies.size() / 2));
System.out.println("P90: " + latencies.get(latencies.size() * 9 / 10));
System.out.println("P99: " + latencies.get(latencies.size() * 99 / 100));
}
}
```

Results still look good.
Before the change:
After the change:
I pushed a new approach. Instead of
The benchmark still reports similar numbers:
Without the change:
With the change:
I will merge soon if there are no objections.
In apache/lucene#13359 a new "prepareSeekExact" method was added that can improve seeking on TermsEnum implementations. Two of our own subclasses of TermsEnum don't seem to support seeking for text, so we can safely throw an UOE there. The third (FilterableTermsEnum) changes to simply returning a Supplier for the actual "seek" method for now.