Shortcut query phase using the results of other shards #51852

Merged
merged 53 commits into from
Mar 17, 2020
53 commits
28fa23b
Always rewrite search shard request outside of the search thread pool
jimczi Jan 30, 2020
534b552
add serialization test
jimczi Jan 30, 2020
c02f352
iter
jimczi Jan 30, 2020
010ec08
Merge branch 'master' into rewrite_shard_request_no_rejection
jimczi Jan 30, 2020
f5684ec
fix bwc issue
jimczi Jan 30, 2020
0acf244
address review
jimczi Feb 3, 2020
6016fa4
adapt test
jimczi Feb 3, 2020
a058127
fix test
jimczi Feb 3, 2020
8534ed2
fix topNSize when size is reset to 0
jimczi Feb 3, 2020
27cdf19
add more comments
jimczi Feb 3, 2020
a313d1d
Merge branch 'master' into rewrite_shard_request_no_rejection
jimczi Feb 3, 2020
662972c
more review
jimczi Feb 3, 2020
76e90a2
initial commit
jimczi Feb 3, 2020
7eb98fb
Merge branch 'master' into distributed_time_sort
jimczi Feb 3, 2020
ac0451c
iter
jimczi Feb 3, 2020
d0ae658
iter
jimczi Feb 4, 2020
c6747e6
Merge branch 'rewrite_shard_request_no_rejection' into distributed_ti…
jimczi Feb 4, 2020
30b3bcb
iter2
jimczi Feb 4, 2020
961c2cd
remove unrelated change
jimczi Feb 4, 2020
d04a16d
Merge branch 'master' into distributed_time_sort
jimczi Feb 6, 2020
af20421
fix last merge
jimczi Feb 6, 2020
291f742
fix rest test
jimczi Feb 6, 2020
71876e4
another fix
jimczi Feb 6, 2020
b8aa0e2
Merge branch 'master' into distributed_time_sort
jimczi Feb 24, 2020
db4b44f
address review
jimczi Feb 24, 2020
bd927bd
adapt serialization version before backport
jimczi Feb 24, 2020
bf8ebde
address review
jimczi Feb 28, 2020
30173b6
Merge branch 'master' into distributed_time_sort
jimczi Feb 28, 2020
2e19865
fix comparator
jimczi Feb 28, 2020
8eae737
fix topdocs size
jimczi Feb 28, 2020
ea5727e
Merge branch 'master' into distributed_time_sort
jimczi Mar 2, 2020
f50ec90
iter
jimczi Mar 3, 2020
ee2591f
Merge branch 'master' into distributed_time_sort
jimczi Mar 3, 2020
4ec6a6c
fix final merge when some shard responses have their size artifically…
jimczi Mar 3, 2020
09bfb09
check style
jimczi Mar 3, 2020
5e268c6
return topfielddocs if the shard search request rewrites the size to 0
jimczi Mar 3, 2020
2307e73
Merge branch 'master' into distributed_time_sort
jimczi Mar 6, 2020
ae1b278
Replace raw sort values with formatted ones to handle date and date_n…
jimczi Mar 9, 2020
bfd2c72
Merge branch 'master' into distributed_time_sort
jimczi Mar 9, 2020
6489ffe
fix unset of track total hits in distributed search
jimczi Mar 9, 2020
73aa681
fix compilation test after merging with master
jimczi Mar 10, 2020
b2504e6
Merge branch 'master' into distributed_time_sort
jimczi Mar 10, 2020
36bec99
fix partial comment
jimczi Mar 10, 2020
74b8bea
Merge branch 'master' into distributed_time_sort
jimczi Mar 11, 2020
7a54222
iter
jimczi Mar 11, 2020
abda530
Add stricter assertions
jimczi Mar 13, 2020
06cbc19
checkstyle
jimczi Mar 13, 2020
74c5d99
Merge branch 'master' into distributed_time_sort
jimczi Mar 13, 2020
e526147
Merge branch 'master' into distributed_time_sort
jimczi Mar 13, 2020
14542f0
sort values can be integers and floats (_doc and _score)
jimczi Mar 16, 2020
8ad41c2
fix test to handle accepted raw sort format
jimczi Mar 16, 2020
270533c
Merge branch 'master' into distributed_time_sort
jimczi Mar 17, 2020
e4e8b02
fix compil after master merge
jimczi Mar 17, 2020
@@ -203,7 +203,7 @@ static SearchRequest[] sliceIntoSubRequests(SearchRequest request, String field,
if (request.source().slice() != null) {
throw new IllegalStateException("Can't slice a request that already has a slice configuration");
}
slicedSource = request.source().copyWithNewSlice(sliceBuilder);
slicedSource = request.source().shallowCopy().slice(sliceBuilder);
}
SearchRequest searchRequest = new SearchRequest(request);
searchRequest.source(slicedSource);
@@ -19,7 +19,7 @@ setup:
mappings:
properties:
created_at:
type: date
type: date_nanos
format: "yyyy-MM-dd"
- do:
indices.create:
@@ -154,10 +154,9 @@ setup:
- match: { hits.total: 2 }
- length: { aggregations.idx_terms.buckets: 2 }

# check that empty responses are correctly handled when rewriting to match_no_docs
# check that empty responses are correctly handled when rewriting to match_no_docs
- do:
search:
rest_total_hits_as_int: true
# ensure that one shard can return empty response
max_concurrent_shard_requests: 1
body: { "size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2016-02-01", "lt": "2018-02-01"}}}, "aggs" : { "idx_terms" : { "terms" : { "field" : "_index" } } } }
@@ -166,12 +165,11 @@ setup:
- match: { _shards.successful: 3 }
- match: { _shards.skipped : 0 }
- match: { _shards.failed: 0 }
- match: { hits.total: 2 }
- match: { hits.total.value: 2 }
- length: { aggregations.idx_terms.buckets: 2 }

- do:
search:
rest_total_hits_as_int: true
# ensure that one shard can return empty response
max_concurrent_shard_requests: 2
body: { "size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2019-02-01"}}}, "aggs" : { "idx_terms" : { "terms" : { "field" : "_index" } } } }
@@ -180,5 +178,47 @@ setup:
- match: { _shards.successful: 3 }
- match: { _shards.skipped : 0 }
- match: { _shards.failed: 0 }
- match: { hits.total: 0 }
- match: { hits.total.value: 0 }
- length: { aggregations.idx_terms.buckets: 0 }

# check field sort is correct when skipping query phase
- do:
search:
# ensure that one shard can return empty response
max_concurrent_shard_requests: 1
pre_filter_shard_size: 1
body:
"size": 1
"track_total_hits": 1
"sort": [{ "created_at": { "order": "desc", "numeric_type": "date" } }]

- match: { _shards.total: 3 }
- match: { _shards.successful: 3 }
- match: { _shards.skipped: 0 }
- match: { _shards.failed: 0 }
- match: { hits.total.value: 1 }
- match: { hits.total.relation: "gte" }
- length: { hits.hits: 1 }
- match: { hits.hits.0._id: "3" }

# same with aggs
- do:
search:
# ensure that one shard can return empty response
max_concurrent_shard_requests: 1
pre_filter_shard_size: 1
body:
"size": 1
"track_total_hits": 1
"sort": [{ "created_at": { "order": "desc", "numeric_type": "date" } }]
"aggs" : { "idx_terms" : { "terms" : { "field" : "_index" } } }

- match: { _shards.total: 3 }
- match: { _shards.successful: 3 }
- match: { _shards.skipped: 0 }
- match: { _shards.failed: 0 }
- match: { hits.total.value: 1 }
- match: { hits.total.relation: "gte" }
- length: { hits.hits: 1 }
- match: {hits.hits.0._id: "3" }
- length: { aggregations.idx_terms.buckets: 3 }
@@ -77,7 +77,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
**/
private final BiFunction<String, String, Transport.Connection> nodeIdToConnection;
private final SearchTask task;
private final SearchPhaseResults<Result> results;
final SearchPhaseResults<Result> results;
private final ClusterState clusterState;
private final Map<String, AliasFilter> aliasFilter;
private final Map<String, Float> concreteIndexBoosts;
@@ -467,7 +467,7 @@ public final void onShardFailure(final int shardIndex, @Nullable SearchShardTarg
* @param result the result returned from the shard
* @param shardIt the shard iterator
*/
private void onShardResult(Result result, SearchShardIterator shardIt) {
protected void onShardResult(Result result, SearchShardIterator shardIt) {
assert result.getShardIndex() != -1 : "shard index is not set";
assert result.getSearchShardTarget() != null : "search shard target must not be null";
successfulOps.incrementAndGet();
@@ -0,0 +1,111 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.search;

import org.apache.lucene.search.FieldComparator;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TopFieldDocs;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchSortValuesAndFormats;

/**
* Utility class to keep track of the bottom doc's sort values in a distributed search.
*/
class BottomSortValuesCollector {
private final int topNSize;
private final SortField[] sortFields;
private final FieldComparator[] comparators;
private final int[] reverseMuls;

private volatile long totalHits;
private volatile SearchSortValuesAndFormats bottomSortValues;

BottomSortValuesCollector(int topNSize, SortField[] sortFields) {
this.topNSize = topNSize;
this.comparators = new FieldComparator[sortFields.length];
this.reverseMuls = new int[sortFields.length];
this.sortFields = sortFields;
for (int i = 0; i < sortFields.length; i++) {
comparators[i] = sortFields[i].getComparator(1, i);
reverseMuls[i] = sortFields[i].getReverse() ? -1 : 1;
}
}

long getTotalHits() {
return totalHits;
}

/**
* @return The best bottom sort values consumed so far.
*/
SearchSortValuesAndFormats getBottomSortValues() {
return bottomSortValues;
}

synchronized void consumeTopDocs(TopFieldDocs topDocs, DocValueFormat[] sortValuesFormat) {
totalHits += topDocs.totalHits.value;
if (validateShardSortFields(topDocs.fields) == false) {
return;
}

FieldDoc shardBottomDoc = extractBottom(topDocs);
if (shardBottomDoc == null) {
return;
}
if (bottomSortValues == null
|| compareValues(shardBottomDoc.fields, bottomSortValues.getRawSortValues()) < 0) {
bottomSortValues = new SearchSortValuesAndFormats(shardBottomDoc.fields, sortValuesFormat);
}
}

/**
* @return <code>false</code> if the provided {@link SortField} array differs
* from the initial {@link BottomSortValuesCollector#sortFields}.
*/
private boolean validateShardSortFields(SortField[] shardSortFields) {
for (int i = 0; i < shardSortFields.length; i++) {
if (shardSortFields[i].equals(sortFields[i]) == false) {
// ignore shard responses that would make the sort incompatible
// (e.g.: mixing keyword/numeric or long/double).
// TODO: we should fail the entire request because the topdocs
// merge will likely fail later but this is not possible with
// the current async logic that only allows shard failures here.
return false;
}
}
return true;
}

private FieldDoc extractBottom(TopFieldDocs topDocs) {
return topNSize > 0 && topDocs.scoreDocs.length == topNSize ?
(FieldDoc) topDocs.scoreDocs[topNSize-1] : null;
}

private int compareValues(Object[] v1, Object[] v2) {
for (int i = 0; i < v1.length; i++) {
int cmp = reverseMuls[i] * comparators[i].compareValues(v1[i], v2[i]);
if (cmp != 0) {
return cmp;
}
}
return 0;
}
}
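The collector above can be hard to follow without the Lucene types, so here is a minimal sketch of the same idea under simplifying assumptions: sort values are plain `long`s, each shard response is a `long[][]` of rows already sorted by the shard, and the class name `BottomSortValuesSketch` and its method names are hypothetical, not part of this PR. It also synchronizes the getters instead of using `volatile` fields, which is a simplification rather than the approach taken in the diff.

```java
/**
 * Simplified, hypothetical stand-in for the collector in the diff.
 * Sort values are plain longs; no Lucene comparators are involved.
 */
final class BottomSortValuesSketch {
    private final int topNSize;
    private final int[] reverseMuls; // -1 for descending fields, 1 for ascending

    private long totalHits;
    private long[] bottomSortValues; // null until a shard returns a full top-N

    BottomSortValuesSketch(int topNSize, boolean[] reverse) {
        this.topNSize = topNSize;
        this.reverseMuls = new int[reverse.length];
        for (int i = 0; i < reverse.length; i++) {
            reverseMuls[i] = reverse[i] ? -1 : 1;
        }
    }

    synchronized long getTotalHits() {
        return totalHits;
    }

    synchronized long[] getBottomSortValues() {
        return bottomSortValues == null ? null : bottomSortValues.clone();
    }

    /** Consumes one shard response: its hit count and its sorted top rows. */
    synchronized void consume(long shardTotalHits, long[][] shardTopRows) {
        totalHits += shardTotalHits;
        // Only a shard that filled the whole top-N yields a usable bound.
        if (topNSize <= 0 || shardTopRows.length != topNSize) {
            return;
        }
        long[] shardBottom = shardTopRows[topNSize - 1];
        // Keep the bottom that sorts first: it is the tightest bound seen so far.
        if (bottomSortValues == null || compare(shardBottom, bottomSortValues) < 0) {
            bottomSortValues = shardBottom.clone();
        }
    }

    private int compare(long[] v1, long[] v2) {
        for (int i = 0; i < v1.length; i++) {
            int cmp = reverseMuls[i] * Long.compare(v1[i], v2[i]);
            if (cmp != 0) {
                return cmp;
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        // One descending sort field, top-2 requested.
        BottomSortValuesSketch c = new BottomSortValuesSketch(2, new boolean[] { true });
        c.consume(5, new long[][] { { 100 }, { 90 } });
        c.consume(3, new long[][] { { 120 }, { 110 } });
        c.consume(1, new long[][] { { 200 } }); // fewer than top-N rows: no bound
        System.out.println(c.getTotalHits() + " hits, bottom=" + c.getBottomSortValues()[0]);
    }
}
```

In this sketch a shard that returns fewer than `topNSize` rows still contributes to the hit count but never tightens the bound, which mirrors the `extractBottom` check in the diff.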
@@ -478,7 +478,8 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
for (SearchPhaseResult entry : queryResults) {
QuerySearchResult result = entry.queryResult();
from = result.from();
size = result.size();
// sorted queries can set the size to 0 if they have enough competitive hits.
size = Math.max(result.size(), size);
if (hasSuggest) {
assert result.suggest() != null;
for (Suggestion<? extends Suggestion.Entry<? extends Suggestion.Entry.Option>> suggestion : result.suggest()) {
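The switch to `Math.max` in the hunk above matters because a shard may now report a size of 0. A minimal sketch of that reduction, assuming shard sizes are given as a plain `int[]` (the class and method names are hypothetical):

```java
final class ReducedSizeSketch {
    /**
     * Returns the largest size reported by any shard, so that one shard
     * whose size was rewritten to 0 cannot zero out the merged size.
     */
    static int reduceSize(int[] shardSizes) {
        int size = 0;
        for (int shardSize : shardSizes) {
            size = Math.max(shardSize, size);
        }
        return size;
    }

    public static void main(String[] args) {
        // The last shard had its size reset to 0; the merged size is unaffected.
        System.out.println(reduceSize(new int[] { 10, 10, 0 }));
    }
}
```

With plain assignment instead of `Math.max`, the last shard visited would decide the merged size, which is presumably the failure mode this change guards against.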
@@ -724,15 +725,6 @@ int getNumBuffered() {
int getNumReducePhases() { return numReducePhases; }
}

private int resolveTrackTotalHits(SearchRequest request) {
if (request.scroll() != null) {
// no matter what the value of track_total_hits is
return SearchContext.TRACK_TOTAL_HITS_ACCURATE;
}
return request.source() == null ? SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO : request.source().trackTotalHitsUpTo() == null ?
SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO : request.source().trackTotalHitsUpTo();
}

/**
* Returns a new ArraySearchPhaseResults instance. This might return an instance that reduces search responses incrementally.
*/
@@ -743,7 +735,7 @@ ArraySearchPhaseResults<SearchPhaseResult> newSearchPhaseResults(SearchProgressL
boolean isScrollRequest = request.scroll() != null;
final boolean hasAggs = source != null && source.aggregations() != null;
final boolean hasTopDocs = source == null || source.size() != 0;
final int trackTotalHitsUpTo = resolveTrackTotalHits(request);
final int trackTotalHitsUpTo = request.resolveTrackTotalHitsUpTo();
InternalAggregation.ReduceContextBuilder aggReduceContextBuilder = requestToAggReduceContextBuilder.apply(request);
if (isScrollRequest == false && (hasAggs || hasTopDocs)) {
// no incremental reduce if scroll is used - we only hit a single shard or sometimes more...
@@ -20,6 +20,7 @@
package org.elasticsearch.action.search;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.TopFieldDocs;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
@@ -28,18 +29,28 @@
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.transport.Transport;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;

final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPhaseResult> {
import static org.elasticsearch.action.search.SearchPhaseController.getTopDocsSize;

class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPhaseResult> {

private final SearchPhaseController searchPhaseController;
private final SearchProgressListener progressListener;

// information used to track the best bottom top doc globally.
private final int topDocsSize;
private final int trackTotalHitsUpTo;
private volatile BottomSortValuesCollector bottomSortCollector;

SearchQueryThenFetchAsyncAction(final Logger logger, final SearchTransportService searchTransportService,
final BiFunction<String, String, Transport.Connection> nodeIdToConnection,
final Map<String, AliasFilter> aliasFilter,
@@ -53,27 +64,64 @@ final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<Se
executor, request, listener, shardsIts, timeProvider, clusterState, task,
searchPhaseController.newSearchPhaseResults(task.getProgressListener(), request, shardsIts.size()),
request.getMaxConcurrentShardRequests(), clusters);
this.topDocsSize = getTopDocsSize(request);
this.trackTotalHitsUpTo = request.resolveTrackTotalHitsUpTo();
this.searchPhaseController = searchPhaseController;
this.progressListener = task.getProgressListener();
final SearchProgressListener progressListener = task.getProgressListener();
final SearchSourceBuilder sourceBuilder = request.source();
progressListener.notifyListShards(SearchProgressListener.buildSearchShards(this.shardsIts),
SearchProgressListener.buildSearchShards(toSkipShardsIts), clusters, sourceBuilder == null || sourceBuilder.size() != 0);
}

protected void executePhaseOnShard(final SearchShardIterator shardIt, final ShardRouting shard,
final SearchActionListener<SearchPhaseResult> listener) {
ShardSearchRequest request = rewriteShardSearchRequest(super.buildShardSearchRequest(shardIt));
getSearchTransport().sendExecuteQuery(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),
buildShardSearchRequest(shardIt), getTask(), listener);
request, getTask(), listener);
}

@Override
protected void onShardGroupFailure(int shardIndex, SearchShardTarget shardTarget, Exception exc) {
progressListener.notifyQueryFailure(shardIndex, shardTarget, exc);
}

@Override
protected void onShardResult(SearchPhaseResult result, SearchShardIterator shardIt) {
QuerySearchResult queryResult = result.queryResult();
if (queryResult.isNull() == false && queryResult.topDocs().topDocs instanceof TopFieldDocs) {
TopFieldDocs topDocs = (TopFieldDocs) queryResult.topDocs().topDocs;
if (bottomSortCollector == null) {
synchronized (this) {
if (bottomSortCollector == null) {
bottomSortCollector = new BottomSortValuesCollector(topDocsSize, topDocs.fields);
}
}
}
bottomSortCollector.consumeTopDocs(topDocs, queryResult.sortValueFormats());
}
super.onShardResult(result, shardIt);
}

@Override
protected SearchPhase getNextPhase(final SearchPhaseResults<SearchPhaseResult> results, final SearchPhaseContext context) {
return new FetchSearchPhase(results, searchPhaseController, context, clusterState());
}

private ShardSearchRequest rewriteShardSearchRequest(ShardSearchRequest request) {
if (bottomSortCollector == null) {
return request;
}

// disable tracking total hits if we already reached the required estimation.
if (trackTotalHitsUpTo != SearchContext.TRACK_TOTAL_HITS_ACCURATE
&& bottomSortCollector.getTotalHits() > trackTotalHitsUpTo) {
request.source(request.source().shallowCopy().trackTotalHits(false));
}

// set the current best bottom field doc
if (bottomSortCollector.getBottomSortValues() != null) {
request.setBottomSortValues(bottomSortCollector.getBottomSortValues());
}
return request;
}
}
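The rewrite step in `rewriteShardSearchRequest` can be read in isolation as a pure function of the request and the state collected so far. The sketch below assumes a hypothetical immutable `ShardRequest` with `long` sort values and uses `Integer.MAX_VALUE` as a stand-in for `SearchContext.TRACK_TOTAL_HITS_ACCURATE`; neither is taken from this PR.

```java
final class ShardRequestRewriteSketch {
    /** Assumed sentinel for "count every hit"; stands in for TRACK_TOTAL_HITS_ACCURATE. */
    static final int TRACK_ACCURATE = Integer.MAX_VALUE;

    /** Hypothetical immutable stand-in for a shard-level search request. */
    static final class ShardRequest {
        final boolean trackTotalHits;
        final long[] bottomSortValues; // null while no global bound is known

        ShardRequest(boolean trackTotalHits, long[] bottomSortValues) {
            this.trackTotalHits = trackTotalHits;
            this.bottomSortValues = bottomSortValues;
        }
    }

    static ShardRequest rewrite(ShardRequest request, int trackTotalHitsUpTo,
                                long totalHitsSoFar, long[] bestBottom) {
        boolean track = request.trackTotalHits;
        // Other shards already counted more hits than requested: stop counting.
        if (trackTotalHitsUpTo != TRACK_ACCURATE && totalHitsSoFar > trackTotalHitsUpTo) {
            track = false;
        }
        // Forward the best bound seen so far, if any.
        long[] bottom = bestBottom != null ? bestBottom.clone() : request.bottomSortValues;
        return new ShardRequest(track, bottom);
    }

    public static void main(String[] args) {
        ShardRequest rewritten = rewrite(new ShardRequest(true, null), 1, 2, new long[] { 110 });
        System.out.println(rewritten.trackTotalHits + " " + rewritten.bottomSortValues[0]);
    }
}
```

Returning a new request rather than mutating the input is a choice made for this sketch; the code in the diff instead sets a shallow copy of the source on the existing request.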
@@ -558,6 +558,19 @@ public boolean isSuggestOnly() {
return source != null && source.isSuggestOnly();
}

public int resolveTrackTotalHitsUpTo() {
return resolveTrackTotalHitsUpTo(scroll, source);
}

public static int resolveTrackTotalHitsUpTo(Scroll scroll, SearchSourceBuilder source) {
if (scroll != null) {
// no matter what the value of track_total_hits is
return SearchContext.TRACK_TOTAL_HITS_ACCURATE;
}
return source == null ? SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO : source.trackTotalHitsUpTo() == null ?
SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO : source.trackTotalHitsUpTo();
}

@Override
public SearchTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
// generating description in a lazy way since source can be quite big
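The resolution order in `resolveTrackTotalHitsUpTo` above (scroll first, then the explicit value, then the default) can be sketched without the Elasticsearch types. The constant values below are assumptions made for illustration, and the missing source and missing `track_total_hits` cases are collapsed into a single nullable `Integer`:

```java
final class TrackTotalHitsSketch {
    // Assumed values; the real constants live in SearchContext.
    static final int ACCURATE = Integer.MAX_VALUE;
    static final int DEFAULT_UP_TO = 10_000;

    /**
     * @param isScroll      whether the request is a scroll request
     * @param requestedUpTo the explicit threshold, or null when none was set
     */
    static int resolve(boolean isScroll, Integer requestedUpTo) {
        if (isScroll) {
            // Scroll requests always count accurately, whatever was requested.
            return ACCURATE;
        }
        return requestedUpTo == null ? DEFAULT_UP_TO : requestedUpTo;
    }

    public static void main(String[] args) {
        System.out.println(resolve(true, 5));
        System.out.println(resolve(false, null));
        System.out.println(resolve(false, 100));
    }
}
```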
@@ -230,6 +230,10 @@ public void writeTo(StreamOutput out) throws IOException {
}
}

public DateMathParser getDateMathParser() {
return parser;
}

@Override
public String format(long value) {
return formatter.format(resolution.toInstant(value).atZone(timeZone));