Always rewrite search shard request outside of the search thread pool (elastic#51708)

This change ensures that the rewrite of the shard request is executed on the network thread, or in the refresh listener when waiting for an active shard. This allows queries that rewrite to match_no_docs to bypass the search thread pool entirely, even if the can_match phase was skipped (pre_filter_shard_size > number of shards). Coordinating nodes cannot create empty responses, so this change also ensures that at least one shard creates a full empty response while the others may return null ones. This is needed because creating true empty responses on shards requires building concrete aggregators, which would be too costly on a network thread. We should move this functionality to aggregation builders in a follow-up, but that would be a much bigger change.
This change is also important for elastic#49601, since we want to add the ability to use the results of earlier shards to rewrite the requests of subsequent ones. For instance, once the first M shards have their top N computed, the worst top document in the global queue can be passed to subsequent shards, which can then rewrite to match_no_docs if they can guarantee that they hold no document better than the one provided.
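As a rough sketch, the shard-side fast path this enables could check the flag right after the rewrite on the network thread. The flag name matches the diff below, but QuerySearchResult.nullInstance(), rewritesToMatchNoDocs, doExecuteQueryPhase, and the surrounding plumbing (rewriteContext, threadPool) are illustrative assumptions, not the exact code from this commit:

// illustrative sketch only; helper names are assumptions
void executeQueryPhase(ShardSearchRequest request, ActionListener<SearchPhaseResult> listener) {
    // rewrite on the network thread, before forking to the search thread pool
    Rewriteable.rewriteAndFetch(request.getRewriteable(), rewriteContext,
        ActionListener.wrap(rewritten -> {
            if (request.canReturnNullResponseIfMatchNoDocs() && rewritesToMatchNoDocs(request)) {
                // another shard is guaranteed to build a full empty response,
                // so a lightweight null placeholder suffices here and the
                // search thread pool is bypassed entirely
                listener.onResponse(QuerySearchResult.nullInstance());
            } else {
                // otherwise fork to the search thread pool as before
                threadPool.executor(ThreadPool.Names.SEARCH)
                    .execute(() -> doExecuteQueryPhase(request, listener));
            }
        }, listener::onFailure));
}

The coordinator side of this contract is the hasShardResponse flag added to AbstractSearchAsyncAction below.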
jimczi committed Feb 6, 2020
1 parent fb710cc commit 7dbfd85
Showing 13 changed files with 624 additions and 170 deletions.
@@ -153,3 +153,32 @@ setup:
- match: { _shards.failed: 0 }
- match: { hits.total: 2 }
- length: { aggregations.idx_terms.buckets: 2 }

# check that empty responses are correctly handled when rewriting to match_no_docs
- do:
    search:
      rest_total_hits_as_int: true
      # ensure that one shard can return empty response
      max_concurrent_shard_requests: 1
      body: { "size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2016-02-01", "lt": "2018-02-01"}}}, "aggs" : { "idx_terms" : { "terms" : { "field" : "_index" } } } }

- match: { _shards.total: 3 }
- match: { _shards.successful: 3 }
- match: { _shards.skipped : 0 }
- match: { _shards.failed: 0 }
- match: { hits.total: 2 }
- length: { aggregations.idx_terms.buckets: 2 }

- do:
    search:
      rest_total_hits_as_int: true
      # ensure that one shard can return empty response
      max_concurrent_shard_requests: 2
      body: { "size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2019-02-01"}}}, "aggs" : { "idx_terms" : { "terms" : { "field" : "_index" } } } }

- match: { _shards.total: 3 }
- match: { _shards.successful: 3 }
- match: { _shards.skipped : 0 }
- match: { _shards.failed: 0 }
- match: { hits.total: 0 }
- length: { aggregations.idx_terms.buckets: 0 }
@@ -51,6 +51,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
@@ -82,6 +83,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final Map<String, Set<String>> indexRoutings;
private final SetOnce<AtomicArray<ShardSearchFailure>> shardFailures = new SetOnce<>();
private final Object shardFailuresMutex = new Object();
private final AtomicBoolean hasShardResponse = new AtomicBoolean(false);
private final AtomicInteger successfulOps = new AtomicInteger();
private final AtomicInteger skippedOps = new AtomicInteger();
private final SearchTimeProvider timeProvider;
@@ -467,6 +469,7 @@ private void onShardResult(Result result, SearchShardIterator shardIt) {
assert result.getSearchShardTarget() != null : "search shard target must not be null";
successfulOps.incrementAndGet();
results.consumeResult(result);
hasShardResponse.set(true);
if (logger.isTraceEnabled()) {
logger.trace("got first-phase result from {}", result != null ? result.getSearchShardTarget() : null);
}
@@ -602,8 +605,13 @@ public final ShardSearchRequest buildShardSearchRequest(SearchShardIterator shar
String indexName = shardIt.shardId().getIndex().getName();
final String[] routings = indexRoutings.getOrDefault(indexName, Collections.emptySet())
.toArray(new String[0]);
return new ShardSearchRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), getNumShards(),
ShardSearchRequest shardRequest = new ShardSearchRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), getNumShards(),
filter, indexBoost, timeProvider.getAbsoluteStartMillis(), shardIt.getClusterAlias(), routings);
// if we already received a search result we can inform the shard that it
// can return a null response if the request rewrites to match none rather
// than creating an empty response in the search thread pool.
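// note that requests built before the first shard response arrives always
// pass false here, which is how at least one shard is guaranteed to build
// a full, non-null empty response.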
shardRequest.canReturnNullResponseIfMatchNoDocs(hasShardResponse.get());
return shardRequest;
}

/**
@@ -39,6 +39,7 @@
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
@@ -65,6 +66,7 @@
import java.util.Map;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.stream.Collectors;

public final class SearchPhaseController {

@@ -427,6 +429,15 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.getMaxScore(),
false, null, null, null, null, SortedTopDocs.EMPTY, null, numReducePhases, 0, 0, true);
}
int total = queryResults.size();
queryResults = queryResults.stream()
.filter(res -> res.queryResult().isNull() == false)
.collect(Collectors.toList());
String errorMsg = "must have at least one non-empty search result, got 0 out of " + total;
assert queryResults.isEmpty() == false : errorMsg;
if (queryResults.isEmpty()) {
throw new IllegalStateException(errorMsg);
}
final QuerySearchResult firstResult = queryResults.stream().findFirst().get().queryResult();
final boolean hasSuggest = firstResult.suggest() != null;
final boolean hasProfileResults = firstResult.hasProfileResults();
@@ -497,6 +508,18 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
firstResult.sortValueFormats(), numReducePhases, size, from, false);
}

/*
* Returns the size of the requested top documents (from + size)
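* For example, from=20 and size=10 yields a top-docs size of 30.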
*/
static int getTopDocsSize(SearchRequest request) {
if (request.source() == null) {
return SearchService.DEFAULT_SIZE;
}
SearchSourceBuilder source = request.source();
return (source.size() == -1 ? SearchService.DEFAULT_SIZE : source.size()) +
(source.from() == -1 ? SearchService.DEFAULT_FROM : source.from());
}

public static final class ReducedQueryPhase {
// the sum of all hits across all reduces shards
final TotalHits totalHits;
@@ -576,6 +599,7 @@ static final class QueryPhaseResultConsumer extends ArraySearchPhaseResults<Sear
private final SearchProgressListener progressListener;
private int numReducePhases = 0;
private final TopDocsStats topDocsStats;
private final int topNSize;
private final boolean performFinalReduce;

/**
@@ -589,7 +613,7 @@ private QueryPhaseResultConsumer(SearchProgressListener progressListener, Search
*/
private QueryPhaseResultConsumer(SearchProgressListener progressListener, SearchPhaseController controller,
int expectedResultSize, int bufferSize, boolean hasTopDocs, boolean hasAggs,
int trackTotalHitsUpTo, boolean performFinalReduce) {
int trackTotalHitsUpTo, int topNSize, boolean performFinalReduce) {
super(expectedResultSize);
if (expectedResultSize != 1 && bufferSize < 2) {
throw new IllegalArgumentException("buffer size must be >= 2 if there is more than one expected result");
@@ -610,6 +634,7 @@ private QueryPhaseResultConsumer(SearchProgressListener progressListener, Search
this.hasAggs = hasAggs;
this.bufferSize = bufferSize;
this.topDocsStats = new TopDocsStats(trackTotalHitsUpTo);
this.topNSize = topNSize;
this.performFinalReduce = performFinalReduce;
}

@@ -622,36 +647,38 @@ public void consumeResult(SearchPhaseResult result) {
}

private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
if (index == bufferSize) {
if (querySearchResult.isNull() == false) {
if (index == bufferSize) {
if (hasAggs) {
ReduceContext reduceContext = controller.reduceContextFunction.apply(false);
InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(Arrays.asList(aggsBuffer), reduceContext);
Arrays.fill(aggsBuffer, null);
aggsBuffer[0] = reducedAggs;
}
if (hasTopDocs) {
TopDocs reducedTopDocs = mergeTopDocs(Arrays.asList(topDocsBuffer),
// we have to merge here in the same way we collect on a shard
topNSize, 0);
Arrays.fill(topDocsBuffer, null);
topDocsBuffer[0] = reducedTopDocs;
}
numReducePhases++;
index = 1;
if (hasAggs) {
progressListener.notifyPartialReduce(progressListener.searchShards(processedShards),
topDocsStats.getTotalHits(), aggsBuffer[0], numReducePhases);
}
}
final int i = index++;
if (hasAggs) {
ReduceContext reduceContext = controller.reduceContextFunction.apply(false);
InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(Arrays.asList(aggsBuffer), reduceContext);
Arrays.fill(aggsBuffer, null);
aggsBuffer[0] = reducedAggs;
aggsBuffer[i] = (InternalAggregations) querySearchResult.consumeAggs();
}
if (hasTopDocs) {
TopDocs reducedTopDocs = mergeTopDocs(Arrays.asList(topDocsBuffer),
// we have to merge here in the same way we collect on a shard
querySearchResult.from() + querySearchResult.size(), 0);
Arrays.fill(topDocsBuffer, null);
topDocsBuffer[0] = reducedTopDocs;
final TopDocsAndMaxScore topDocs = querySearchResult.consumeTopDocs(); // can't be null
topDocsStats.add(topDocs, querySearchResult.searchTimedOut(), querySearchResult.terminatedEarly());
setShardIndex(topDocs.topDocs, querySearchResult.getShardIndex());
topDocsBuffer[i] = topDocs.topDocs;
}
numReducePhases++;
index = 1;
if (hasAggs) {
progressListener.notifyPartialReduce(progressListener.searchShards(processedShards),
topDocsStats.getTotalHits(), aggsBuffer[0], numReducePhases);
}
}
final int i = index++;
if (hasAggs) {
aggsBuffer[i] = (InternalAggregations) querySearchResult.consumeAggs();
}
if (hasTopDocs) {
final TopDocsAndMaxScore topDocs = querySearchResult.consumeTopDocs(); // can't be null
topDocsStats.add(topDocs, querySearchResult.searchTimedOut(), querySearchResult.terminatedEarly());
setShardIndex(topDocs.topDocs, querySearchResult.getShardIndex());
topDocsBuffer[i] = topDocs.topDocs;
}
processedShards[querySearchResult.getShardIndex()] = querySearchResult.getSearchShardTarget();
}
@@ -706,9 +733,10 @@ ArraySearchPhaseResults<SearchPhaseResult> newSearchPhaseResults(SearchProgressL
if (isScrollRequest == false && (hasAggs || hasTopDocs)) {
// no incremental reduce if scroll is used - we only hit a single shard or sometimes more...
if (request.getBatchedReduceSize() < numShards) {
int topNSize = getTopDocsSize(request);
// only use this if there are aggs and if there are more shards than we should reduce at once
return new QueryPhaseResultConsumer(listener, this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs,
trackTotalHitsUpTo, request.isFinalReduce());
trackTotalHitsUpTo, topNSize, request.isFinalReduce());
}
}
return new ArraySearchPhaseResults<SearchPhaseResult>(numShards) {
Expand All @@ -731,7 +759,7 @@ ReducedQueryPhase reduce() {

static final class TopDocsStats {
final int trackTotalHitsUpTo;
private long totalHits;
long totalHits;
private TotalHits.Relation totalHitsRelation;
long fetchHits;
private float maxScore = Float.NEGATIVE_INFINITY;
@@ -1254,6 +1254,13 @@ private Engine.Searcher acquireSearcher(String source, Engine.SearcherScope scop
markSearcherAccessed();
final Engine engine = getEngine();
final Engine.Searcher searcher = engine.acquireSearcher(source, scope);
return wrapSearcher(searcher);
}

/**
* Wraps the provided searcher acquired with {@link #acquireSearcherNoWrap(String)}.
*/
public Engine.Searcher wrapSearcher(Engine.Searcher searcher) {
assert ElasticsearchDirectoryReader.unwrap(searcher.getDirectoryReader())
!= null : "DirectoryReader must be an instance of ElasticsearchDirectoryReader";
boolean success = false;
[diffs for the remaining changed files not shown]
