Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Cancellable DirectoryReader #52822

Merged
merged 32 commits into from
Mar 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
e2ebfb4
Implement Cancellable DirectoryReader
matriv Feb 25, 2020
c890142
fix compilation
matriv Feb 26, 2020
e38cfa0
Merge remote-tracking branch 'upstream/master' into impl-cancellable
matriv Feb 26, 2020
73b0e6d
Address comments
matriv Feb 26, 2020
ffdf6d2
Fix behaviour by properly handling cancellable.run()
matriv Feb 26, 2020
add7dd4
split query timeout and cancellation to be able to unset query timeout
matriv Feb 27, 2020
d74edb2
Merge remote-tracking branch 'upstream/master' into impl-cancellable
matriv Feb 27, 2020
331411b
Custom implementation of ExitableDirReader to overcome casting
matriv Feb 27, 2020
d10c51a
revert changes
matriv Feb 27, 2020
e5fdf47
revert
matriv Feb 27, 2020
248ee51
revert unrelated changes
matriv Feb 27, 2020
bc85193
Merge remote-tracking branch 'upstream/master' into impl-cancellable
matriv Feb 27, 2020
ce2d557
Fix issues after splitting to cancelation and timeout
matriv Feb 28, 2020
0012e3a
Add unit test
matriv Feb 28, 2020
4c3183f
re-enable MultiReader Tests, fix cancellation tests
matriv Feb 28, 2020
8b38977
fix checkstyle - enhance comments
matriv Feb 28, 2020
9ebd847
Fix NPE
matriv Feb 28, 2020
0bf64f0
address comments
matriv Feb 29, 2020
3936a05
fix revert
matriv Feb 29, 2020
9bf0fe3
fix revert
matriv Feb 29, 2020
ce51935
move impl to QueryPhase
matriv Feb 29, 2020
9695114
unit test wrapping and exit
matriv Mar 1, 2020
d562cf1
address comments
matriv Mar 3, 2020
6243ced
Merge remote-tracking branch 'upstream/master' into impl-cancellable
matriv Mar 3, 2020
19bdbdf
improve tests
matriv Mar 3, 2020
b446dfd
extract wrapper classes to another file, use more elegant approach fo…
matriv Mar 4, 2020
183da17
Merge remote-tracking branch 'upstream/master' into impl-cancellable
matriv Mar 4, 2020
a912fa3
rename method, add more tests
matriv Mar 4, 2020
23c3adc
replace QueryTimeout with our own iface
matriv Mar 4, 2020
087f2ad
document iface
matriv Mar 4, 2020
df0da4c
Merge remote-tracking branch 'upstream/master' into impl-cancellable
matriv Mar 4, 2020
eb158e5
fix comment
matriv Mar 5, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ final class DefaultSearchContext extends SearchContext {
DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget,
Engine.Searcher engineSearcher, ClusterService clusterService, IndexService indexService,
IndexShard indexShard, BigArrays bigArrays, LongSupplier relativeTimeSupplier, TimeValue timeout,
FetchPhase fetchPhase) {
FetchPhase fetchPhase) throws IOException {
this.id = id;
this.request = request;
this.fetchPhase = fetchPhase;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ && canRewriteToMatchNone(rewritten.source())
}, listener::onFailure));
}

private void onMatchNoDocs(SearchRewriteContext rewriteContext, ActionListener<SearchPhaseResult> listener) {
private void onMatchNoDocs(SearchRewriteContext rewriteContext, ActionListener<SearchPhaseResult> listener) throws IOException {
// creates a lightweight search context that we use to inform context listeners
// before closing
SearchContext searchContext = createSearchContext(rewriteContext, defaultSearchTimeout);
Expand Down Expand Up @@ -609,7 +609,7 @@ private SearchContext findContext(long id, TransportRequest request) throws Sear
}
}

