diff --git a/docs/reference/modules/cross-cluster-search.asciidoc b/docs/reference/modules/cross-cluster-search.asciidoc index b59f74198c3e8..186c8e8ee3837 100644 --- a/docs/reference/modules/cross-cluster-search.asciidoc +++ b/docs/reference/modules/cross-cluster-search.asciidoc @@ -65,7 +65,6 @@ GET /cluster_one:twitter/_search { "took": 150, "timed_out": false, - "num_reduce_phases": 2, "_shards": { "total": 1, "successful": 1, diff --git a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml index 4499a60bfe24a..fa4ca0588940c 100644 --- a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml +++ b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml @@ -36,6 +36,7 @@ terms: field: f1.keyword + - match: { num_reduce_phases: 3 } - match: {_clusters.total: 2} - match: {_clusters.successful: 2} - match: {_clusters.skipped: 0} @@ -63,6 +64,7 @@ terms: field: f1.keyword + - match: { num_reduce_phases: 3 } - match: {_clusters.total: 2} - match: {_clusters.successful: 2} - match: {_clusters.skipped: 0} @@ -83,6 +85,7 @@ terms: field: f1.keyword + - is_false: num_reduce_phases - match: {_clusters.total: 1} - match: {_clusters.successful: 1} - match: {_clusters.skipped: 0} @@ -103,6 +106,7 @@ terms: field: f1.keyword + - is_false: num_reduce_phases - is_false: _clusters - match: { _shards.total: 2 } - match: { hits.total: 5} @@ -133,6 +137,7 @@ rest_total_hits_as_int: true index: test_remote_cluster:test_index + - is_false: num_reduce_phases - match: {_clusters.total: 1} - match: {_clusters.successful: 1} - match: {_clusters.skipped: 0} @@ -162,6 +167,7 @@ rest_total_hits_as_int: true index: "*:test_index" + - match: { num_reduce_phases: 3 } - match: {_clusters.total: 2} - match: {_clusters.successful: 2} - match: {_clusters.skipped: 0} @@ -176,6 +182,7 @@ rest_total_hits_as_int: true index: my_remote_cluster:aliased_test_index + - is_false: num_reduce_phases - match: {_clusters.total: 1} - match: {_clusters.successful: 1} - match: {_clusters.skipped: 0} @@ -192,6 +199,7 @@ rest_total_hits_as_int: true index: my_remote_cluster:aliased_test_index,my_remote_cluster:field_caps_index_1 + - is_false: num_reduce_phases - match: {_clusters.total: 1} - match: {_clusters.successful: 1} - match: {_clusters.skipped: 0} @@ -208,6 +216,7 @@ rest_total_hits_as_int: true index: "my_remote_cluster:single_doc_index" + - is_false: num_reduce_phases - match: {_clusters.total: 1} - match: {_clusters.successful: 1} - match: {_clusters.skipped: 0} diff --git a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/40_scroll.yml b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/40_scroll.yml index 6a7fe3c5356c0..ea404702db529 100644 --- a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/40_scroll.yml +++ b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/40_scroll.yml @@ -12,6 +12,7 @@ query: match_all: {} + - is_false: num_reduce_phases - match: {_clusters.total: 1} - match: {_clusters.successful: 1} - match: {_clusters.skipped: 0} @@ -28,6 +29,7 @@ rest_total_hits_as_int: true body: { "scroll_id": "$scroll_id", "scroll": "1m"} + - is_false: num_reduce_phases - is_false: _clusters - match: {hits.total: 6 } - length: {hits.hits: 2 } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index 67f33398bba68..e5c5b17414b96 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -714,20 +714,18 @@ InitialSearchPhase.ArraySearchPhaseResults newSearchPhaseResu final boolean hasAggs = source != null && source.aggregations() != null; final boolean hasTopDocs = source == null || source.size() != 0; final int trackTotalHitsUpTo = resolveTrackTotalHits(request); - final boolean finalReduce = request.getLocalClusterAlias() == null; - if (isScrollRequest == false && (hasAggs || hasTopDocs)) { // no incremental reduce if scroll is used - we only hit a single shard or sometimes more... if (request.getBatchedReduceSize() < numShards) { // only use this if there are aggs and if there are more shards than we should reduce at once return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs, - trackTotalHitsUpTo, finalReduce); + trackTotalHitsUpTo, request.isFinalReduce()); } } return new InitialSearchPhase.ArraySearchPhaseResults(numShards) { @Override ReducedQueryPhase reduce() { - return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHitsUpTo, finalReduce); + return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHitsUpTo, request.isFinalReduce()); } }; } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java index 55122b6806fd2..602a7123d0014 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -67,6 +67,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest private final String localClusterAlias; private final long absoluteStartMillis; + private final boolean finalReduce; private SearchType searchType = SearchType.DEFAULT; @@ -102,13 +103,15 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest public SearchRequest() { this.localClusterAlias = null; this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS; + this.finalReduce = true; } /** * Constructs a new search request from the provided search request */ public SearchRequest(SearchRequest searchRequest) { - this(searchRequest, searchRequest.indices, searchRequest.localClusterAlias, searchRequest.absoluteStartMillis); + this(searchRequest, searchRequest.indices, searchRequest.localClusterAlias, + searchRequest.absoluteStartMillis, searchRequest.finalReduce); } /** @@ -132,25 +135,30 @@ public SearchRequest(String[] indices, SearchSourceBuilder source) { } /** - * Creates a new search request by providing the search request to copy all fields from, the indices to search against, - * the alias of the cluster where it will be executed, as well as the start time in milliseconds from the epoch time. - * Used when a {@link SearchRequest} is created and executed as part of a cross-cluster search request performing local reduction - * on each cluster. The coordinating CCS node provides the original search request, the indices to search against as well as the - * alias to prefix index names with in the returned search results, and the absolute start time to be used on the remote clusters - * to ensure that the same value is used. + * Creates a new search request by providing the search request to copy all fields from, the indices to search against, the alias of + * the cluster where it will be executed, as well as the start time in milliseconds from the epoch time and whether the reduction + * should be final or not. Used when a {@link SearchRequest} is created and executed as part of a cross-cluster search request + * performing reduction on each cluster in order to minimize network round-trips between the coordinating node and the remote clusters. + * + * @param originalSearchRequest the original search request + * @param indices the indices to search against + * @param localClusterAlias the alias to prefix index names with in the returned search results + * @param absoluteStartMillis the absolute start time to be used on the remote clusters to ensure that the same value is used + * @param finalReduce whether the reduction should be final or not */ static SearchRequest withLocalReduction(SearchRequest originalSearchRequest, String[] indices, - String localClusterAlias, long absoluteStartMillis) { + String localClusterAlias, long absoluteStartMillis, boolean finalReduce) { Objects.requireNonNull(originalSearchRequest, "search request must not be null"); validateIndices(indices); Objects.requireNonNull(localClusterAlias, "cluster alias must not be null"); if (absoluteStartMillis < 0) { throw new IllegalArgumentException("absoluteStartMillis must not be negative but was [" + absoluteStartMillis + "]"); } - return new SearchRequest(originalSearchRequest, indices, localClusterAlias, absoluteStartMillis); + return new SearchRequest(originalSearchRequest, indices, localClusterAlias, absoluteStartMillis, finalReduce); } - private SearchRequest(SearchRequest searchRequest, String[] indices, String localClusterAlias, long absoluteStartMillis) { + private SearchRequest(SearchRequest searchRequest, String[] indices, String localClusterAlias, long absoluteStartMillis, + boolean finalReduce) { this.allowPartialSearchResults = searchRequest.allowPartialSearchResults; this.batchedReduceSize = searchRequest.batchedReduceSize; this.ccsMinimizeRoundtrips = searchRequest.ccsMinimizeRoundtrips; @@ -167,6 +175,7 @@ private SearchRequest(SearchRequest searchRequest, String[] indices, String loca this.types = searchRequest.types; this.localClusterAlias = localClusterAlias; this.absoluteStartMillis = absoluteStartMillis; + this.finalReduce = finalReduce; } /** @@ -203,6 +212,12 @@ public SearchRequest(StreamInput in) throws IOException { localClusterAlias = null; absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS; } + //TODO move to the 6_7_0 branch once backported to 6.x + if (in.getVersion().onOrAfter(Version.V_7_0_0)) { + finalReduce = in.readBoolean(); + } else { + finalReduce = true; + } if (in.getVersion().onOrAfter(Version.V_7_0_0)) { ccsMinimizeRoundtrips = in.readBoolean(); } @@ -232,6 +247,10 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(absoluteStartMillis); } } + //TODO move to the 6_7_0 branch once backported to 6.x + if (out.getVersion().onOrAfter(Version.V_7_0_0)) { + out.writeBoolean(finalReduce); + } if (out.getVersion().onOrAfter(Version.V_7_0_0)) { out.writeBoolean(ccsMinimizeRoundtrips); } @@ -277,11 +296,18 @@ String getLocalClusterAlias() { return localClusterAlias; } + /** + * Returns whether the reduction phase that will be performed needs to be final or not. + */ + boolean isFinalReduce() { + return finalReduce; + } + /** * Returns the current time in milliseconds from the time epoch, to be used for the execution of this search request. Used to * ensure that the same value, determined by the coordinating node, is used on all nodes involved in the execution of the search - * request. When created through {@link #withLocalReduction(SearchRequest, String[], String, long)}, this method returns the provided - * current time, otherwise it will return {@link System#currentTimeMillis()}. + * request. When created through {@link #withLocalReduction(SearchRequest, String[], String, long, boolean)}, this method returns + * the provided current time, otherwise it will return {@link System#currentTimeMillis()}. * */ long getOrCreateAbsoluteStartMillis() { diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java b/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java index dd0d4de07d6f4..6ae5e1a553eb6 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.search; +import org.apache.lucene.search.TotalHits; import org.elasticsearch.Version; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.Nullable; @@ -35,8 +36,10 @@ import org.elasticsearch.common.xcontent.XContentParser.Token; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.action.RestActions; +import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.profile.ProfileShardResult; import org.elasticsearch.search.profile.SearchProfileShardResults; @@ -47,6 +50,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.Supplier; import static org.elasticsearch.action.search.ShardSearchFailure.readShardSearchFailure; import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; @@ -497,4 +501,12 @@ public String toString() { return "Clusters{total=" + total + ", successful=" + successful + ", skipped=" + skipped + '}'; } } + + static SearchResponse empty(Supplier tookInMillisSupplier, Clusters clusters) { + SearchHits searchHits = new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), Float.NaN); + InternalSearchResponse internalSearchResponse = new InternalSearchResponse(searchHits, + InternalAggregations.EMPTY, null, null, false, null, 0); + return new SearchResponse(internalSearchResponse, null, 0, 0, 0, tookInMillisSupplier.get(), + ShardSearchFailure.EMPTY_ARRAY, clusters); + } } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java b/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java index 567040246c50f..3b28ca19477ab 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java @@ -115,11 +115,7 @@ SearchResponse getMergedResponse(Clusters clusters) { //if the search is only across remote clusters, none of them are available, and all of them have skip_unavailable set to true, //we end up calling merge without anything to merge, we just return an empty search response if (searchResponses.size() == 0) { - SearchHits searchHits = new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), Float.NaN); - InternalSearchResponse internalSearchResponse = new InternalSearchResponse(searchHits, - InternalAggregations.EMPTY, null, null, false, null, 0); - return new SearchResponse(internalSearchResponse, null, 0, 0, 0, searchTimeProvider.buildTookInMillis(), - ShardSearchFailure.EMPTY_ARRAY, clusters); + return SearchResponse.empty(searchTimeProvider::buildTookInMillis, clusters); } int totalShards = 0; int skippedShards = 0; diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 48ae3f1249522..519f2c88e0e58 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -48,9 +48,13 @@ import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.internal.AliasFilter; +import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.search.profile.ProfileShardResult; +import org.elasticsearch.search.profile.SearchProfileShardResults; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterAware; @@ -253,30 +257,66 @@ static void ccsRemoteReduce(SearchRequest searchRequest, OriginalIndices localIn SearchTimeProvider timeProvider, Function reduceContext, RemoteClusterService remoteClusterService, ThreadPool threadPool, ActionListener listener, BiConsumer> localSearchConsumer) { - SearchResponseMerger searchResponseMerger = createSearchResponseMerger(searchRequest.source(), timeProvider, reduceContext); - AtomicInteger skippedClusters = new AtomicInteger(0); - final AtomicReference exceptions = new AtomicReference<>(); - int totalClusters = remoteIndices.size() + (localIndices == null ? 0 : 1); - final CountDown countDown = new CountDown(totalClusters); - for (Map.Entry entry : remoteIndices.entrySet()) { + + if (localIndices == null && remoteIndices.size() == 1) { + //if we are searching against a single remote cluster, we simply forward the original search request to such cluster + //and we directly perform final reduction in the remote cluster + Map.Entry entry = remoteIndices.entrySet().iterator().next(); String clusterAlias = entry.getKey(); boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias); OriginalIndices indices = entry.getValue(); SearchRequest ccsSearchRequest = SearchRequest.withLocalReduction(searchRequest, indices.indices(), - clusterAlias, timeProvider.getAbsoluteStartMillis()); - ActionListener ccsListener = createCCSListener(clusterAlias, skipUnavailable, countDown, - skippedClusters, exceptions, searchResponseMerger, totalClusters, listener); + clusterAlias, timeProvider.getAbsoluteStartMillis(), true); Client remoteClusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias); - remoteClusterClient.search(ccsSearchRequest, ccsListener); - } - if (localIndices != null) { - ActionListener ccsListener = createCCSListener(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, - false, countDown, skippedClusters, exceptions, searchResponseMerger, totalClusters, listener); - //here we provide the empty string a cluster alias, which means no prefix in index name, - //but the coord node will perform non final reduce as it's not null. - SearchRequest ccsLocalSearchRequest = SearchRequest.withLocalReduction(searchRequest, localIndices.indices(), - RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis()); - localSearchConsumer.accept(ccsLocalSearchRequest, ccsListener); + remoteClusterClient.search(ccsSearchRequest, new ActionListener() { + @Override + public void onResponse(SearchResponse searchResponse) { + Map profileResults = searchResponse.getProfileResults(); + SearchProfileShardResults profile = profileResults == null || profileResults.isEmpty() + ? null : new SearchProfileShardResults(profileResults); + InternalSearchResponse internalSearchResponse = new InternalSearchResponse(searchResponse.getHits(), + (InternalAggregations) searchResponse.getAggregations(), searchResponse.getSuggest(), profile, + searchResponse.isTimedOut(), searchResponse.isTerminatedEarly(), searchResponse.getNumReducePhases()); + listener.onResponse(new SearchResponse(internalSearchResponse, searchResponse.getScrollId(), + searchResponse.getTotalShards(), searchResponse.getSuccessfulShards(), searchResponse.getSkippedShards(), + timeProvider.buildTookInMillis(), searchResponse.getShardFailures(), new SearchResponse.Clusters(1, 1, 0))); + } + + @Override + public void onFailure(Exception e) { + if (skipUnavailable) { + listener.onResponse(SearchResponse.empty(timeProvider::buildTookInMillis, new SearchResponse.Clusters(1, 0, 1))); + } else { + listener.onFailure(wrapRemoteClusterFailure(clusterAlias, e)); + } + } + }); + } else { + SearchResponseMerger searchResponseMerger = createSearchResponseMerger(searchRequest.source(), timeProvider, reduceContext); + AtomicInteger skippedClusters = new AtomicInteger(0); + final AtomicReference exceptions = new AtomicReference<>(); + int totalClusters = remoteIndices.size() + (localIndices == null ? 0 : 1); + final CountDown countDown = new CountDown(totalClusters); + for (Map.Entry entry : remoteIndices.entrySet()) { + String clusterAlias = entry.getKey(); + boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias); + OriginalIndices indices = entry.getValue(); + SearchRequest ccsSearchRequest = SearchRequest.withLocalReduction(searchRequest, indices.indices(), + clusterAlias, timeProvider.getAbsoluteStartMillis(), false); + ActionListener ccsListener = createCCSListener(clusterAlias, skipUnavailable, countDown, + skippedClusters, exceptions, searchResponseMerger, totalClusters, listener); + Client remoteClusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias); + remoteClusterClient.search(ccsSearchRequest, ccsListener); + } + if (localIndices != null) { + ActionListener ccsListener = createCCSListener(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, + false, countDown, skippedClusters, exceptions, searchResponseMerger, totalClusters, listener); + //here we provide the empty string a cluster alias, which means no prefix in index name, + //but the coord node will perform non final reduce as it's not null. + SearchRequest ccsLocalSearchRequest = SearchRequest.withLocalReduction(searchRequest, localIndices.indices(), + RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis(), false); + localSearchConsumer.accept(ccsLocalSearchRequest, ccsListener); + } } } @@ -297,9 +337,6 @@ static SearchResponseMerger createSearchResponseMerger(SearchSourceBuilder sourc //here we modify the original source so we can re-use it by setting it to each outgoing search request source.from(0); source.size(from + size); - //TODO when searching only against a remote cluster, we could ask directly for the final number of results and let - //the remote cluster do a final reduction, yet that is not possible as we are providing a localClusterAlias which - //will automatically make the reduction non final } return new SearchResponseMerger(from, size, trackTotalHitsUpTo, timeProvider, reduceContextFunction); } @@ -604,7 +641,7 @@ public final void onFailure(Exception e) { } else { Exception exception = e; if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias) == false) { - exception = new RemoteTransportException("error while communicating with remote cluster [" + clusterAlias + "]", e); + exception = wrapRemoteClusterFailure(clusterAlias, e); } if (exceptions.compareAndSet(null, exception) == false) { exceptions.accumulateAndGet(exception, (previous, current) -> { @@ -636,4 +673,8 @@ private void maybeFinish() { abstract FinalResponse createFinalResponse(); } + + private static RemoteTransportException wrapRemoteClusterFailure(String clusterAlias, Exception e) { + return new RemoteTransportException("error while communicating with remote cluster [" + clusterAlias + "]", e); + } } diff --git a/server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java b/server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java index f5c99fc513759..7085e5ba5868c 100644 --- a/server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java @@ -641,23 +641,23 @@ public SearchSourceBuilder collapse(CollapseBuilder collapse) { * Add an aggregation to perform as part of the search. */ public SearchSourceBuilder aggregation(AggregationBuilder aggregation) { - if (aggregations == null) { + if (aggregations == null) { aggregations = AggregatorFactories.builder(); - } + } aggregations.addAggregator(aggregation); - return this; + return this; } /** * Add an aggregation to perform as part of the search. */ public SearchSourceBuilder aggregation(PipelineAggregationBuilder aggregation) { - if (aggregations == null) { + if (aggregations == null) { aggregations = AggregatorFactories.builder(); - } - aggregations.addPipelineAggregator(aggregation); - return this; } + aggregations.addPipelineAggregator(aggregation); + return this; + } /** * Gets the bytes representing the aggregation builders for this request. diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java index e9cde3f7aadea..9107b75db1798 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java @@ -30,6 +30,7 @@ import org.apache.lucene.search.grouping.CollapseTopFieldDocs; import org.apache.lucene.util.BytesRef; import org.elasticsearch.action.OriginalIndices; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; import org.elasticsearch.common.text.Text; @@ -313,9 +314,14 @@ private static AtomicArray generateFetchResults(int nShards, return fetchResults; } + private static SearchRequest randomSearchRequest() { + return randomBoolean() ? new SearchRequest() : SearchRequest.withLocalReduction(new SearchRequest(), + Strings.EMPTY_ARRAY, "remote", 0, randomBoolean()); + } + public void testConsumer() { int bufferSize = randomIntBetween(2, 3); - SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote"); + SearchRequest request = randomSearchRequest(); request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo"))); request.setBatchedReduceSize(bufferSize); InitialSearchPhase.ArraySearchPhaseResults consumer = searchPhaseController.newSearchPhaseResults(request, 3); @@ -377,7 +383,7 @@ public void testConsumerConcurrently() throws InterruptedException { int expectedNumResults = randomIntBetween(1, 100); int bufferSize = randomIntBetween(2, 200); - SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote"); + SearchRequest request = randomSearchRequest(); request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo"))); request.setBatchedReduceSize(bufferSize); InitialSearchPhase.ArraySearchPhaseResults consumer = @@ -424,7 +430,7 @@ public void testConsumerConcurrently() throws InterruptedException { public void testConsumerOnlyAggs() { int expectedNumResults = randomIntBetween(1, 100); int bufferSize = randomIntBetween(2, 200); - SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote"); + SearchRequest request = randomSearchRequest(); request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")).size(0)); request.setBatchedReduceSize(bufferSize); InitialSearchPhase.ArraySearchPhaseResults consumer = @@ -460,7 +466,7 @@ public void testConsumerOnlyAggs() { public void testConsumerOnlyHits() { int expectedNumResults = randomIntBetween(1, 100); int bufferSize = randomIntBetween(2, 200); - SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote"); + SearchRequest request = randomSearchRequest(); if (randomBoolean()) { request.source(new SearchSourceBuilder().size(randomIntBetween(1, 10))); } @@ -493,8 +499,7 @@ public void testConsumerOnlyHits() { private void assertFinalReduction(SearchRequest searchRequest) { assertThat(reductions.size(), greaterThanOrEqualTo(1)); - //the last reduction step was the final one only if no cluster alias was provided with the search request - assertEquals(searchRequest.getLocalClusterAlias() == null, reductions.get(reductions.size() - 1)); + assertEquals(searchRequest.isFinalReduce(), reductions.get(reductions.size() - 1)); } public void testNewSearchPhaseResults() { @@ -568,7 +573,7 @@ public void testReduceTopNWithFromOffset() { public void testConsumerSortByField() { int expectedNumResults = randomIntBetween(1, 100); int bufferSize = randomIntBetween(2, 200); - SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote"); + SearchRequest request = randomSearchRequest(); int size = randomIntBetween(1, 10); request.setBatchedReduceSize(bufferSize); InitialSearchPhase.ArraySearchPhaseResults consumer = @@ -604,7 +609,7 @@ public void testConsumerSortByField() { public void testConsumerFieldCollapsing() { int expectedNumResults = randomIntBetween(30, 100); int bufferSize = randomIntBetween(2, 200); - SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote"); + SearchRequest request = randomSearchRequest(); int size = randomIntBetween(5, 10); request.setBatchedReduceSize(bufferSize); InitialSearchPhase.ArraySearchPhaseResults consumer = diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java index 1d2d59c60e2ae..c139b75f45c42 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java @@ -54,17 +54,20 @@ protected SearchRequest createSearchRequest() throws IOException { } //clusterAlias and absoluteStartMillis do not have public getters/setters hence we randomize them only in this test specifically. return SearchRequest.withLocalReduction(request, request.indices(), - randomAlphaOfLengthBetween(5, 10), randomNonNegativeLong()); + randomAlphaOfLengthBetween(5, 10), randomNonNegativeLong(), randomBoolean()); } public void testWithLocalReduction() { - expectThrows(NullPointerException.class, () -> SearchRequest.withLocalReduction(null, Strings.EMPTY_ARRAY, "", 0)); + expectThrows(NullPointerException.class, () -> SearchRequest.withLocalReduction(null, Strings.EMPTY_ARRAY, "", 0, randomBoolean())); SearchRequest request = new SearchRequest(); - expectThrows(NullPointerException.class, () -> SearchRequest.withLocalReduction(request, null, "", 0)); - expectThrows(NullPointerException.class, () -> SearchRequest.withLocalReduction(request, new String[]{null}, "", 0)); - expectThrows(NullPointerException.class, () -> SearchRequest.withLocalReduction(request, Strings.EMPTY_ARRAY, null, 0)); - expectThrows(IllegalArgumentException.class, () -> SearchRequest.withLocalReduction(request, Strings.EMPTY_ARRAY, "", -1)); - SearchRequest searchRequest = SearchRequest.withLocalReduction(request, Strings.EMPTY_ARRAY, "", 0); + expectThrows(NullPointerException.class, () -> SearchRequest.withLocalReduction(request, null, "", 0, randomBoolean())); + expectThrows(NullPointerException.class, () -> SearchRequest.withLocalReduction(request, + new String[]{null}, "", 0, randomBoolean())); + expectThrows(NullPointerException.class, () -> SearchRequest.withLocalReduction(request, + Strings.EMPTY_ARRAY, null, 0, randomBoolean())); + expectThrows(IllegalArgumentException.class, () -> SearchRequest.withLocalReduction(request, + Strings.EMPTY_ARRAY, "", -1, randomBoolean())); + SearchRequest searchRequest = SearchRequest.withLocalReduction(request, Strings.EMPTY_ARRAY, "", 0, randomBoolean()); assertNull(searchRequest.validate()); } @@ -92,6 +95,12 @@ public void testRandomVersionSerialization() throws IOException { assertEquals(searchRequest.getLocalClusterAlias(), deserializedRequest.getLocalClusterAlias()); assertEquals(searchRequest.getOrCreateAbsoluteStartMillis(), deserializedRequest.getOrCreateAbsoluteStartMillis()); } + //TODO move to the 6_7_0 branch once backported to 6.x + if (version.before(Version.V_7_0_0)) { + assertTrue(deserializedRequest.isFinalReduce()); + } else { + assertEquals(searchRequest.isFinalReduce(), deserializedRequest.isFinalReduce()); + } } public void testReadFromPre6_7_0() throws IOException { @@ -103,6 +112,7 @@ public void testReadFromPre6_7_0() throws IOException { assertNull(searchRequest.getLocalClusterAlias()); assertAbsoluteStartMillisIsCurrentTime(searchRequest); assertTrue(searchRequest.isCcsMinimizeRoundtrips()); + assertTrue(searchRequest.isFinalReduce()); } } diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java index 8fd75c5fd673d..ed14d11946f75 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java @@ -27,13 +27,17 @@ import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.bucket.terms.LongTerms; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.elasticsearch.search.aggregations.support.ValueType; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.test.ESSingleNodeTestCase; public class TransportSearchActionSingleNodeTests extends ESSingleNodeTestCase { public void testLocalClusterAlias() { - long nowInMillis = System.currentTimeMillis(); + long nowInMillis = randomLongBetween(0, Long.MAX_VALUE); IndexRequest indexRequest = new IndexRequest("test"); indexRequest.id("1"); indexRequest.source("field", "value"); @@ -42,7 +46,8 @@ public void testLocalClusterAlias() { assertEquals(RestStatus.CREATED, indexResponse.status()); { - SearchRequest searchRequest = SearchRequest.withLocalReduction(new SearchRequest(), Strings.EMPTY_ARRAY, "local", nowInMillis); + SearchRequest searchRequest = SearchRequest.withLocalReduction(new SearchRequest(), Strings.EMPTY_ARRAY, + "local", nowInMillis, randomBoolean()); SearchResponse searchResponse = client().search(searchRequest).actionGet(); assertEquals(1, searchResponse.getHits().getTotalHits().value); SearchHit[] hits = searchResponse.getHits().getHits(); @@ -53,7 +58,8 @@ public void testLocalClusterAlias() { assertEquals("1", hit.getId()); } { - SearchRequest searchRequest = SearchRequest.withLocalReduction(new SearchRequest(), Strings.EMPTY_ARRAY, "", nowInMillis); + SearchRequest searchRequest = SearchRequest.withLocalReduction(new SearchRequest(), Strings.EMPTY_ARRAY, + "", nowInMillis, randomBoolean()); SearchResponse searchResponse = client().search(searchRequest).actionGet(); assertEquals(1, searchResponse.getHits().getTotalHits().value); SearchHit[] hits = searchResponse.getHits().getHits(); @@ -94,19 +100,22 @@ public void testAbsoluteStartMillis() { assertEquals(0, searchResponse.getTotalShards()); } { - SearchRequest searchRequest = SearchRequest.withLocalReduction(new SearchRequest(), Strings.EMPTY_ARRAY, "", 0); + SearchRequest searchRequest = SearchRequest.withLocalReduction(new SearchRequest(), + Strings.EMPTY_ARRAY, "", 0, randomBoolean()); SearchResponse searchResponse = client().search(searchRequest).actionGet(); assertEquals(2, searchResponse.getHits().getTotalHits().value); } { - SearchRequest searchRequest = SearchRequest.withLocalReduction(new SearchRequest(), Strings.EMPTY_ARRAY, "", 0); + SearchRequest searchRequest = SearchRequest.withLocalReduction(new SearchRequest(), + Strings.EMPTY_ARRAY, "", 0, randomBoolean()); searchRequest.indices(""); SearchResponse searchResponse = client().search(searchRequest).actionGet(); assertEquals(1, searchResponse.getHits().getTotalHits().value); assertEquals("test-1970.01.01", searchResponse.getHits().getHits()[0].getIndex()); } { - SearchRequest searchRequest = SearchRequest.withLocalReduction(new SearchRequest(), Strings.EMPTY_ARRAY, "", 0); + SearchRequest searchRequest = SearchRequest.withLocalReduction(new SearchRequest(), + Strings.EMPTY_ARRAY, "", 0, randomBoolean()); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); RangeQueryBuilder rangeQuery = new RangeQueryBuilder("date"); rangeQuery.gte("1970-01-01"); @@ -118,4 +127,50 @@ public void testAbsoluteStartMillis() { assertEquals("test-1970.01.01", searchResponse.getHits().getHits()[0].getIndex()); } } + + public void testFinalReduce() { + long nowInMillis = randomLongBetween(0, Long.MAX_VALUE); + { + IndexRequest indexRequest = new IndexRequest("test"); + indexRequest.id("1"); + indexRequest.source("price", 10); + IndexResponse indexResponse = client().index(indexRequest).actionGet(); + assertEquals(RestStatus.CREATED, indexResponse.status()); + } + { + IndexRequest indexRequest = new IndexRequest("test"); + indexRequest.id("2"); + indexRequest.source("price", 100); + IndexResponse indexResponse = client().index(indexRequest).actionGet(); + assertEquals(RestStatus.CREATED, indexResponse.status()); + } + client().admin().indices().prepareRefresh("test").get(); + + SearchRequest originalRequest = new SearchRequest(); + SearchSourceBuilder source = new SearchSourceBuilder(); + source.size(0); + originalRequest.source(source); + TermsAggregationBuilder terms = new TermsAggregationBuilder("terms", ValueType.NUMERIC); + terms.field("price"); + terms.size(1); + source.aggregation(terms); + + { + SearchRequest searchRequest = randomBoolean() ? originalRequest : SearchRequest.withLocalReduction(originalRequest, + Strings.EMPTY_ARRAY, "remote", nowInMillis, true); + SearchResponse searchResponse = client().search(searchRequest).actionGet(); + assertEquals(2, searchResponse.getHits().getTotalHits().value); + Aggregations aggregations = searchResponse.getAggregations(); + LongTerms longTerms = aggregations.get("terms"); + assertEquals(1, longTerms.getBuckets().size()); + } + { + SearchRequest searchRequest = SearchRequest.withLocalReduction(originalRequest, + Strings.EMPTY_ARRAY, "remote", nowInMillis, false); + SearchResponse searchResponse = client().search(searchRequest).actionGet(); + assertEquals(2, searchResponse.getHits().getTotalHits().value); + Aggregations aggregations = searchResponse.getAggregations(); + LongTerms longTerms = aggregations.get("terms"); + } + } } diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index 8a5859e200eac..9a9524d0ff57e 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -402,7 +402,7 @@ public void testCCSRemoteReduceMergeFails() throws Exception { } public void testCCSRemoteReduce() throws Exception { - int numClusters = randomIntBetween(2, 10); + int numClusters = randomIntBetween(1, 10); DiscoveryNode[] nodes = new DiscoveryNode[numClusters]; Map remoteIndicesByCluster = new HashMap<>(); Settings.Builder builder = Settings.builder(); @@ -440,7 +440,7 @@ public void testCCSRemoteReduce() throws Exception { assertEquals(0, searchResponse.getClusters().getSkipped()); assertEquals(totalClusters, searchResponse.getClusters().getTotal()); assertEquals(totalClusters, searchResponse.getClusters().getSuccessful()); - assertEquals(totalClusters + 1, searchResponse.getNumReducePhases()); + assertEquals(totalClusters == 1 ? 1 : totalClusters + 1, searchResponse.getNumReducePhases()); } { SearchRequest searchRequest = new SearchRequest(); @@ -510,7 +510,6 @@ public void onNodeDisconnected(DiscoveryNode node) { awaitLatch(latch, 5, TimeUnit.SECONDS); assertNotNull(failure.get()); assertThat(failure.get(), instanceOf(RemoteTransportException.class)); - RemoteTransportException remoteTransportException = (RemoteTransportException) failure.get(); assertThat(failure.get().getMessage(), containsString("error while communicating with remote cluster [")); assertThat(failure.get().getCause(), instanceOf(NodeDisconnectedException.class)); } @@ -583,7 +582,7 @@ public void onNodeDisconnected(DiscoveryNode node) { assertEquals(0, searchResponse.getClusters().getSkipped()); assertEquals(totalClusters, searchResponse.getClusters().getTotal()); assertEquals(totalClusters, searchResponse.getClusters().getSuccessful()); - assertEquals(totalClusters + 1, searchResponse.getNumReducePhases()); + assertEquals(totalClusters == 1 ? 1 : totalClusters + 1, searchResponse.getNumReducePhases()); } assertEquals(0, service.getConnectionManager().size()); } finally {