Skip to content

Commit

Permalink
Misc cleanups for query phase (elastic#112521)
Browse files Browse the repository at this point in the history
Avoiding some redundant computation in obvious spots, fixing compile
warnings and using a more specific listener in one spot to save memory
and indirection.
  • Loading branch information
original-brownbear authored and cbuescher committed Sep 4, 2024
1 parent 4fb260d commit 413ba7a
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,7 @@ public void registerAsyncAction(BiConsumer<Client, ActionListener<?>> asyncActio
}

@Override
@SuppressWarnings("rawtypes")
public void executeAsyncActions(ActionListener listener) {
public void executeAsyncActions(ActionListener<Void> listener) {
in.executeAsyncActions(listener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ public InnerHitsRewriteContext convertToInnerHitsRewriteContext() {
}

@Override
@SuppressWarnings({ "rawtypes" })
public void executeAsyncActions(ActionListener listener) {
public void executeAsyncActions(ActionListener<Void> listener) {
// InnerHitsRewriteContext does not support async actions at all, and doesn't supply a valid `client` object
throw new UnsupportedOperationException("InnerHitsRewriteContext does not support async actions");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,13 +283,12 @@ public boolean hasAsyncActions() {
* Executes all registered async actions and notifies the listener once it's done. The value that is passed to the listener is always
* <code>null</code>. The list of registered actions is cleared once this method returns.
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public void executeAsyncActions(ActionListener listener) {
public void executeAsyncActions(ActionListener<Void> listener) {
if (asyncActions.isEmpty()) {
listener.onResponse(null);
} else {
CountDown countDown = new CountDown(asyncActions.size());
ActionListener<?> internalListener = new ActionListener() {
ActionListener<?> internalListener = new ActionListener<>() {
@Override
public void onResponse(Object o) {
if (countDown.countDown()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,8 +615,7 @@ public void registerAsyncAction(BiConsumer<Client, ActionListener<?>> asyncActio
}

@Override
@SuppressWarnings("rawtypes")
public void executeAsyncActions(ActionListener listener) {
public void executeAsyncActions(ActionListener<Void> listener) {
failIfFrozen();
super.executeAsyncActions(listener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,6 @@ final class DefaultSearchContext extends SearchContext {
this.indexShard = readerContext.indexShard();

Engine.Searcher engineSearcher = readerContext.acquireSearcher("search");
int maximumNumberOfSlices = determineMaximumNumberOfSlices(
executor,
request,
resultsType,
enableQueryPhaseParallelCollection,
field -> getFieldCardinality(field, readerContext.indexService(), engineSearcher.getDirectoryReader())
);
if (executor == null) {
this.searcher = new ContextIndexSearcher(
engineSearcher.getIndexReader(),
Expand All @@ -202,7 +195,13 @@ final class DefaultSearchContext extends SearchContext {
engineSearcher.getQueryCachingPolicy(),
lowLevelCancellation,
executor,
maximumNumberOfSlices,
determineMaximumNumberOfSlices(
executor,
request,
resultsType,
enableQueryPhaseParallelCollection,
field -> getFieldCardinality(field, readerContext.indexService(), engineSearcher.getDirectoryReader())
),
minimumDocsPerSlice
);
}
Expand Down
31 changes: 14 additions & 17 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -1779,14 +1779,12 @@ private static boolean canMatchAfterRewrite(final ShardSearchRequest request, fi
@SuppressWarnings("unchecked")
public static boolean queryStillMatchesAfterRewrite(ShardSearchRequest request, QueryRewriteContext context) throws IOException {
Rewriteable.rewrite(request.getRewriteable(), context, false);
boolean canMatch = request.getAliasFilter().getQueryBuilder() instanceof MatchNoneQueryBuilder == false;
if (canRewriteToMatchNone(request.source())) {
canMatch &= request.source()
.subSearches()
.stream()
.anyMatch(sqwb -> sqwb.getQueryBuilder() instanceof MatchNoneQueryBuilder == false);
if (request.getAliasFilter().getQueryBuilder() instanceof MatchNoneQueryBuilder) {
return false;
}
return canMatch;
final var source = request.source();
return canRewriteToMatchNone(source) == false
|| source.subSearches().stream().anyMatch(sqwb -> sqwb.getQueryBuilder() instanceof MatchNoneQueryBuilder == false);
}

/**
Expand All @@ -1806,19 +1804,18 @@ public static boolean canRewriteToMatchNone(SearchSourceBuilder source) {
return aggregations == null || aggregations.mustVisitAllDocs() == false;
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@SuppressWarnings("unchecked")
private void rewriteAndFetchShardRequest(IndexShard shard, ShardSearchRequest request, ActionListener<ShardSearchRequest> listener) {
ActionListener<Rewriteable> actionListener = listener.delegateFailureAndWrap((l, r) -> {
if (request.readerId() != null) {
l.onResponse(request);
} else {
shard.ensureShardSearchActive(b -> l.onResponse(request));
}
});
// we also do rewrite on the coordinating node (TransportSearchService) but we also need to do it here.
// AliasFilters and other things may need to be rewritten on the data node, but not per individual shard.
// These are uncommon-cases but we are very efficient doing the rewrite here.
Rewriteable.rewriteAndFetch(request.getRewriteable(), indicesService.getDataRewriteContext(request::nowInMillis), actionListener);
// These are uncommon-cases, but we are very efficient doing the rewrite here.
Rewriteable.rewriteAndFetch(
request.getRewriteable(),
indicesService.getDataRewriteContext(request::nowInMillis),
request.readerId() == null
? listener.delegateFailureAndWrap((l, r) -> shard.ensureShardSearchActive(b -> l.onResponse(request)))
: listener.safeMap(r -> request)
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,22 +587,23 @@ public Rewriteable rewrite(QueryRewriteContext ctx) throws IOException {
SearchSourceBuilder newSource = request.source() == null ? null : Rewriteable.rewrite(request.source(), ctx);
AliasFilter newAliasFilter = Rewriteable.rewrite(request.getAliasFilter(), ctx);
SearchExecutionContext searchExecutionContext = ctx.convertToSearchExecutionContext();
FieldSortBuilder primarySort = FieldSortBuilder.getPrimaryFieldSortOrNull(newSource);
if (searchExecutionContext != null
&& primarySort != null
&& primarySort.isBottomSortShardDisjoint(searchExecutionContext, request.getBottomSortValues())) {
assert newSource != null : "source should contain a primary sort field";
newSource = newSource.shallowCopy();
int trackTotalHitsUpTo = SearchRequest.resolveTrackTotalHitsUpTo(request.scroll, request.source);
if (trackTotalHitsUpTo == TRACK_TOTAL_HITS_DISABLED && newSource.suggest() == null && newSource.aggregations() == null) {
newSource.query(new MatchNoneQueryBuilder());
} else {
newSource.size(0);
if (searchExecutionContext != null) {
final FieldSortBuilder primarySort = FieldSortBuilder.getPrimaryFieldSortOrNull(newSource);
if (primarySort != null && primarySort.isBottomSortShardDisjoint(searchExecutionContext, request.getBottomSortValues())) {
assert newSource != null : "source should contain a primary sort field";
newSource = newSource.shallowCopy();
int trackTotalHitsUpTo = SearchRequest.resolveTrackTotalHitsUpTo(request.scroll, request.source);
if (trackTotalHitsUpTo == TRACK_TOTAL_HITS_DISABLED
&& newSource.suggest() == null
&& newSource.aggregations() == null) {
newSource.query(new MatchNoneQueryBuilder());
} else {
newSource.size(0);
}
request.source(newSource);
request.setBottomSortValues(null);
}
request.source(newSource);
request.setBottomSortValues(null);
}

if (newSource == request.source() && newAliasFilter == request.getAliasFilter()) {
return this;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,9 @@ public void testRewritePipelineAggregationUnderAggregation() throws Exception {
QueryRewriteContext context = new QueryRewriteContext(parserConfig(), null, () -> 0L);
AggregatorFactories.Builder rewritten = builder.rewrite(context);
CountDownLatch latch = new CountDownLatch(1);
context.executeAsyncActions(new ActionListener<Object>() {
context.executeAsyncActions(new ActionListener<>() {
@Override
public void onResponse(Object response) {
public void onResponse(Void aVoid) {
assertNotSame(builder, rewritten);
Collection<AggregationBuilder> aggregatorFactories = rewritten.getAggregatorFactories();
assertEquals(1, aggregatorFactories.size());
Expand All @@ -289,9 +289,9 @@ public void testRewriteAggregationAtTopLevel() throws Exception {
QueryRewriteContext context = new QueryRewriteContext(parserConfig(), null, () -> 0L);
AggregatorFactories.Builder rewritten = builder.rewrite(context);
CountDownLatch latch = new CountDownLatch(1);
context.executeAsyncActions(new ActionListener<Object>() {
context.executeAsyncActions(new ActionListener<>() {
@Override
public void onResponse(Object response) {
public void onResponse(Void aVoid) {
assertNotSame(builder, rewritten);
PipelineAggregationBuilder rewrittenPipeline = rewritten.getPipelineAggregatorFactories().iterator().next();
assertThat(((RewrittenPipelineAggregationBuilder) rewrittenPipeline).setOnRewrite.get(), equalTo("rewritten"));
Expand Down

0 comments on commit 413ba7a

Please sign in to comment.