Shortcut query phase using the results of other shards #51852

Merged
merged 53 commits into from
Mar 17, 2020
Changes from 23 commits
Commits
28fa23b
Always rewrite search shard request outside of the search thread pool
jimczi Jan 30, 2020
534b552
add serialization test
jimczi Jan 30, 2020
c02f352
iter
jimczi Jan 30, 2020
010ec08
Merge branch 'master' into rewrite_shard_request_no_rejection
jimczi Jan 30, 2020
f5684ec
fix bwc issue
jimczi Jan 30, 2020
0acf244
address review
jimczi Feb 3, 2020
6016fa4
adapt test
jimczi Feb 3, 2020
a058127
fix test
jimczi Feb 3, 2020
8534ed2
fix topNSize when size is reset to 0
jimczi Feb 3, 2020
27cdf19
add more comments
jimczi Feb 3, 2020
a313d1d
Merge branch 'master' into rewrite_shard_request_no_rejection
jimczi Feb 3, 2020
662972c
more review
jimczi Feb 3, 2020
76e90a2
initial commit
jimczi Feb 3, 2020
7eb98fb
Merge branch 'master' into distributed_time_sort
jimczi Feb 3, 2020
ac0451c
iter
jimczi Feb 3, 2020
d0ae658
iter
jimczi Feb 4, 2020
c6747e6
Merge branch 'rewrite_shard_request_no_rejection' into distributed_ti…
jimczi Feb 4, 2020
30b3bcb
iter2
jimczi Feb 4, 2020
961c2cd
remove unrelated change
jimczi Feb 4, 2020
d04a16d
Merge branch 'master' into distributed_time_sort
jimczi Feb 6, 2020
af20421
fix last merge
jimczi Feb 6, 2020
291f742
fix rest test
jimczi Feb 6, 2020
71876e4
another fix
jimczi Feb 6, 2020
b8aa0e2
Merge branch 'master' into distributed_time_sort
jimczi Feb 24, 2020
db4b44f
address review
jimczi Feb 24, 2020
bd927bd
adapt serialization version before backport
jimczi Feb 24, 2020
bf8ebde
address review
jimczi Feb 28, 2020
30173b6
Merge branch 'master' into distributed_time_sort
jimczi Feb 28, 2020
2e19865
fix comparator
jimczi Feb 28, 2020
8eae737
fix topdocs size
jimczi Feb 28, 2020
ea5727e
Merge branch 'master' into distributed_time_sort
jimczi Mar 2, 2020
f50ec90
iter
jimczi Mar 3, 2020
ee2591f
Merge branch 'master' into distributed_time_sort
jimczi Mar 3, 2020
4ec6a6c
fix final merge when some shard responses have their size artifically…
jimczi Mar 3, 2020
09bfb09
check style
jimczi Mar 3, 2020
5e268c6
return topfielddocs if the shard search request rewrites the size to 0
jimczi Mar 3, 2020
2307e73
Merge branch 'master' into distributed_time_sort
jimczi Mar 6, 2020
ae1b278
Replace raw sort values with formatted ones to handle date and date_n…
jimczi Mar 9, 2020
bfd2c72
Merge branch 'master' into distributed_time_sort
jimczi Mar 9, 2020
6489ffe
fix unset of track total hits in distributed search
jimczi Mar 9, 2020
73aa681
fix compilation test after merging with master
jimczi Mar 10, 2020
b2504e6
Merge branch 'master' into distributed_time_sort
jimczi Mar 10, 2020
36bec99
fix partial comment
jimczi Mar 10, 2020
74b8bea
Merge branch 'master' into distributed_time_sort
jimczi Mar 11, 2020
7a54222
iter
jimczi Mar 11, 2020
abda530
Add stricter assertions
jimczi Mar 13, 2020
06cbc19
checkstyle
jimczi Mar 13, 2020
74c5d99
Merge branch 'master' into distributed_time_sort
jimczi Mar 13, 2020
e526147
Merge branch 'master' into distributed_time_sort
jimczi Mar 13, 2020
14542f0
sort values can be integers and floats (_doc and _score)
jimczi Mar 16, 2020
8ad41c2
fix test to handle accepted raw sort format
jimczi Mar 16, 2020
270533c
Merge branch 'master' into distributed_time_sort
jimczi Mar 17, 2020
e4e8b02
fix compil after master merge
jimczi Mar 17, 2020
@@ -203,7 +203,7 @@ static SearchRequest[] sliceIntoSubRequests(SearchRequest request, String field,
if (request.source().slice() != null) {
throw new IllegalStateException("Can't slice a request that already has a slice configuration");
}
slicedSource = request.source().copyWithNewSlice(sliceBuilder);
slicedSource = request.source().shallowCopy().slice(sliceBuilder);
}
SearchRequest searchRequest = new SearchRequest(request);
searchRequest.source(slicedSource);
@@ -154,10 +154,9 @@ setup:
- match: { hits.total: 2 }
- length: { aggregations.idx_terms.buckets: 2 }

# check that empty responses are correctly handled when rewriting to match_no_docs
- do:
search:
rest_total_hits_as_int: true
# ensure that one shard can return empty response
max_concurrent_shard_requests: 1
body: { "size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2016-02-01", "lt": "2018-02-01"}}}, "aggs" : { "idx_terms" : { "terms" : { "field" : "_index" } } } }
@@ -166,12 +165,11 @@ setup:
- match: { _shards.successful: 3 }
- match: { _shards.skipped : 0 }
- match: { _shards.failed: 0 }
- match: { hits.total: 2 }
- match: { hits.total.value: 2 }
- length: { aggregations.idx_terms.buckets: 2 }

- do:
search:
rest_total_hits_as_int: true
# ensure that one shard can return empty response
max_concurrent_shard_requests: 2
body: { "size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2019-02-01"}}}, "aggs" : { "idx_terms" : { "terms" : { "field" : "_index" } } } }
@@ -180,5 +178,47 @@ setup:
- match: { _shards.successful: 3 }
- match: { _shards.skipped : 0 }
- match: { _shards.failed: 0 }
- match: { hits.total: 0 }
- match: { hits.total.value: 0 }
- length: { aggregations.idx_terms.buckets: 0 }

# check field sort is correct when skipping query phase
- do:
search:
# ensure that one shard can return empty response
max_concurrent_shard_requests: 1
pre_filter_shard_size: 1
body:
"size": 1
"track_total_hits": 1
"sort": [{ "created_at": "desc"}]

- match: { _shards.total: 3 }
- match: { _shards.successful: 3 }
- match: { _shards.skipped: 0 }
- match: { _shards.failed: 0 }
- match: { hits.total.value: 1 }
- match: { hits.total.relation: "gte" }
- length: { hits.hits: 1 }
- match: { hits.hits.0._id: "3" }

# same with aggs
- do:
search:
# ensure that one shard can return empty response
max_concurrent_shard_requests: 1
pre_filter_shard_size: 1
body:
"size": 1
"track_total_hits": 1
"sort": [{ "created_at": "desc"}]
"aggs" : { "idx_terms" : { "terms" : { "field" : "_index" } } }

- match: { _shards.total: 3 }
- match: { _shards.successful: 3 }
- match: { _shards.skipped: 0 }
- match: { _shards.failed: 0 }
- match: { hits.total.value: 1 }
- match: { hits.total.relation: "gte" }
- length: { hits.hits: 1 }
- match: { hits.hits.0._id: "3" }
- length: { aggregations.idx_terms.buckets: 3 }
@@ -76,7 +76,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
**/
private final BiFunction<String, String, Transport.Connection> nodeIdToConnection;
private final SearchTask task;
private final SearchPhaseResults<Result> results;
protected final SearchPhaseResults<Result> results;
private final long clusterStateVersion;
private final Map<String, AliasFilter> aliasFilter;
private final Map<String, Float> concreteIndexBoosts;
@@ -611,9 +611,9 @@ static final class QueryPhaseResultConsumer extends ArraySearchPhaseResults<Sear
* @param bufferSize the size of the reduce buffer. If the buffer size is smaller than the number of expected results,
* the buffer is used to incrementally reduce aggregation results before all shards have responded.
*/
private QueryPhaseResultConsumer(SearchProgressListener progressListener, SearchPhaseController controller,
int expectedResultSize, int bufferSize, boolean hasTopDocs, boolean hasAggs,
int trackTotalHitsUpTo, int topNSize, boolean performFinalReduce) {
QueryPhaseResultConsumer(SearchProgressListener progressListener, SearchPhaseController controller,
int expectedResultSize, int bufferSize, boolean hasTopDocs, boolean hasAggs,
int trackTotalHitsUpTo, int topNSize, boolean performFinalReduce) {
super(expectedResultSize);
if (expectedResultSize != 1 && bufferSize < 2) {
throw new IllegalArgumentException("buffer size must be >= 2 if there is more than one expected result");
@@ -683,6 +683,15 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
processedShards[querySearchResult.getShardIndex()] = querySearchResult.getSearchShardTarget();
}

public TopDocs getBufferTopDocs() {
if (hasTopDocs) {
synchronized (this) {
return topDocsBuffer[0];
}
}
return null;
}

private synchronized List<InternalAggregations> getRemainingAggs() {
return hasAggs ? Arrays.asList(aggsBuffer).subList(0, index) : null;
}
@@ -20,22 +20,33 @@
package org.elasticsearch.action.search;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldDocs;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.transport.Transport;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Supplier;

final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPhaseResult> {
import static org.elasticsearch.action.search.SearchPhaseController.getTopDocsSize;
import static org.elasticsearch.search.internal.SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO;

class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPhaseResult> {

private final SearchPhaseController searchPhaseController;
private final Supplier<TopDocs> topDocsSupplier;
private final int topDocsSize;
private final SearchProgressListener progressListener;

SearchQueryThenFetchAsyncAction(final Logger logger, final SearchTransportService searchTransportService,
@@ -49,18 +60,20 @@ final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<Se
executor, request, listener, shardsIts, timeProvider, clusterStateVersion, task,
searchPhaseController.newSearchPhaseResults(task.getProgressListener(), request, shardsIts.size()),
request.getMaxConcurrentShardRequests(), clusters);
this.topDocsSize = getTopDocsSize(request);
this.topDocsSupplier = getBufferTopDocsSupplier(request, results);
this.searchPhaseController = searchPhaseController;
this.progressListener = task.getProgressListener();
final SearchProgressListener progressListener = task.getProgressListener();
final SearchSourceBuilder sourceBuilder = request.source();
progressListener.notifyListShards(progressListener.searchShards(this.shardsIts),
sourceBuilder == null || sourceBuilder.size() != 0);
}

protected void executePhaseOnShard(final SearchShardIterator shardIt, final ShardRouting shard,
final SearchActionListener<SearchPhaseResult> listener) {
ShardSearchRequest request = rewriteShardRequest(buildShardSearchRequest(shardIt));
getSearchTransport().sendExecuteQuery(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),
buildShardSearchRequest(shardIt), getTask(), listener);
request, getTask(), listener);
}

@Override
@@ -72,4 +85,38 @@ protected void onShardGroupFailure(int shardIndex, Exc
protected SearchPhase getNextPhase(final SearchPhaseResults<SearchPhaseResult> results, final SearchPhaseContext context) {
return new FetchSearchPhase(results, searchPhaseController, context);
}

ShardSearchRequest rewriteShardRequest(ShardSearchRequest request) {
Reviewer (Contributor):
Not relevant to this PR, but in future, do we want to also rewrite requests without a sort (e.g. a keyword search) that can be shortcut?

jimczi (Author):
I am not sure I follow. Are you talking about handling queries sorted by _score? We can probably propagate the global min competitive score up to the query collector, so that wouldn't require any rewrite.

TopDocs topDocs = topDocsSupplier.get();
if (topDocs != null && topDocs instanceof TopFieldDocs) {
SearchSourceBuilder source = request.source();
int trackTotalHits = source.trackTotalHitsUpTo() == null ? DEFAULT_TRACK_TOTAL_HITS_UP_TO :
source.trackTotalHitsUpTo();
if (topDocs.totalHits.value >= trackTotalHits) {
mayya-sharipova marked this conversation as resolved.
Show resolved Hide resolved
SearchSourceBuilder newSource = source.shallowCopy();
newSource.trackTotalHits(false);
if (topDocs.scoreDocs.length >= topDocsSize) {
FieldDoc bottomDoc = (FieldDoc) topDocs.scoreDocs[topDocs.scoreDocs.length-1];
request.setRawBottomSortValues(bottomDoc.fields);
}
request.source(newSource);
}
}
return request;
}

private Supplier<TopDocs> getBufferTopDocsSupplier(SearchRequest request,
SearchPhaseResults<SearchPhaseResult> searchPhaseResults) {
if (searchPhaseResults instanceof SearchPhaseController.QueryPhaseResultConsumer == false) {
return () -> null;
}
int size = getTopDocsSize(request);
FieldSortBuilder fieldSort = FieldSortBuilder.getPrimaryFieldSortOrNull(request.source());
if (size == 0
|| fieldSort == null
|| fieldSort.canRewriteToMatchNone() == false) {
return () -> null;
}
return ((SearchPhaseController.QueryPhaseResultConsumer) searchPhaseResults)::getBufferTopDocs;
}
}
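The coordinator-side shortcut above can be summarized with a small standalone sketch. This is a hedged illustration only: the class and method names are hypothetical, and the real logic lives in `rewriteShardRequest` and `getBufferTopDocsSupplier` in the diff.

```java
// Hedged sketch of the coordinator-side decision in rewriteShardRequest.
// Names and thresholds are illustrative, not the real Elasticsearch API.
final class BottomSortShortcutSketch {

    // Total-hit counting can be disabled once the partially merged responses
    // already account for at least track_total_hits documents.
    static boolean canDisableTotalHits(long totalHitsSoFar, int trackTotalHitsUpTo) {
        return totalHitsSoFar >= trackTotalHitsUpTo;
    }

    // The bottom doc of the buffer is only a meaningful global bound once the
    // buffer holds a full top-N; otherwise later shards could still fill it.
    static boolean canAttachBottomSortValues(int bufferedDocs, int topDocsSize) {
        return bufferedDocs >= topDocsSize;
    }

    public static void main(String[] args) {
        System.out.println(canDisableTotalHits(120L, 100));   // enough hits already seen
        System.out.println(canAttachBottomSortValues(5, 10)); // buffer not full yet
    }
}
```

Only when both conditions hold does the diff's `rewriteShardRequest` attach `bottomDoc.fields` to the outgoing shard request.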
@@ -959,13 +959,13 @@ public SearchSourceBuilder rewrite(QueryRewriteContext context) throws IOExcepti
/**
* Create a shallow copy of this builder with a new slice configuration.
*/
public SearchSourceBuilder copyWithNewSlice(SliceBuilder slice) {
return shallowCopy(queryBuilder, postQueryBuilder, aggregations, slice, sorts, rescoreBuilders, highlightBuilder);
public SearchSourceBuilder shallowCopy() {
return shallowCopy(queryBuilder, postQueryBuilder, aggregations, sliceBuilder, sorts, rescoreBuilders, highlightBuilder);
}

/**
* Create a shallow copy of this source replaced {@link #queryBuilder}, {@link #postQueryBuilder}, and {@link #sliceBuilder}. Used by
* {@link #rewrite(QueryRewriteContext)} and {@link #copyWithNewSlice(SliceBuilder)}.
* {@link #rewrite(QueryRewriteContext)}.
*/
@SuppressWarnings("rawtypes")
private SearchSourceBuilder shallowCopy(QueryBuilder queryBuilder, QueryBuilder postQueryBuilder,
@@ -37,17 +37,21 @@
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchNoneQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.AliasFilterParsingException;
import org.elasticsearch.indices.InvalidAliasNameException;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportRequest;
@@
import java.util.Map;
import java.util.function.Function;

import static org.elasticsearch.search.internal.SearchContext.TRACK_TOTAL_HITS_DISABLED;

/**
* Shard level request that represents a search.
* It provides all the methods that the {@link SearchContext} needs.
@@ -77,6 +83,7 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque
private final OriginalIndices originalIndices;

private boolean canReturnNullResponseIfMatchNoDocs;
private Object[] rawBottomSortValues;

//these are the only mutable fields, as they are subject to rewriting
private AliasFilter aliasFilter;
@@ -172,8 +179,10 @@ public ShardSearchRequest(StreamInput in) throws IOException {
preference = in.readOptionalString();
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
canReturnNullResponseIfMatchNoDocs = in.readBoolean();
rawBottomSortValues = in.readOptionalArray(Lucene::readSortValue, Object[]::new);
} else {
canReturnNullResponseIfMatchNoDocs = false;
rawBottomSortValues = null;
}
originalIndices = OriginalIndices.readOriginalIndices(in);
}
@@ -211,6 +220,7 @@ protected final void innerWriteTo(StreamOutput out, boolean asKey) throws IOExce
}
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeBoolean(canReturnNullResponseIfMatchNoDocs);
out.writeOptionalArray(Lucene::writeSortValue, rawBottomSortValues);
}
}

@@ -286,6 +296,20 @@ public String preference() {
return preference;
}

/**
* Sets the bottom sort values that can be used by the searcher to filter documents
* that sort after it. This value is computed by the coordinating node when it throttles
* the query phase. After a partial merge of successful shard responses, the sort values
* of the bottom document are passed as a hint on subsequent shard requests.
*/
public void setRawBottomSortValues(Object[] values) {
this.rawBottomSortValues = values;
}

public Object[] getRawBottomSortValues() {
return rawBottomSortValues;
}

/**
* Returns true if the caller can handle null response {@link QuerySearchResult#nullInstance()}.
* Defaults to false since the coordinator node needs at least one shard response to build the global
@@ -343,6 +367,25 @@ static class RequestRewritable implements Rewriteable<Rewriteable> {
public Rewriteable rewrite(QueryRewriteContext ctx) throws IOException {
SearchSourceBuilder newSource = request.source() == null ? null : Rewriteable.rewrite(request.source(), ctx);
AliasFilter newAliasFilter = Rewriteable.rewrite(request.getAliasFilter(), ctx);

QueryShardContext shardContext = ctx.convertToShardContext();
FieldSortBuilder primarySort = FieldSortBuilder.getPrimaryFieldSortOrNull(newSource);
// checks if the bottom sort values are guaranteed to be more competitive than all the documents
// contained in the shard
if (shardContext != null
&& primarySort != null
&& primarySort.isBottomSortWithinShard(shardContext, request.getRawBottomSortValues()) == false) {
newSource = newSource.shallowCopy();
if (newSource.trackTotalHitsUpTo() == TRACK_TOTAL_HITS_DISABLED
&& newSource.aggregations() == null) {
newSource.query(new MatchNoneQueryBuilder());
} else {
newSource.size(0);
}
request.source(newSource);
request.setRawBottomSortValues(null);
}

if (newSource == request.source() && newAliasFilter == request.getAliasFilter()) {
return this;
} else {
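On the data node, `RequestRewritable#rewrite` then chooses between three outcomes depending on whether the bottom sort value can still be beaten. A minimal sketch of that branch follows; the class name, the string results, and the sentinel value of `TRACK_TOTAL_HITS_DISABLED` are assumptions for illustration, not the actual constants.

```java
// Hedged sketch of the shard-side branch in RequestRewritable#rewrite.
// Constant value and names are assumptions for illustration only.
final class ShardRewriteSketch {
    static final int TRACK_TOTAL_HITS_DISABLED = -1; // assumed sentinel

    static String action(boolean bottomWithinShard, int trackTotalHitsUpTo, boolean hasAggs) {
        if (bottomWithinShard) {
            return "execute";     // the shard may still produce competitive hits
        }
        if (trackTotalHitsUpTo == TRACK_TOTAL_HITS_DISABLED && !hasAggs) {
            return "match_none";  // nothing is needed from this shard at all
        }
        return "size_0";          // counts and/or aggregations are still needed
    }
}
```

This mirrors why the diff rewrites either the query (to `MatchNoneQueryBuilder`) or only the size (to 0): aggregations and total-hit counts must still be computed even when no competitive top docs can come from the shard.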
@@ -60,6 +60,7 @@
import java.util.Objects;
import java.util.function.Function;

import static org.elasticsearch.index.mapper.DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER;
import static org.elasticsearch.index.mapper.DateFieldMapper.Resolution.MILLISECONDS;
import static org.elasticsearch.index.mapper.DateFieldMapper.Resolution.NANOSECONDS;
import static org.elasticsearch.index.search.NestedHelper.parentObject;
@@ -375,6 +376,42 @@ public SortFieldAndFormat build(QueryShardContext context) throws IOException {
return new SortFieldAndFormat(field, fieldType.docValueFormat(null, null));
}

public boolean canRewriteToMatchNone() {
return nestedSort == null && (missing == null || "_last".equals(missing));
}

/**
* Returns true if documents in the given {@link QueryShardContext#getIndexReader()} may be
* more competitive than the primary sort value provided in <code>rawBottomSortValues</code>.
*/
public boolean isBottomSortWithinShard(QueryShardContext context,
Object[] rawBottomSortValues) {
if (rawBottomSortValues == null || rawBottomSortValues.length == 0) {
return true;
}

if (canRewriteToMatchNone() == false) {
return true;
}
MappedFieldType fieldType = context.fieldMapper(fieldName);
if (fieldType == null) {
// unmapped
return false;
}
if (fieldType.indexOptions() == IndexOptions.NONE) {
return true;
}
Object minValue = order() == SortOrder.DESC ? rawBottomSortValues[0] : null;
Object maxValue = order() == SortOrder.DESC ? null : rawBottomSortValues[0];
try {
MappedFieldType.Relation relation = fieldType.isFieldWithinQuery(context.getIndexReader(), minValue, maxValue,
true, true, null, DEFAULT_DATE_TIME_FORMATTER.toDateMathParser(), context);
return relation != MappedFieldType.Relation.DISJOINT;
} catch (Exception exc) {
return true;
}
}

/**
* Return true if the primary sort in the provided <code>source</code>
* is an instance of {@link FieldSortBuilder}.
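The min/max bound selection inside `isBottomSortWithinShard` depends only on the sort order. A tiny sketch of that rule (hypothetical helper class, not part of the PR):

```java
// Hedged sketch: for a descending sort the bottom value is a lower bound on
// competitive documents, for an ascending sort an upper bound. The shard can
// be skipped when its value range is DISJOINT from the resulting [min, max].
final class BottomBoundSketch {
    static Object minBound(String order, Object bottomValue) {
        return "desc".equals(order) ? bottomValue : null;
    }

    static Object maxBound(String order, Object bottomValue) {
        return "desc".equals(order) ? null : bottomValue;
    }
}
```

A `null` bound means "unbounded on that side", which is how the diff passes the values to `MappedFieldType#isFieldWithinQuery`.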