diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index 0d7bf0a2edca4..8055b22d22e3b 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -712,19 +712,19 @@ InitialSearchPhase.ArraySearchPhaseResults newSearchPhaseResu final boolean hasAggs = source != null && source.aggregations() != null; final boolean hasTopDocs = source == null || source.size() != 0; final boolean trackTotalHits = source == null || source.trackTotalHits(); - final boolean finalReduce = request.getLocalClusterAlias() == null; if (isScrollRequest == false && (hasAggs || hasTopDocs)) { // no incremental reduce if scroll is used - we only hit a single shard or sometimes more... if (request.getBatchedReduceSize() < numShards) { // only use this if there are aggs and if there are more shards than we should reduce at once - return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs, finalReduce); + return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs, + request.isFinalReduce()); } } return new InitialSearchPhase.ArraySearchPhaseResults(numShards) { @Override ReducedQueryPhase reduce() { - return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHits, finalReduce); + return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHits, request.isFinalReduce()); } }; } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java index b28a07af1d9f2..29cea474d80df 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -69,6 +69,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest private String localClusterAlias; private long absoluteStartMillis; + private boolean finalReduce; private SearchType searchType = SearchType.DEFAULT; @@ -102,6 +103,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest public SearchRequest() { this.localClusterAlias = null; this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS; + this.finalReduce = true; } /** @@ -123,6 +125,7 @@ public SearchRequest(SearchRequest searchRequest) { this.types = searchRequest.types; this.localClusterAlias = searchRequest.localClusterAlias; this.absoluteStartMillis = searchRequest.absoluteStartMillis; + this.finalReduce = searchRequest.finalReduce; } /** @@ -147,16 +150,18 @@ public SearchRequest(String[] indices, SearchSourceBuilder source) { /** * Creates a new search request by providing the alias of the cluster where it will be executed, as well as the current time in - * milliseconds from the epoch time. Used when a {@link SearchRequest} is created and executed as part of a cross-cluster search - * request performing local reduction on each cluster. The coordinating CCS node provides the alias to prefix index names with in - * the returned search results, and the current time to be used on the remote clusters to ensure that the same value is used. + * milliseconds from the epoch time and whether the reduction should be final or not. Used when a {@link SearchRequest} is created + * and executed as part of a cross-cluster search request performing reduction on each remote cluster. The coordinating CCS node + * provides the alias to prefix index names with in the returned search results, the current time to be used on the remote clusters + * to ensure that the same value is used, and determines whether the reduction phase should be final or not. */ - SearchRequest(String localClusterAlias, long absoluteStartMillis) { + SearchRequest(String localClusterAlias, long absoluteStartMillis, boolean finalReduce) { this.localClusterAlias = Objects.requireNonNull(localClusterAlias, "cluster alias must not be null"); if (absoluteStartMillis < 0) { throw new IllegalArgumentException("absoluteStartMillis must not be negative but was [" + absoluteStartMillis + "]"); } this.absoluteStartMillis = absoluteStartMillis; + this.finalReduce = finalReduce; } @Override @@ -195,10 +200,17 @@ String getLocalClusterAlias() { return localClusterAlias; } + /** + * Returns whether the reduction phase that will be performed needs to be final or not. + */ + boolean isFinalReduce() { + return finalReduce; + } + /** * Returns the current time in milliseconds from the time epoch, to be used for the execution of this search request. Used to * ensure that the same value, determined by the coordinating node, is used on all nodes involved in the execution of the search - * request. When created through {@link #SearchRequest(String, long)}, this method returns the provided current time, otherwise + * request. When created through {@link #SearchRequest(String, long, boolean)}, this method returns the provided current time, otherwise * it will return {@link System#currentTimeMillis()}. * */ @@ -518,12 +530,15 @@ public void readFrom(StreamInput in) throws IOException { localClusterAlias = in.readOptionalString(); if (localClusterAlias != null) { absoluteStartMillis = in.readVLong(); + finalReduce = in.readBoolean(); } else { absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS; + finalReduce = true; } } else { localClusterAlias = null; absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS; + finalReduce = true; } } @@ -554,6 +569,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalString(localClusterAlias); if (localClusterAlias != null) { out.writeVLong(absoluteStartMillis); + out.writeBoolean(finalReduce); } } } diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java index 3b9f28b3e3449..ebe413f8c5e18 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java @@ -306,9 +306,13 @@ private static AtomicArray generateFetchResults(int nShards, return fetchResults; } + private static SearchRequest randomSearchRequest() { + return randomBoolean() ? new SearchRequest() : new SearchRequest("remote", 0, randomBoolean()); + } + public void testConsumer() { int bufferSize = randomIntBetween(2, 3); - SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote"); + SearchRequest request = randomSearchRequest(); request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo"))); request.setBatchedReduceSize(bufferSize); InitialSearchPhase.ArraySearchPhaseResults consumer = searchPhaseController.newSearchPhaseResults(request, 3); @@ -366,7 +370,7 @@ public void testConsumerConcurrently() throws InterruptedException { int expectedNumResults = randomIntBetween(1, 100); int bufferSize = randomIntBetween(2, 200); - SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote"); + SearchRequest request = randomSearchRequest(); request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo"))); request.setBatchedReduceSize(bufferSize); InitialSearchPhase.ArraySearchPhaseResults consumer = @@ -410,7 +414,7 @@ public void testConsumerConcurrently() throws InterruptedException { public void testConsumerOnlyAggs() { int expectedNumResults = randomIntBetween(1, 100); int bufferSize = randomIntBetween(2, 200); - SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote"); + SearchRequest request = randomSearchRequest(); request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")).size(0)); request.setBatchedReduceSize(bufferSize); InitialSearchPhase.ArraySearchPhaseResults consumer = @@ -444,7 +448,7 @@ public void testConsumerOnlyAggs() { public void testConsumerOnlyHits() { int expectedNumResults = randomIntBetween(1, 100); int bufferSize = randomIntBetween(2, 200); - SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote"); + SearchRequest request = randomSearchRequest(); if (randomBoolean()) { request.source(new SearchSourceBuilder().size(randomIntBetween(1, 10))); } @@ -475,8 +479,7 @@ public void testConsumerOnlyHits() { private void assertFinalReduction(SearchRequest searchRequest) { assertThat(reductions.size(), greaterThanOrEqualTo(1)); - //the last reduction step was the final one only if no cluster alias was provided with the search request - assertEquals(searchRequest.getLocalClusterAlias() == null, reductions.get(reductions.size() - 1)); + assertEquals(searchRequest.isFinalReduce(), reductions.get(reductions.size() - 1)); } public void testNewSearchPhaseResults() { @@ -548,7 +551,7 @@ public void testReduceTopNWithFromOffset() { public void testConsumerSortByField() { int expectedNumResults = randomIntBetween(1, 100); int bufferSize = randomIntBetween(2, 200); - SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote"); + SearchRequest request = randomSearchRequest(); int size = randomIntBetween(1, 10); request.setBatchedReduceSize(bufferSize); InitialSearchPhase.ArraySearchPhaseResults consumer = @@ -583,7 +586,7 @@ public void testConsumerSortByField() { public void testConsumerFieldCollapsing() { int expectedNumResults = randomIntBetween(30, 100); int bufferSize = randomIntBetween(2, 200); - SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote"); + SearchRequest request = randomSearchRequest(); int size = randomIntBetween(5, 10); request.setBatchedReduceSize(bufferSize); InitialSearchPhase.ArraySearchPhaseResults consumer = diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java index 4831aaa0f399d..7fa202e0332ac 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java @@ -52,15 +52,15 @@ protected SearchRequest createSearchRequest() throws IOException { return super.createSearchRequest(); } //clusterAlias and absoluteStartMillis do not have public getters/setters hence we randomize them only in this test specifically. - SearchRequest searchRequest = new SearchRequest(randomAlphaOfLengthBetween(5, 10), randomNonNegativeLong()); + SearchRequest searchRequest = new SearchRequest(randomAlphaOfLengthBetween(5, 10), randomNonNegativeLong(), randomBoolean()); RandomSearchRequestGenerator.randomSearchRequest(searchRequest, this::createSearchSourceBuilder); return searchRequest; } public void testClusterAliasValidation() { - expectThrows(NullPointerException.class, () -> new SearchRequest(null, 0)); - expectThrows(IllegalArgumentException.class, () -> new SearchRequest("", -1)); - SearchRequest searchRequest = new SearchRequest("", 0); + expectThrows(NullPointerException.class, () -> new SearchRequest(null, 0, randomBoolean())); + expectThrows(IllegalArgumentException.class, () -> new SearchRequest("", -1, randomBoolean())); + SearchRequest searchRequest = new SearchRequest("", 0, randomBoolean()); assertNull(searchRequest.validate()); } @@ -79,9 +79,11 @@ public void testClusterAliasSerialization() throws IOException { if (version.before(Version.V_6_7_0)) { assertNull(deserializedRequest.getLocalClusterAlias()); assertAbsoluteStartMillisIsCurrentTime(deserializedRequest); + assertTrue(deserializedRequest.isFinalReduce()); } else { assertEquals(searchRequest.getLocalClusterAlias(), deserializedRequest.getLocalClusterAlias()); assertEquals(searchRequest.getOrCreateAbsoluteStartMillis(), deserializedRequest.getOrCreateAbsoluteStartMillis()); + assertEquals(searchRequest.isFinalReduce(), deserializedRequest.isFinalReduce()); } } @@ -94,6 +96,7 @@ public void testReadFromPre6_7_0() throws IOException { assertArrayEquals(new String[]{"index"}, searchRequest.indices()); assertNull(searchRequest.getLocalClusterAlias()); assertAbsoluteStartMillisIsCurrentTime(searchRequest); + assertTrue(searchRequest.isFinalReduce()); } } diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java index 358a00ad5c9de..8bf95f5c7204e 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java @@ -26,13 +26,17 @@ import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.bucket.terms.LongTerms; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.elasticsearch.search.aggregations.support.ValueType; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.test.ESSingleNodeTestCase; public class TransportSearchActionSingleNodeTests extends ESSingleNodeTestCase { public void testLocalClusterAlias() { - long nowInMillis = System.currentTimeMillis(); + long nowInMillis = randomLongBetween(0, Long.MAX_VALUE); IndexRequest indexRequest = new IndexRequest("test", "type", "1"); indexRequest.source("field", "value"); indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); @@ -40,7 +44,7 @@ public void testLocalClusterAlias() { assertEquals(RestStatus.CREATED, indexResponse.status()); { - SearchRequest searchRequest = new SearchRequest("local", nowInMillis); + SearchRequest searchRequest = new SearchRequest("local", nowInMillis, randomBoolean()); SearchResponse searchResponse = client().search(searchRequest).actionGet(); assertEquals(1, searchResponse.getHits().getTotalHits()); SearchHit[] hits = searchResponse.getHits().getHits(); @@ -51,7 +55,7 @@ public void testLocalClusterAlias() { assertEquals("1", hit.getId()); } { - SearchRequest searchRequest = new SearchRequest("", nowInMillis); + SearchRequest searchRequest = new SearchRequest("", nowInMillis, randomBoolean()); SearchResponse searchResponse = client().search(searchRequest).actionGet(); assertEquals(1, searchResponse.getHits().getTotalHits()); SearchHit[] hits = searchResponse.getHits().getHits(); @@ -90,19 +94,19 @@ public void testAbsoluteStartMillis() { assertEquals(0, searchResponse.getTotalShards()); } { - SearchRequest searchRequest = new SearchRequest("", 0); + SearchRequest searchRequest = new SearchRequest("", 0, randomBoolean()); SearchResponse searchResponse = client().search(searchRequest).actionGet(); assertEquals(2, searchResponse.getHits().getTotalHits()); } { - SearchRequest searchRequest = new SearchRequest("", 0); + SearchRequest searchRequest = new SearchRequest("", 0, randomBoolean()); searchRequest.indices(""); SearchResponse searchResponse = client().search(searchRequest).actionGet(); assertEquals(1, searchResponse.getHits().getTotalHits()); assertEquals("test-1970.01.01", searchResponse.getHits().getHits()[0].getIndex()); } { - SearchRequest searchRequest = new SearchRequest("", 0); + SearchRequest searchRequest = new SearchRequest("", 0, randomBoolean()); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); RangeQueryBuilder rangeQuery = new RangeQueryBuilder("date"); rangeQuery.gte("1970-01-01"); @@ -114,4 +118,46 @@ public void testAbsoluteStartMillis() { assertEquals("test-1970.01.01", searchResponse.getHits().getHits()[0].getIndex()); } } + + public void testFinalReduce() { + long nowInMillis = randomLongBetween(0, Long.MAX_VALUE); + { + IndexRequest indexRequest = new IndexRequest("test", "type", "1"); + indexRequest.source("price", 10); + IndexResponse indexResponse = client().index(indexRequest).actionGet(); + assertEquals(RestStatus.CREATED, indexResponse.status()); + } + { + IndexRequest indexRequest = new IndexRequest("test", "type", "2"); + indexRequest.source("price", 100); + IndexResponse indexResponse = client().index(indexRequest).actionGet(); + assertEquals(RestStatus.CREATED, indexResponse.status()); + } + client().admin().indices().prepareRefresh("test").get(); + + SearchSourceBuilder source = new SearchSourceBuilder(); + source.size(0); + TermsAggregationBuilder terms = new TermsAggregationBuilder("terms", ValueType.NUMERIC); + terms.field("price"); + terms.size(1); + source.aggregation(terms); + + { + SearchRequest searchRequest = randomBoolean() ? new SearchRequest().source(source) + : new SearchRequest("remote", nowInMillis, true).source(source); + SearchResponse searchResponse = client().search(searchRequest).actionGet(); + assertEquals(2, searchResponse.getHits().getTotalHits()); + Aggregations aggregations = searchResponse.getAggregations(); + LongTerms longTerms = aggregations.get("terms"); + assertEquals(1, longTerms.getBuckets().size()); + } + { + SearchRequest searchRequest = new SearchRequest("remote", nowInMillis, false).source(source); + SearchResponse searchResponse = client().search(searchRequest).actionGet(); + assertEquals(2, searchResponse.getHits().getTotalHits()); + Aggregations aggregations = searchResponse.getAggregations(); + LongTerms longTerms = aggregations.get("terms"); + assertEquals(2, longTerms.getBuckets().size()); + } + } }