final SearchContext createAndPutContext(SearchRewriteContext rewriteContext) {
final SearchContext createAndPutContext(SearchRewriteContext rewriteContext) throws IOException {
SearchContext context = createContext(rewriteContext);
onNewContext(context);
boolean success = false;
Expand Down Expand Up @@ -644,7 +644,7 @@ private void onNewContext(SearchContext context) {
}
}

final SearchContext createContext(SearchRewriteContext rewriteContext) {
final SearchContext createContext(SearchRewriteContext rewriteContext) throws IOException {
final DefaultSearchContext context = createSearchContext(rewriteContext, defaultSearchTimeout);
try {
if (rewriteContext.request != null && openScrollContexts.get() >= maxOpenScrollContext) {
Expand Down Expand Up @@ -695,7 +695,7 @@ public DefaultSearchContext createSearchContext(ShardSearchRequest request, Time
return createSearchContext(rewriteContext.wrapSearcher(), timeout);
}

private DefaultSearchContext createSearchContext(SearchRewriteContext rewriteContext, TimeValue timeout) {
private DefaultSearchContext createSearchContext(SearchRewriteContext rewriteContext, TimeValue timeout) throws IOException {
boolean success = false;
try {
final ShardSearchRequest request = rewriteContext.request;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

/**
Expand All @@ -77,25 +79,46 @@ public class ContextIndexSearcher extends IndexSearcher {

private AggregatedDfs aggregatedDfs;
private QueryProfiler profiler;
private Runnable checkCancelled;
private MutableQueryTimeout cancellable;

public ContextIndexSearcher(IndexReader reader, Similarity similarity, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy) {
super(reader);
public ContextIndexSearcher(IndexReader reader, Similarity similarity,
QueryCache queryCache, QueryCachingPolicy queryCachingPolicy) throws IOException {
this(reader, similarity, queryCache, queryCachingPolicy, new MutableQueryTimeout());
}

// TODO: Make the 2nd constructor private so that the IndexReader is always wrapped.
// Some issues must be fixed:
// - regarding tests deriving from AggregatorTestCase and more specifically the use of searchAndReduce and
// the ShardSearcher sub-searchers.
// - tests that use a MultiReader
public ContextIndexSearcher(IndexReader reader, Similarity similarity,
QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
MutableQueryTimeout cancellable) throws IOException {
super(cancellable != null ? new ExitableDirectoryReader((DirectoryReader) reader, cancellable) : reader);
setSimilarity(similarity);
setQueryCache(queryCache);
setQueryCachingPolicy(queryCachingPolicy);
this.cancellable = cancellable != null ? cancellable : new MutableQueryTimeout();
}

public void setProfiler(QueryProfiler profiler) {
this.profiler = profiler;
}

/**
* Set a {@link Runnable} that will be run on a regular basis while
* collecting documents.
* Add a {@link Runnable} that will be run on a regular basis while accessing documents in the
* DirectoryReader but also while collecting them and check for query cancellation or timeout.
*/
public Runnable addQueryCancellation(Runnable action) {
jimczi marked this conversation as resolved.
Show resolved Hide resolved
return this.cancellable.add(action);
}

/**
* Remove a {@link Runnable} that checks for query cancellation or timeout
* which is called while accessing documents in the DirectoryReader but also while collecting them.
*/
public void setCheckCancelled(Runnable checkCancelled) {
this.checkCancelled = checkCancelled;
public void removeQueryCancellation(Runnable action) {
this.cancellable.remove(action);
}

public void setAggregatedDfs(AggregatedDfs aggregatedDfs) {
Expand Down Expand Up @@ -139,12 +162,6 @@ public Weight createWeight(Query query, ScoreMode scoreMode, float boost) throws
}
}

private void checkCancelled() {
if (checkCancelled != null) {
checkCancelled.run();
}
}

@SuppressWarnings({"unchecked", "rawtypes"})
public void search(List<LeafReaderContext> leaves, Weight weight, CollectorManager manager,
QuerySearchResult result, DocValueFormat[] formats, TotalHits totalHits) throws IOException {
Expand Down Expand Up @@ -180,7 +197,7 @@ protected void search(List<LeafReaderContext> leaves, Weight weight, Collector c
* the provided <code>ctx</code>.
*/
private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector) throws IOException {
checkCancelled();
cancellable.checkCancelled();
weight = wrapWeight(weight);
final LeafCollector leafCollector;
try {
Expand Down Expand Up @@ -208,7 +225,7 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
if (scorer != null) {
try {
intersectScorerAndBitSet(scorer, liveDocsBitSet, leafCollector,
checkCancelled == null ? () -> { } : checkCancelled);
this.cancellable.isEnabled() ? cancellable::checkCancelled: () -> {});
} catch (CollectionTerminatedException e) {
// collection was terminated prematurely
// continue with the following leaf
Expand All @@ -218,7 +235,7 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
}

private Weight wrapWeight(Weight weight) {
if (checkCancelled != null) {
if (cancellable.isEnabled()) {
return new Weight(weight.getQuery()) {
@Override
public void extractTerms(Set<Term> terms) {
Expand All @@ -244,7 +261,7 @@ public Scorer scorer(LeafReaderContext context) throws IOException {
public BulkScorer bulkScorer(LeafReaderContext context) throws IOException {
BulkScorer in = weight.bulkScorer(context);
if (in != null) {
return new CancellableBulkScorer(in, checkCancelled);
return new CancellableBulkScorer(in, cancellable::checkCancelled);
} else {
return null;
}
Expand Down Expand Up @@ -320,4 +337,33 @@ public DirectoryReader getDirectoryReader() {
assert reader instanceof DirectoryReader : "expected an instance of DirectoryReader, got " + reader.getClass();
return (DirectoryReader) reader;
}

private static class MutableQueryTimeout implements ExitableDirectoryReader.QueryCancellation {

private final Set<Runnable> runnables = new HashSet<>();

private Runnable add(Runnable action) {
Objects.requireNonNull(action, "cancellation runnable should not be null");
if (runnables.add(action) == false) {
throw new IllegalArgumentException("Cancellation runnable already added");
}
return action;
}

private void remove(Runnable action) {
runnables.remove(action);
}

@Override
public void checkCancelled() {
for (Runnable timeout : runnables) {
timeout.run();
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This implementation feels a bit awkward, I'd rather like to fork ExitableDirectoryReader entirely to not inherit from its QueryTimeout abstraction.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried that in the 1st approach but this means we have to copy the whole ExitablePointValues to wrap the point values and therefore the ExitableIntersectVisitor.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't mind copying it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to copy


@Override
public boolean isEnabled() {
return runnables.isEmpty() == false;
}
}
}
Loading