diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search/140_pre_filter_search_shards.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search/140_pre_filter_search_shards.yml index 7c960878d8515..d6bcf30bde347 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/search/140_pre_filter_search_shards.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search/140_pre_filter_search_shards.yml @@ -153,3 +153,32 @@ setup: - match: { _shards.failed: 0 } - match: { hits.total: 2 } - length: { aggregations.idx_terms.buckets: 2 } + + # check that empty responses are correctly handled when rewriting to match_no_docs + - do: + search: + rest_total_hits_as_int: true + # ensure that one shard can return empty response + max_concurrent_shard_requests: 1 + body: { "size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2016-02-01", "lt": "2018-02-01"}}}, "aggs" : { "idx_terms" : { "terms" : { "field" : "_index" } } } } + + - match: { _shards.total: 3 } + - match: { _shards.successful: 3 } + - match: { _shards.skipped : 0 } + - match: { _shards.failed: 0 } + - match: { hits.total: 2 } + - length: { aggregations.idx_terms.buckets: 2 } + + - do: + search: + rest_total_hits_as_int: true + # ensure that one shard can return empty response + max_concurrent_shard_requests: 2 + body: { "size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2019-02-01"}}}, "aggs" : { "idx_terms" : { "terms" : { "field" : "_index" } } } } + + - match: { _shards.total: 3 } + - match: { _shards.successful: 3 } + - match: { _shards.skipped : 0 } + - match: { _shards.failed: 0 } + - match: { hits.total: 0 } + - length: { aggregations.idx_terms.buckets: 0 } diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index ca68bb4008146..d4d313c0afab1 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -51,6 +51,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import java.util.stream.Collectors; @@ -82,6 +83,7 @@ abstract class AbstractSearchAsyncAction exten private final Map> indexRoutings; private final SetOnce> shardFailures = new SetOnce<>(); private final Object shardFailuresMutex = new Object(); + private final AtomicBoolean hasShardResponse = new AtomicBoolean(false); private final AtomicInteger successfulOps = new AtomicInteger(); private final AtomicInteger skippedOps = new AtomicInteger(); private final SearchTimeProvider timeProvider; @@ -467,6 +469,7 @@ private void onShardResult(Result result, SearchShardIterator shardIt) { assert result.getSearchShardTarget() != null : "search shard target must not be null"; successfulOps.incrementAndGet(); results.consumeResult(result); + hasShardResponse.set(true); if (logger.isTraceEnabled()) { logger.trace("got first-phase result from {}", result != null ? result.getSearchShardTarget() : null); } @@ -602,8 +605,13 @@ public final ShardSearchRequest buildShardSearchRequest(SearchShardIterator shar String indexName = shardIt.shardId().getIndex().getName(); final String[] routings = indexRoutings.getOrDefault(indexName, Collections.emptySet()) .toArray(new String[0]); - return new ShardSearchRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), getNumShards(), + ShardSearchRequest shardRequest = new ShardSearchRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), getNumShards(), filter, indexBoost, timeProvider.getAbsoluteStartMillis(), shardIt.getClusterAlias(), routings); + // if we already received a search result we can inform the shard that it + // can return a null response if the request rewrites to match none rather + // than creating an empty response in the search thread pool. + shardRequest.canReturnNullResponseIfMatchNoDocs(hasShardResponse.get()); + return shardRequest; } /** diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index 27b5c9cf3b2a8..59a5082ffe922 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -39,6 +39,7 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchPhaseResult; +import org.elasticsearch.search.SearchService; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; @@ -65,6 +66,7 @@ import java.util.Map; import java.util.function.Function; import java.util.function.IntFunction; +import java.util.stream.Collectors; public final class SearchPhaseController { @@ -427,6 +429,15 @@ private ReducedQueryPhase reducedQueryPhase(Collection res.queryResult().isNull() == false) + .collect(Collectors.toList()); + String errorMsg = "must have at least one non-empty search result, got 0 out of " + total; + assert queryResults.isEmpty() == false : errorMsg; + if (queryResults.isEmpty()) { + throw new IllegalStateException(errorMsg); + } final QuerySearchResult firstResult = queryResults.stream().findFirst().get().queryResult(); final boolean hasSuggest = firstResult.suggest() != null; final boolean hasProfileResults = firstResult.hasProfileResults(); @@ -497,6 +508,18 @@ private ReducedQueryPhase reducedQueryPhase(Collection= 2 if there is more than one expected result"); @@ -610,6 +634,7 @@ private QueryPhaseResultConsumer(SearchProgressListener progressListener, Search this.hasAggs = hasAggs; this.bufferSize = bufferSize; this.topDocsStats = new TopDocsStats(trackTotalHitsUpTo); + this.topNSize = topNSize; this.performFinalReduce = performFinalReduce; } @@ -622,36 +647,38 @@ public void consumeResult(SearchPhaseResult result) { } private synchronized void consumeInternal(QuerySearchResult querySearchResult) { - if (index == bufferSize) { + if (querySearchResult.isNull() == false) { + if (index == bufferSize) { + if (hasAggs) { + ReduceContext reduceContext = controller.reduceContextFunction.apply(false); + InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(Arrays.asList(aggsBuffer), reduceContext); + Arrays.fill(aggsBuffer, null); + aggsBuffer[0] = reducedAggs; + } + if (hasTopDocs) { + TopDocs reducedTopDocs = mergeTopDocs(Arrays.asList(topDocsBuffer), + // we have to merge here in the same way we collect on a shard + topNSize, 0); + Arrays.fill(topDocsBuffer, null); + topDocsBuffer[0] = reducedTopDocs; + } + numReducePhases++; + index = 1; + if (hasAggs) { + progressListener.notifyPartialReduce(progressListener.searchShards(processedShards), + topDocsStats.getTotalHits(), aggsBuffer[0], numReducePhases); + } + } + final int i = index++; if (hasAggs) { - ReduceContext reduceContext = controller.reduceContextFunction.apply(false); - InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(Arrays.asList(aggsBuffer), reduceContext); - Arrays.fill(aggsBuffer, null); - aggsBuffer[0] = reducedAggs; + aggsBuffer[i] = (InternalAggregations) querySearchResult.consumeAggs(); } if (hasTopDocs) { - TopDocs reducedTopDocs = mergeTopDocs(Arrays.asList(topDocsBuffer), - // we have to merge here in the same way we collect on a shard - querySearchResult.from() + querySearchResult.size(), 0); - Arrays.fill(topDocsBuffer, null); - topDocsBuffer[0] = reducedTopDocs; + final TopDocsAndMaxScore topDocs = querySearchResult.consumeTopDocs(); // can't be null + topDocsStats.add(topDocs, querySearchResult.searchTimedOut(), querySearchResult.terminatedEarly()); + setShardIndex(topDocs.topDocs, querySearchResult.getShardIndex()); + topDocsBuffer[i] = topDocs.topDocs; } - numReducePhases++; - index = 1; - if (hasAggs) { - progressListener.notifyPartialReduce(progressListener.searchShards(processedShards), - topDocsStats.getTotalHits(), aggsBuffer[0], numReducePhases); - } - } - final int i = index++; - if (hasAggs) { - aggsBuffer[i] = (InternalAggregations) querySearchResult.consumeAggs(); - } - if (hasTopDocs) { - final TopDocsAndMaxScore topDocs = querySearchResult.consumeTopDocs(); // can't be null - topDocsStats.add(topDocs, querySearchResult.searchTimedOut(), querySearchResult.terminatedEarly()); - setShardIndex(topDocs.topDocs, querySearchResult.getShardIndex()); - topDocsBuffer[i] = topDocs.topDocs; } processedShards[querySearchResult.getShardIndex()] = querySearchResult.getSearchShardTarget(); } @@ -706,9 +733,10 @@ ArraySearchPhaseResults newSearchPhaseResults(SearchProgressL if (isScrollRequest == false && (hasAggs || hasTopDocs)) { // no incremental reduce if scroll is used - we only hit a single shard or sometimes more... if (request.getBatchedReduceSize() < numShards) { + int topNSize = getTopDocsSize(request); // only use this if there are aggs and if there are more shards than we should reduce at once return new QueryPhaseResultConsumer(listener, this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs, - trackTotalHitsUpTo, request.isFinalReduce()); + trackTotalHitsUpTo, topNSize, request.isFinalReduce()); } } return new ArraySearchPhaseResults(numShards) { @@ -731,7 +759,7 @@ ReducedQueryPhase reduce() { static final class TopDocsStats { final int trackTotalHitsUpTo; - private long totalHits; + long totalHits; private TotalHits.Relation totalHitsRelation; long fetchHits; private float maxScore = Float.NEGATIVE_INFINITY; diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index d3d3558078147..36e9054586a8d 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1254,6 +1254,13 @@ private Engine.Searcher acquireSearcher(String source, Engine.SearcherScope scop markSearcherAccessed(); final Engine engine = getEngine(); final Engine.Searcher searcher = engine.acquireSearcher(source, scope); + return wrapSearcher(searcher); + } + + /** + * Wraps the provided searcher acquired with {@link #acquireSearcherNoWrap(String)}. + */ + public Engine.Searcher wrapSearcher(Engine.Searcher searcher) { assert ElasticsearchDirectoryReader.unwrap(searcher.getDirectoryReader()) != null : "DirectoryReader must be an instance or ElasticsearchDirectoryReader"; boolean success = false; diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 6c6f8ef318860..fbffe60033d28 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -308,11 +308,31 @@ protected void doClose() { } public void executeDfsPhase(ShardSearchRequest request, SearchShardTask task, ActionListener listener) { - rewriteShardRequest(request, ActionListener.map(listener, r -> executeDfsPhase(r, task))); + rewriteShardRequest(request, ActionListener.wrap( + // fork the execution in the search thread pool and wraps the searcher + // to execute the query + context -> { + try { + context.wrapSearcher().execute(() -> { + final SearchPhaseResult result; + try { + result = executeDfsPhase(context, task); + } catch (Exception exc) { + listener.onFailure(exc); + return; + } + listener.onResponse(result); + }); + } catch (Exception exc) { + // if the execution is rejected we need to close the searcher + IOUtils.closeWhileHandlingException(context.searcher); + listener.onFailure(exc); + } + }, listener::onFailure)); } - private DfsSearchResult executeDfsPhase(ShardSearchRequest request, SearchShardTask task) throws IOException { - final SearchContext context = createAndPutContext(request); + private DfsSearchResult executeDfsPhase(SearchRewriteContext rewriteContext, SearchShardTask task) throws IOException { + final SearchContext context = createAndPutContext(rewriteContext); context.incRef(); try { context.setTask(task); @@ -343,15 +363,59 @@ private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final Sea } public void executeQueryPhase(ShardSearchRequest request, SearchShardTask task, ActionListener listener) { - rewriteShardRequest(request, ActionListener.map(listener, r -> executeQueryPhase(r, task))); + assert request.canReturnNullResponseIfMatchNoDocs() == false || request.numberOfShards() > 1 + : "empty responses require more than one shard"; + rewriteShardRequest(request, ActionListener.wrap( + context -> { + try { + ShardSearchRequest rewritten = context.request; + if (rewritten.canReturnNullResponseIfMatchNoDocs() + && canRewriteToMatchNone(rewritten.source()) + && rewritten.source().query() instanceof MatchNoneQueryBuilder) { + onMatchNoDocs(context, listener); + } else { + // fork the execution in the search thread pool and wraps the searcher + // to execute the query + context.wrapSearcher().execute(() -> { + final SearchPhaseResult result; + try { + result = executeQueryPhase(context, task); + } catch (Exception exc) { + listener.onFailure(exc); + return; + } + listener.onResponse(result); + }); + } + } catch (Exception exc) { + // if the execution is rejected we need to close the searcher + IOUtils.closeWhileHandlingException(context.searcher); + listener.onFailure(exc); + } + }, listener::onFailure)); + } + + private void onMatchNoDocs(SearchRewriteContext rewriteContext, ActionListener listener) { + // creates a lightweight search context that we use to inform context listeners + // before closing + SearchContext searchContext = createSearchContext(rewriteContext, defaultSearchTimeout); + try (SearchContext dummy = searchContext) { + onNewContext(searchContext); + onFreeContext(searchContext); + } catch (Exception exc) { + listener.onFailure(exc); + return; + } + listener.onResponse(QuerySearchResult.nullInstance()); } private void runAsync(long id, Supplier executable, ActionListener listener) { getExecutor(id).execute(ActionRunnable.supply(listener, executable::get)); } - private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchShardTask task) throws Exception { - final SearchContext context = createAndPutContext(request); + private SearchPhaseResult executeQueryPhase(SearchRewriteContext rewriteContext, SearchShardTask task) throws Exception { + final SearchContext context = createAndPutContext(rewriteContext); + final ShardSearchRequest request = rewriteContext.request; context.incRef(); try { context.setTask(task); @@ -542,15 +606,8 @@ private SearchContext findContext(long id, TransportRequest request) throws Sear } } - final SearchContext createAndPutContext(ShardSearchRequest request) throws IOException { - if (request.scroll() != null && openScrollContexts.get() >= maxOpenScrollContext) { - throw new ElasticsearchException( - "Trying to create too many scroll contexts. Must be less than or equal to: [" + - maxOpenScrollContext + "]. " + "This limit can be set by changing the [" - + MAX_OPEN_SCROLL_CONTEXT.getKey() + "] setting."); - } - - SearchContext context = createContext(request); + final SearchContext createAndPutContext(SearchRewriteContext rewriteContext) { + SearchContext context = createContext(rewriteContext); onNewContext(context); boolean success = false; try { @@ -584,9 +641,16 @@ private void onNewContext(SearchContext context) { } } - final SearchContext createContext(ShardSearchRequest request) throws IOException { - final DefaultSearchContext context = createSearchContext(request, defaultSearchTimeout); + final SearchContext createContext(SearchRewriteContext rewriteContext) { + final DefaultSearchContext context = createSearchContext(rewriteContext, defaultSearchTimeout); try { + if (rewriteContext.request != null && openScrollContexts.get() >= maxOpenScrollContext) { + throw new ElasticsearchException( + "Trying to create too many scroll contexts. Must be less than or equal to: [" + + maxOpenScrollContext + "]. " + "This limit can be set by changing the [" + + MAX_OPEN_SCROLL_CONTEXT.getKey() + "] setting."); + } + final ShardSearchRequest request = rewriteContext.request; if (request.scroll() != null) { context.scrollContext(new ScrollContext()); context.scrollContext().scroll = request.scroll(); @@ -622,42 +686,33 @@ final SearchContext createContext(ShardSearchRequest request) throws IOException } public DefaultSearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout) throws IOException { - return createSearchContext(request, timeout, true, "search"); + IndexShard shard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id()); + SearchRewriteContext rewriteContext = acquireSearcherAndRewrite(request, shard); + // make sure that we wrap the searcher when executing the query + return createSearchContext(rewriteContext.wrapSearcher(), timeout); } - private DefaultSearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout, - boolean assertAsyncActions, String source) - throws IOException { + private DefaultSearchContext createSearchContext(SearchRewriteContext rewriteContext, TimeValue timeout) { + final ShardSearchRequest request = rewriteContext.request; + final Engine.Searcher searcher = rewriteContext.searcher; IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); IndexShard indexShard = indexService.getShard(request.shardId().getId()); SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().getId(), indexShard.shardId(), request.getClusterAlias(), OriginalIndices.NONE); - Engine.Searcher searcher = indexShard.acquireSearcher(source); - boolean success = false; - DefaultSearchContext searchContext = null; try { - searchContext = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget, + DefaultSearchContext searchContext = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget, searcher, clusterService, indexService, indexShard, bigArrays, threadPool::relativeTimeInMillis, timeout, fetchPhase, clusterService.state().nodes().getMinNodeVersion()); - // we clone the query shard context here just for rewriting otherwise we - // might end up with incorrect state since we are using now() or script services - // during rewrite and normalized / evaluate templates etc. - QueryShardContext context = new QueryShardContext(searchContext.getQueryShardContext()); - Rewriteable.rewrite(request.getRewriteable(), context, assertAsyncActions); - assert searchContext.getQueryShardContext().isCacheable(); success = true; + return searchContext; } finally { if (success == false) { - IOUtils.closeWhileHandlingException(searchContext); - if (searchContext == null) { - // we handle the case where the DefaultSearchContext constructor throws an exception since we would otherwise - // leak a searcher and this can have severe implications (unable to obtain shard lock exceptions). - IOUtils.closeWhileHandlingException(searcher); - } + // we handle the case where the DefaultSearchContext constructor throws an exception since we would otherwise + // leak a searcher and this can have severe implications (unable to obtain shard lock exceptions). + IOUtils.closeWhileHandlingException(rewriteContext.searcher); } } - return searchContext; } private void freeAllContextForIndex(Index index) { @@ -1063,24 +1118,50 @@ public static boolean canRewriteToMatchNone(SearchSourceBuilder source) { return aggregations == null || aggregations.mustVisitAllDocs() == false; } - /* * Rewrites the search request with a light weight rewrite context in order to fetch resources asynchronously - * The action listener is guaranteed to be executed on the search thread-pool + * and then rewrites with a searcher when the shard is active. + * The provided action listener is executed on the same thread or in a listener threadpool. */ - private void rewriteShardRequest(ShardSearchRequest request, ActionListener listener) { + private void rewriteShardRequest(ShardSearchRequest request, ActionListener listener) { IndexShard shard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id()); - Executor executor = getExecutor(shard); ActionListener actionListener = ActionListener.wrap(r -> // now we need to check if there is a pending refresh and register - shard.awaitShardSearchActive(b -> executor.execute(ActionRunnable.supply(listener, () -> request))), - listener::onFailure); + shard.awaitShardSearchActive(b -> { + try { + // we can now acquire a searcher and rewrite the request with it + SearchRewriteContext rewriteContext = acquireSearcherAndRewrite(request, shard); + listener.onResponse(rewriteContext); + } catch (Exception e) { + listener.onFailure(e); + } + }), listener::onFailure); // we also do rewrite on the coordinating node (TransportSearchService) but we also need to do it here for BWC as well as // AliasFilters that might need to be rewritten. These are edge-cases but we are every efficient doing the rewrite here so it's not // adding a lot of overhead Rewriteable.rewriteAndFetch(request.getRewriteable(), indicesService.getRewriteContext(request::nowInMillis), actionListener); } + SearchRewriteContext acquireSearcherAndRewrite(ShardSearchRequest request, IndexShard shard) throws IOException { + // acquire the searcher for rewrite with no wrapping in order to avoid costly + // operations. We'll wrap the searcher at a later stage (when executing the query). + Engine.Searcher searcher = shard.acquireSearcherNoWrap("search"); + boolean success = false; + try { + IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); + QueryShardContext context = indexService.newQueryShardContext(request.shardId().id(), searcher, + request::nowInMillis, request.getClusterAlias()); + Rewriteable.rewrite(request.getRewriteable(), context, true); + SearchRewriteContext rewrite = new SearchRewriteContext(request, shard, searcher, getExecutor(shard)); + success = true; + return rewrite; + } finally { + if (success == false) { + IOUtils.closeWhileHandlingException(searcher); + } + } + } + /** * Returns a new {@link QueryRewriteContext} with the given {@code now} provider */ @@ -1097,6 +1178,37 @@ public InternalAggregation.ReduceContext createReduceContext(boolean finalReduce finalReduce ? multiBucketConsumerService.create() : bucketCount -> {}, finalReduce); } + static class SearchRewriteContext { + private final ShardSearchRequest request; + private final IndexShard shard; + private Engine.Searcher searcher; + private final Executor executor; + + private boolean isWrapped; + + private SearchRewriteContext(ShardSearchRequest request, + IndexShard shard, + Engine.Searcher searcher, + Executor executor) { + this.request = request; + this.shard = shard; + this.searcher = searcher; + this.executor = executor; + } + + SearchRewriteContext wrapSearcher() { + assert isWrapped == false : "searcher already wrapped"; + isWrapped = true; + searcher = shard.wrapSearcher(searcher); + return this; + } + + void execute(Runnable runnable) { + assert isWrapped : "searcher is not wrapped"; + executor.execute(runnable); + } + } + public static final class CanMatchResponse extends SearchPhaseResult { private final boolean canMatch; private final MinAndMax minAndMax; diff --git a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java index c827a750e71f6..fce338fa7efec 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java @@ -47,6 +47,7 @@ import org.elasticsearch.indices.InvalidAliasNameException; import org.elasticsearch.search.Scroll; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.transport.TransportRequest; @@ -75,7 +76,9 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque private final String preference; private final OriginalIndices originalIndices; - //these are the only two mutable fields, as they are subject to rewriting + private boolean canReturnNullResponseIfMatchNoDocs; + + //these are the only mutable fields, as they are subject to rewriting private AliasFilter aliasFilter; private SearchSourceBuilder source; @@ -175,6 +178,11 @@ public ShardSearchRequest(StreamInput in) throws IOException { indexRoutings = Strings.EMPTY_ARRAY; preference = null; } + if (in.getVersion().onOrAfter(Version.V_7_7_0)) { + canReturnNullResponseIfMatchNoDocs = in.readBoolean(); + } else { + canReturnNullResponseIfMatchNoDocs = false; + } originalIndices = OriginalIndices.readOriginalIndices(in); } @@ -212,6 +220,9 @@ protected final void innerWriteTo(StreamOutput out, boolean asKey) throws IOExce out.writeOptionalString(preference); } } + if (out.getVersion().onOrAfter(Version.V_7_7_0)) { + out.writeBoolean(canReturnNullResponseIfMatchNoDocs); + } } @Override @@ -290,6 +301,19 @@ public String preference() { return preference; } + /** + * Returns true if the caller can handle null response {@link QuerySearchResult#nullInstance()}. + * Defaults to false since the coordinator node needs at least one shard response to build the global + * response. + */ + public boolean canReturnNullResponseIfMatchNoDocs() { + return canReturnNullResponseIfMatchNoDocs; + } + + public void canReturnNullResponseIfMatchNoDocs(boolean value) { + this.canReturnNullResponseIfMatchNoDocs = value; + } + /** * Returns the cache key for this shard search request, based on its content */ diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index 6ab8b2a07164c..288e5b41733a5 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -63,18 +63,56 @@ public final class QuerySearchResult extends SearchPhaseResult { private long serviceTimeEWMA = -1; private int nodeQueueSize = -1; + private final boolean isNull; + public QuerySearchResult() { + this(false); } public QuerySearchResult(StreamInput in) throws IOException { super(in); - long id = in.readLong(); - readFromWithId(id, in); + if (in.getVersion().onOrAfter(Version.V_7_7_0)) { + isNull = in.readBoolean(); + } else { + isNull = false; + } + if (isNull == false) { + long id = in.readLong(); + readFromWithId(id, in); + } } public QuerySearchResult(long id, SearchShardTarget shardTarget) { this.requestId = id; setSearchShardTarget(shardTarget); + isNull = false; + } + + private QuerySearchResult(boolean isNull) { + this.isNull = isNull; + } + + private static final QuerySearchResult nullInstance = new QuerySearchResult(true); + + /** + * Returns an instance that contains no response. + */ + public static QuerySearchResult nullInstance() { + return nullInstance; + } + + /** + * Returns true if the result doesn't contain any useful information. + * It is used by the search action to avoid creating an empty response on + * shard request that rewrites to match_no_docs. + * + * TODO: Currently we need the concrete aggregators to build empty responses. This means that we cannot + * build an empty response in the coordinating node so we rely on this hack to ensure that at least one shard + * returns a valid empty response. We should move the ability to create empty responses to aggregation builders + * in order to allow building empty responses directly from the coordinating node. + */ + public boolean isNull() { + return isNull; } @Override @@ -173,6 +211,10 @@ public void aggregations(InternalAggregations aggregations) { hasAggs = aggregations != null; } + public InternalAggregations aggregations() { + return aggregations; + } + /** * Returns and nulls out the profiled results for this search, or potentially null if result was empty. * This allows to free up memory once the profiled result is consumed. @@ -305,8 +347,13 @@ public void readFromWithId(long id, StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { - out.writeLong(requestId); - writeToNoId(out); + if (out.getVersion().onOrAfter(Version.V_7_7_0)) { + out.writeBoolean(isNull); + } + if (isNull == false) { + out.writeLong(requestId); + writeToNoId(out); + } } public void writeToNoId(StreamOutput out) throws IOException { diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java index 05b7633571ba1..51d6b89f12220 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java @@ -338,16 +338,37 @@ private static SearchRequest randomSearchRequest() { } public void testConsumer() { + consumerTestCase(0); + } + + public void testConsumerWithEmptyResponse() { + consumerTestCase(randomIntBetween(1, 5)); + } + + private void consumerTestCase(int numEmptyResponses) { + int numShards = 3 + numEmptyResponses; int bufferSize = randomIntBetween(2, 3); SearchRequest request = randomSearchRequest(); request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo"))); request.setBatchedReduceSize(bufferSize); - ArraySearchPhaseResults consumer = searchPhaseController.newSearchPhaseResults(NOOP, request, 3); - assertEquals(0, reductions.size()); + ArraySearchPhaseResults consumer = + searchPhaseController.newSearchPhaseResults(NOOP, request, 3+numEmptyResponses); + if (numEmptyResponses == 0) { + assertEquals(0, reductions.size()); + } + if (numEmptyResponses > 0) { + QuerySearchResult empty = QuerySearchResult.nullInstance(); + int shardId = 2 + numEmptyResponses; + empty.setShardIndex(2+numEmptyResponses); + empty.setSearchShardTarget(new SearchShardTarget("node", new ShardId("a", "b", shardId), null, OriginalIndices.NONE)); + consumer.consumeResult(empty); + numEmptyResponses --; + } + QuerySearchResult result = new QuerySearchResult(0, new SearchShardTarget("node", new ShardId("a", "b", 0), null, OriginalIndices.NONE)); result.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), Float.NaN), - new DocValueFormat[0]); + new DocValueFormat[0]); InternalAggregations aggs = new InternalAggregations(Collections.singletonList(new InternalMax("test", 1.0D, DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap()))); result.aggregations(aggs); @@ -356,7 +377,7 @@ public void testConsumer() { result = new QuerySearchResult(1, new SearchShardTarget("node", new ShardId("a", "b", 0), null, OriginalIndices.NONE)); result.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), Float.NaN), - new DocValueFormat[0]); + new DocValueFormat[0]); aggs = new InternalAggregations(Collections.singletonList(new InternalMax("test", 3.0D, DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap()))); result.aggregations(aggs); @@ -365,20 +386,38 @@ public void testConsumer() { result = new QuerySearchResult(1, new SearchShardTarget("node", new ShardId("a", "b", 0), null, OriginalIndices.NONE)); result.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), Float.NaN), - new DocValueFormat[0]); + new DocValueFormat[0]); aggs = new InternalAggregations(Collections.singletonList(new InternalMax("test", 2.0D, DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap()))); result.aggregations(aggs); result.setShardIndex(1); consumer.consumeResult(result); + + while (numEmptyResponses > 0) { + result = QuerySearchResult.nullInstance(); + int shardId = 2 + numEmptyResponses; + result.setShardIndex(shardId); + result.setSearchShardTarget(new SearchShardTarget("node", new ShardId("a", "b", shardId), null, OriginalIndices.NONE)); + consumer.consumeResult(result); + numEmptyResponses--; + + } + final int numTotalReducePhases; - if (bufferSize == 2) { + if (numShards > bufferSize) { assertThat(consumer, instanceOf(SearchPhaseController.QueryPhaseResultConsumer.class)); - assertEquals(1, ((SearchPhaseController.QueryPhaseResultConsumer)consumer).getNumReducePhases()); - assertEquals(2, ((SearchPhaseController.QueryPhaseResultConsumer)consumer).getNumBuffered()); - assertEquals(1, reductions.size()); - assertEquals(false, reductions.get(0)); - numTotalReducePhases = 2; + if (bufferSize == 2) { + assertEquals(1, ((SearchPhaseController.QueryPhaseResultConsumer) consumer).getNumReducePhases()); + assertEquals(2, ((SearchPhaseController.QueryPhaseResultConsumer) consumer).getNumBuffered()); + assertEquals(1, reductions.size()); + assertEquals(false, reductions.get(0)); + numTotalReducePhases = 2; + } else { + assertEquals(0, ((SearchPhaseController.QueryPhaseResultConsumer) consumer).getNumReducePhases()); + assertEquals(3, ((SearchPhaseController.QueryPhaseResultConsumer) consumer).getNumBuffered()); + assertEquals(0, reductions.size()); + numTotalReducePhases = 1; + } } else { assertThat(consumer, not(instanceOf(SearchPhaseController.QueryPhaseResultConsumer.class))); assertEquals(0, reductions.size()); diff --git a/server/src/test/java/org/elasticsearch/indices/IndicesRequestCacheIT.java b/server/src/test/java/org/elasticsearch/indices/IndicesRequestCacheIT.java index 0c5eb73385b32..2f11960480ddc 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndicesRequestCacheIT.java +++ b/server/src/test/java/org/elasticsearch/indices/IndicesRequestCacheIT.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.time.DateFormatter; import org.elasticsearch.index.cache.request.RequestCacheStats; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket; @@ -123,20 +124,26 @@ public void testQueryRewrite() throws Exception { assertCacheState(client, "index", 0, 0); final SearchResponse r1 = client.prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0) - .setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-19").lte("2016-03-25")).setPreFilterShardSize(Integer.MAX_VALUE).get(); + .setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-19").lte("2016-03-25")) + // to ensure that query is executed even if it rewrites to match_no_docs + .addAggregation(new GlobalAggregationBuilder("global")) + .setPreFilterShardSize(Integer.MAX_VALUE).get(); ElasticsearchAssertions.assertAllSuccessful(r1); assertThat(r1.getHits().getTotalHits().value, equalTo(7L)); assertCacheState(client, "index", 0, 5); final SearchResponse r2 = client.prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0) - .setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-20").lte("2016-03-26")) + .setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-20").lte("2016-03-26")) + .addAggregation(new GlobalAggregationBuilder("global")) .setPreFilterShardSize(Integer.MAX_VALUE).get(); ElasticsearchAssertions.assertAllSuccessful(r2); assertThat(r2.getHits().getTotalHits().value, equalTo(7L)); assertCacheState(client, "index", 3, 7); final SearchResponse r3 = client.prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0) - .setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-21").lte("2016-03-27")).setPreFilterShardSize(Integer.MAX_VALUE) + .setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-21").lte("2016-03-27")) + .addAggregation(new GlobalAggregationBuilder("global")) + .setPreFilterShardSize(Integer.MAX_VALUE) .get(); ElasticsearchAssertions.assertAllSuccessful(r3); assertThat(r3.getHits().getTotalHits().value, equalTo(7L)); diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index c410066dbe693..ab6b1af4a2364 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -66,6 +66,7 @@ import org.elasticsearch.script.MockScriptPlugin; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; +import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.MultiBucketConsumerService; import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder; @@ -77,6 +78,7 @@ import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.ShardSearchRequest; +import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.suggest.SuggestBuilder; import org.elasticsearch.test.ESSingleNodeTestCase; import org.junit.Before; @@ -168,10 +170,12 @@ public void onIndexModule(IndexModule indexModule) { indexModule.addSearchOperationListener(new SearchOperationListener() { @Override public void onNewContext(SearchContext context) { - if ("throttled_threadpool_index".equals(context.indexShard().shardId().getIndex().getName())) { - assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search_throttled]")); - } else { - assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search]")); + if (context.query() != null) { + if ("throttled_threadpool_index".equals(context.indexShard().shardId().getIndex().getName())) { + assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search_throttled]")); + } else { + assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search]")); + } } } @@ -357,15 +361,11 @@ public void testTimeout() throws IOException { final IndexService indexService = indicesService.indexServiceSafe(resolveIndex("index")); final IndexShard indexShard = indexService.getShard(0); SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true); - final SearchContext contextWithDefaultTimeout = service.createContext( - new ShardSearchRequest( - OriginalIndices.NONE, - searchRequest, - indexShard.shardId(), - 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), - 1.0f, -1, null, null) - ); + SearchService.SearchRewriteContext rewriteContext = service.acquireSearcherAndRewrite( + new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null), + indexShard); + final SearchContext contextWithDefaultTimeout = service.createContext(rewriteContext); try { // the search context should inherit the default timeout assertThat(contextWithDefaultTimeout.timeout(), equalTo(TimeValue.timeValueSeconds(5))); @@ -376,15 +376,11 @@ public void testTimeout() throws IOException { final long seconds = randomIntBetween(6, 10); searchRequest.source(new SearchSourceBuilder().timeout(TimeValue.timeValueSeconds(seconds))); - final SearchContext context = service.createContext( - new ShardSearchRequest( - OriginalIndices.NONE, - searchRequest, - indexShard.shardId(), - 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), - 1.0f, -1, null, null) - ); + rewriteContext = service.acquireSearcherAndRewrite( + new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null), + indexShard); + final SearchContext context = service.createContext(rewriteContext); try { // the search context should inherit the query timeout assertThat(context.timeout(), equalTo(TimeValue.timeValueSeconds(seconds))); @@ -412,19 +408,25 @@ public void testMaxDocvalueFieldsSearch() throws IOException { for (int i = 0; i < indexService.getIndexSettings().getMaxDocvalueFields(); i++) { searchSourceBuilder.docValueField("field" + i); } - try (SearchContext context = service.createContext( - new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null)) - ) { - assertNotNull(context); + + ShardSearchRequest shardRequest = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null); + + { + SearchService.SearchRewriteContext rewriteContext = service.acquireSearcherAndRewrite(shardRequest, indexShard); + try (SearchContext context = service.createContext(rewriteContext)) { + assertNotNull(context); + } + } + + { + SearchService.SearchRewriteContext rewriteContext = service.acquireSearcherAndRewrite(shardRequest, indexShard); searchSourceBuilder.docValueField("one_field_too_much"); IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, - () -> service.createContext(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null))); + () -> service.createContext(rewriteContext)); assertEquals( - "Trying to retrieve too many docvalue_fields. Must be less than or equal to: [100] but was [101]. " - + "This limit can be set by changing the [index.max_docvalue_fields_search] index level setting.", - ex.getMessage()); + "Trying to retrieve too many docvalue_fields. Must be less than or equal to: [100] but was [101]. " + + "This limit can be set by changing the [index.max_docvalue_fields_search] index level setting.", ex.getMessage()); } } @@ -447,20 +449,28 @@ public void testMaxScriptFieldsSearch() throws IOException { searchSourceBuilder.scriptField("field" + i, new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap())); } - try (SearchContext context = service.createContext(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, - indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY), - 1.0f, -1, null, null))) { - assertNotNull(context); + + ShardSearchRequest shardRequest = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null); + + { + SearchService.SearchRewriteContext rewriteContext = service.acquireSearcherAndRewrite(shardRequest, indexShard); + try (SearchContext context = service.createContext(rewriteContext)) { + assertNotNull(context); + } + } + + { searchSourceBuilder.scriptField("anotherScriptField", - new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap())); + new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap())); + SearchService.SearchRewriteContext rewriteContext = service.acquireSearcherAndRewrite(shardRequest, indexShard); IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, - () -> service.createContext(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null))); + () -> service.createContext(rewriteContext)); assertEquals( - "Trying to retrieve too many script_fields. Must be less than or equal to: [" + maxScriptFields + "] but was [" - + (maxScriptFields + 1) - + "]. This limit can be set by changing the [index.max_script_fields] index level setting.", - ex.getMessage()); + "Trying to retrieve too many script_fields. Must be less than or equal to: [" + maxScriptFields + "] but was [" + + (maxScriptFields + 1) + + "]. This limit can be set by changing the [index.max_script_fields] index level setting.", + ex.getMessage()); } } @@ -477,17 +487,19 @@ public void testIgnoreScriptfieldIfSizeZero() throws IOException { searchSourceBuilder.scriptField("field" + 0, new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap())); searchSourceBuilder.size(0); - try (SearchContext context = service.createContext(new ShardSearchRequest(OriginalIndices.NONE, - searchRequest, indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY), - 1.0f, -1, null, null))) { - assertEquals(0, context.scriptFields().fields().size()); + SearchService.SearchRewriteContext rewriteContext = service.acquireSearcherAndRewrite( + new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null), + indexShard); + try (SearchContext context = service.createContext(rewriteContext)) { + assertEquals(0, context.scriptFields().fields().size()); } } /** * test that creating more than the allowed number of scroll contexts throws an exception */ - public void testMaxOpenScrollContexts() throws RuntimeException { + public void testMaxOpenScrollContexts() throws RuntimeException, IOException { createIndex("index"); client().prepareIndex("index", "type", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); @@ -513,8 +525,10 @@ public void testMaxOpenScrollContexts() throws RuntimeException { client().prepareSearch("index").setSize(1).setScroll("1m").get(); } + SearchService.SearchRewriteContext rewriteContext = + service.acquireSearcherAndRewrite(new ShardScrollRequestTest(indexShard.shardId()), indexShard); ElasticsearchException ex = expectThrows(ElasticsearchException.class, - () -> service.createAndPutContext(new ShardScrollRequestTest(indexShard.shardId()))); + () -> service.createAndPutContext(rewriteContext)); assertEquals( "Trying to create too many scroll contexts. Must be less than or equal to: [" + SearchService.MAX_OPEN_SCROLL_CONTEXT.get(Settings.EMPTY) + "]. " + @@ -592,7 +606,7 @@ public Scroll scroll() { } } - public void testCanMatch() throws IOException { + public void testCanMatch() throws IOException, InterruptedException { createIndex("index"); final SearchService service = getInstanceFromNode(SearchService.class); final IndicesService indicesService = getInstanceFromNode(IndicesService.class); @@ -625,11 +639,32 @@ public void testCanMatch() throws IOException { new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null)).canMatch()); assertEquals(numWrapReader, numWrapInvocations.get()); - // make sure that the wrapper is called when the context is actually created - service.createContext(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, - indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY), - 1f, -1, null, null)).close(); - assertEquals(numWrapReader+1, numWrapInvocations.get()); + ShardSearchRequest request = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null); + + CountDownLatch latch = new CountDownLatch(1); + SearchShardTask task = new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()); + service.executeQueryPhase(request, task, new ActionListener() { + @Override + public void onResponse(SearchPhaseResult searchPhaseResult) { + try { + // make sure that the wrapper is called when the query is actually executed + assertEquals(numWrapReader+1, numWrapInvocations.get()); + } finally { + latch.countDown(); + } + } + + @Override + public void onFailure(Exception e) { + try { + throw new AssertionError(e); + } finally { + latch.countDown(); + } + } + }); + latch.await(); } public void testCanRewriteToMatchNone() { @@ -744,18 +779,123 @@ public void testCreateSearchContextFailure() throws IOException { final IndexService indexService = createIndex(index); final SearchService service = getInstanceFromNode(SearchService.class); final ShardId shardId = new ShardId(indexService.index(), 0); + IndexShard indexShard = indexService.getShard(0); - NullPointerException e = expectThrows(NullPointerException.class, - () -> service.createContext( - new ShardSearchRequest(shardId, null, 0, null) { - @Override - public SearchType searchType() { - // induce an artificial NPE - throw new NullPointerException("expected"); - } + SearchService.SearchRewriteContext rewriteContext = service.acquireSearcherAndRewrite( + new ShardSearchRequest(shardId, null, 0, AliasFilter.EMPTY) { + @Override + public SearchType searchType() { + // induce an artificial NPE + throw new NullPointerException("expected"); } - )); + }, indexShard); + NullPointerException e = expectThrows(NullPointerException.class, + () -> service.createContext(rewriteContext)); assertEquals("expected", e.getMessage()); assertEquals("should have 2 store refs (IndexService + InternalEngine)", 2, indexService.getShard(0).store().refCount()); } + + public void testMatchNoDocsEmptyResponse() throws InterruptedException { + createIndex("index"); + Thread currentThread = Thread.currentThread(); + SearchService service = getInstanceFromNode(SearchService.class); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService indexService = indicesService.indexServiceSafe(resolveIndex("index")); + IndexShard indexShard = indexService.getShard(0); + SearchRequest searchRequest = new SearchRequest() + .allowPartialSearchResults(false) + .source(new SearchSourceBuilder() + .aggregation(AggregationBuilders.count("count").field("value"))); + ShardSearchRequest shardRequest = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), + 5, AliasFilter.EMPTY, 1.0f, 0, null, null); + SearchShardTask task = new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()); + + { + CountDownLatch latch = new CountDownLatch(1); + shardRequest.source().query(new MatchAllQueryBuilder()); + service.executeQueryPhase(shardRequest, task, new ActionListener() { + @Override + public void onResponse(SearchPhaseResult result) { + try { + assertNotSame(Thread.currentThread(), currentThread); + assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search]")); + assertThat(result, instanceOf(QuerySearchResult.class)); + assertFalse(result.queryResult().isNull()); + assertNotNull(result.queryResult().topDocs()); + assertNotNull(result.queryResult().aggregations()); + } finally { + latch.countDown(); + } + } + + @Override + public void onFailure(Exception exc) { + try { + throw new AssertionError(exc); + } finally { + latch.countDown(); + } + } + }); + latch.await(); + } + + { + CountDownLatch latch = new CountDownLatch(1); + shardRequest.source().query(new MatchNoneQueryBuilder()); + service.executeQueryPhase(shardRequest, task, new ActionListener() { + @Override + public void onResponse(SearchPhaseResult result) { + try { + assertNotSame(Thread.currentThread(), currentThread); + assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search]")); + assertThat(result, instanceOf(QuerySearchResult.class)); + assertFalse(result.queryResult().isNull()); + assertNotNull(result.queryResult().topDocs()); + assertNotNull(result.queryResult().aggregations()); + } finally { + latch.countDown(); + } + } + + @Override + public void onFailure(Exception exc) { + try { + throw new AssertionError(exc); + } finally { + latch.countDown(); + } + } + }); + latch.await(); + } + + { + CountDownLatch latch = new CountDownLatch(1); + shardRequest.canReturnNullResponseIfMatchNoDocs(true); + service.executeQueryPhase(shardRequest, task, new ActionListener() { + @Override + public void onResponse(SearchPhaseResult result) { + try { + // make sure we don't use the search threadpool + assertSame(Thread.currentThread(), currentThread); + assertThat(result, instanceOf(QuerySearchResult.class)); + assertTrue(result.queryResult().isNull()); + } finally { + latch.countDown(); + } + } + + @Override + public void onFailure(Exception e) { + try { + throw new AssertionError(e); + } finally { + latch.countDown(); + } + } + }); + latch.await(); + } + } } diff --git a/server/src/test/java/org/elasticsearch/search/internal/ShardSearchRequestTests.java b/server/src/test/java/org/elasticsearch/search/internal/ShardSearchRequestTests.java index f0503ff12103a..845276f8eb4b0 100644 --- a/server/src/test/java/org/elasticsearch/search/internal/ShardSearchRequestTests.java +++ b/server/src/test/java/org/elasticsearch/search/internal/ShardSearchRequestTests.java @@ -77,6 +77,8 @@ public void testSerialization() throws Exception { assertEquals(deserializedRequest.indexBoost(), shardSearchTransportRequest.indexBoost(), 0.0f); assertEquals(deserializedRequest.getClusterAlias(), shardSearchTransportRequest.getClusterAlias()); assertEquals(shardSearchTransportRequest.allowPartialSearchResults(), deserializedRequest.allowPartialSearchResults()); + assertEquals(deserializedRequest.canReturnNullResponseIfMatchNoDocs(), + shardSearchTransportRequest.canReturnNullResponseIfMatchNoDocs()); } public void testAllowPartialResultsSerializationPre7_0_0() throws IOException { @@ -102,9 +104,11 @@ private ShardSearchRequest createShardSearchRequest() throws IOException { filteringAliases = new AliasFilter(null, Strings.EMPTY_ARRAY); } final String[] routings = generateRandomStringArray(5, 10, false, true); - return new ShardSearchRequest(new OriginalIndices(searchRequest), searchRequest, shardId, + ShardSearchRequest req = new ShardSearchRequest(new OriginalIndices(searchRequest), searchRequest, shardId, randomIntBetween(1, 100), filteringAliases, randomBoolean() ? 1.0f : randomFloat(), Math.abs(randomLong()), randomAlphaOfLengthBetween(3, 10), routings); + req.canReturnNullResponseIfMatchNoDocs(randomBoolean()); + return req; } public void testFilteringAliases() throws Exception { diff --git a/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java b/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java index f74cafc22ffc9..175c95813cb05 100644 --- a/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java +++ b/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java @@ -124,10 +124,17 @@ public void testReadFromPre_7_1_0() throws IOException { QuerySearchResult querySearchResult = new QuerySearchResult(in); assertEquals(100, querySearchResult.getRequestId()); assertTrue(querySearchResult.hasAggs()); - InternalAggregations aggs = (InternalAggregations)querySearchResult.consumeAggs(); + InternalAggregations aggs = (InternalAggregations) querySearchResult.consumeAggs(); assertEquals(1, aggs.asList().size()); //top-level pipeline aggs are retrieved as part of InternalAggregations although they were serialized separately assertEquals(1, aggs.getTopLevelPipelineAggregators().size()); } } + + public void testNullResponse() throws Exception { + QuerySearchResult querySearchResult = QuerySearchResult.nullInstance(); + QuerySearchResult deserialized = + copyWriteable(querySearchResult, namedWriteableRegistry, QuerySearchResult::new, Version.CURRENT); + assertEquals(querySearchResult.isNull(), deserialized.isNull()); + } } diff --git a/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java b/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java index 396f0b413e366..f549bf9753865 100644 --- a/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java +++ b/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.index.engine; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; @@ -44,9 +43,9 @@ import org.hamcrest.Matchers; import java.io.IOException; +import java.util.Arrays; import java.util.Collection; import java.util.EnumSet; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; @@ -119,33 +118,36 @@ public void testSearchAndGetAPIsAreThrottled() throws InterruptedException, IOEx XPackClient xPackClient = new XPackClient(client()); assertAcked(xPackClient.freeze(new FreezeRequest("index"))); int numRequests = randomIntBetween(20, 50); - CountDownLatch latch = new CountDownLatch(numRequests); int numRefreshes = 0; for (int i = 0; i < numRequests; i++) { numRefreshes++; - switch (randomIntBetween(0, 3)) { + // make sure that we don't share the frozen reader in concurrent requests since we acquire the + // searcher and rewrite the request outside of the search-throttle thread pool + switch (randomFrom(Arrays.asList(0, 1, 2))) { case 0: - client().prepareGet("index", "_doc", "" + randomIntBetween(0, 9)).execute(ActionListener.wrap(latch::countDown)); + client().prepareGet("index", "_doc", "" + randomIntBetween(0, 9)) + .get(); break; case 1: client().prepareSearch("index").setIndicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_FORBID_CLOSED) .setSearchType(SearchType.QUERY_THEN_FETCH) - .execute(ActionListener.wrap(latch::countDown)); + .get(); // in total 4 refreshes 1x query & 1x fetch per shard (we have 2) numRefreshes += 3; break; case 2: - client().prepareTermVectors("index", "_doc", "" + randomIntBetween(0, 9)).execute(ActionListener.wrap(latch::countDown)); + client().prepareTermVectors("index", "_doc", "" + randomIntBetween(0, 9)) + .get(); break; case 3: client().prepareExplain("index", "_doc", "" + randomIntBetween(0, 9)).setQuery(new MatchAllQueryBuilder()) - .execute(ActionListener.wrap(latch::countDown)); + .get(); break; - default: - assert false; + + default: + assert false; } } - latch.await(); IndicesStatsResponse index = client().admin().indices().prepareStats("index").clear().setRefresh(true).get(); assertEquals(numRefreshes, index.getTotal().refresh.getTotal()); }