Skip to content

Commit

Permalink
Add finalReduce flag to SearchRequest
Browse files Browse the repository at this point in the history
This commit adds support for a separate finalReduce flag to
SearchRequest and makes use of it in SearchPhaseController instead of
disabling final reduction whenever a cluster alias is provided

Relates to elastic#38104
  • Loading branch information
javanna committed Feb 1, 2019
1 parent b10e95f commit ec98df8
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -712,19 +712,19 @@ InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> newSearchPhaseResu
final boolean hasAggs = source != null && source.aggregations() != null;
final boolean hasTopDocs = source == null || source.size() != 0;
final boolean trackTotalHits = source == null || source.trackTotalHits();
final boolean finalReduce = request.getLocalClusterAlias() == null;

if (isScrollRequest == false && (hasAggs || hasTopDocs)) {
// no incremental reduce if scroll is used - we only hit a single shard or sometimes more...
if (request.getBatchedReduceSize() < numShards) {
// only use this if there are aggs and if there are more shards than we should reduce at once
return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs, finalReduce);
return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs,
request.isFinalReduce());
}
}
return new InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult>(numShards) {
@Override
ReducedQueryPhase reduce() {
return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHits, finalReduce);
return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHits, request.isFinalReduce());
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest

private String localClusterAlias;
private long absoluteStartMillis;
private boolean finalReduce;

private SearchType searchType = SearchType.DEFAULT;

Expand Down Expand Up @@ -102,6 +103,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
public SearchRequest() {
this.localClusterAlias = null;
this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
this.finalReduce = true;
}

/**
Expand All @@ -123,6 +125,7 @@ public SearchRequest(SearchRequest searchRequest) {
this.types = searchRequest.types;
this.localClusterAlias = searchRequest.localClusterAlias;
this.absoluteStartMillis = searchRequest.absoluteStartMillis;
this.finalReduce = searchRequest.finalReduce;
}

/**
Expand All @@ -147,16 +150,18 @@ public SearchRequest(String[] indices, SearchSourceBuilder source) {

/**
* Creates a new search request by providing the alias of the cluster where it will be executed, as well as the current time in
* milliseconds from the epoch time. Used when a {@link SearchRequest} is created and executed as part of a cross-cluster search
* request performing local reduction on each cluster. The coordinating CCS node provides the alias to prefix index names with in
* the returned search results, and the current time to be used on the remote clusters to ensure that the same value is used.
* milliseconds from the epoch time and whether the reduction should be final or not. Used when a {@link SearchRequest} is created
* and executed as part of a cross-cluster search request performing reduction on each remote cluster. The coordinating CCS node
* provides the alias to prefix index names with in the returned search results, the current time to be used on the remote clusters
* to ensure that the same value is used, and determines whether the reduction phase should be final or not.
*/
SearchRequest(String localClusterAlias, long absoluteStartMillis) {
SearchRequest(String localClusterAlias, long absoluteStartMillis, boolean finalReduce) {
this.localClusterAlias = Objects.requireNonNull(localClusterAlias, "cluster alias must not be null");
if (absoluteStartMillis < 0) {
throw new IllegalArgumentException("absoluteStartMillis must not be negative but was [" + absoluteStartMillis + "]");
}
this.absoluteStartMillis = absoluteStartMillis;
this.finalReduce = finalReduce;
}

@Override
Expand Down Expand Up @@ -195,10 +200,17 @@ String getLocalClusterAlias() {
return localClusterAlias;
}

/**
* Returns whether the reduction phase that will be performed needs to be final or not.
*/
boolean isFinalReduce() {
return finalReduce;
}

/**
* Returns the current time in milliseconds from the time epoch, to be used for the execution of this search request. Used to
* ensure that the same value, determined by the coordinating node, is used on all nodes involved in the execution of the search
* request. When created through {@link #SearchRequest(String, long)}, this method returns the provided current time, otherwise
* request. When created through {@link #SearchRequest(String, long, boolean)}, this method returns the provided current time, otherwise
* it will return {@link System#currentTimeMillis()}.
*
*/
Expand Down Expand Up @@ -518,12 +530,15 @@ public void readFrom(StreamInput in) throws IOException {
localClusterAlias = in.readOptionalString();
if (localClusterAlias != null) {
absoluteStartMillis = in.readVLong();
finalReduce = in.readBoolean();
} else {
absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
finalReduce = true;
}
} else {
localClusterAlias = null;
absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
finalReduce = true;
}
}

Expand Down Expand Up @@ -554,6 +569,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(localClusterAlias);
if (localClusterAlias != null) {
out.writeVLong(absoluteStartMillis);
out.writeBoolean(finalReduce);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,13 @@ private static AtomicArray<SearchPhaseResult> generateFetchResults(int nShards,
return fetchResults;
}

private static SearchRequest randomSearchRequest() {
return randomBoolean() ? new SearchRequest() : new SearchRequest("remote", 0, randomBoolean());
}

public void testConsumer() {
int bufferSize = randomIntBetween(2, 3);
SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
SearchRequest request = randomSearchRequest();
request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")));
request.setBatchedReduceSize(bufferSize);
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> consumer = searchPhaseController.newSearchPhaseResults(request, 3);
Expand Down Expand Up @@ -366,7 +370,7 @@ public void testConsumerConcurrently() throws InterruptedException {
int expectedNumResults = randomIntBetween(1, 100);
int bufferSize = randomIntBetween(2, 200);

SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
SearchRequest request = randomSearchRequest();
request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")));
request.setBatchedReduceSize(bufferSize);
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> consumer =
Expand Down Expand Up @@ -410,7 +414,7 @@ public void testConsumerConcurrently() throws InterruptedException {
public void testConsumerOnlyAggs() {
int expectedNumResults = randomIntBetween(1, 100);
int bufferSize = randomIntBetween(2, 200);
SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
SearchRequest request = randomSearchRequest();
request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")).size(0));
request.setBatchedReduceSize(bufferSize);
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> consumer =
Expand Down Expand Up @@ -444,7 +448,7 @@ public void testConsumerOnlyAggs() {
public void testConsumerOnlyHits() {
int expectedNumResults = randomIntBetween(1, 100);
int bufferSize = randomIntBetween(2, 200);
SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
SearchRequest request = randomSearchRequest();
if (randomBoolean()) {
request.source(new SearchSourceBuilder().size(randomIntBetween(1, 10)));
}
Expand Down Expand Up @@ -475,8 +479,7 @@ public void testConsumerOnlyHits() {

private void assertFinalReduction(SearchRequest searchRequest) {
assertThat(reductions.size(), greaterThanOrEqualTo(1));
//the last reduction step was the final one only if no cluster alias was provided with the search request
assertEquals(searchRequest.getLocalClusterAlias() == null, reductions.get(reductions.size() - 1));
assertEquals(searchRequest.isFinalReduce(), reductions.get(reductions.size() - 1));
}

public void testNewSearchPhaseResults() {
Expand Down Expand Up @@ -548,7 +551,7 @@ public void testReduceTopNWithFromOffset() {
public void testConsumerSortByField() {
int expectedNumResults = randomIntBetween(1, 100);
int bufferSize = randomIntBetween(2, 200);
SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
SearchRequest request = randomSearchRequest();
int size = randomIntBetween(1, 10);
request.setBatchedReduceSize(bufferSize);
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> consumer =
Expand Down Expand Up @@ -583,7 +586,7 @@ public void testConsumerSortByField() {
public void testConsumerFieldCollapsing() {
int expectedNumResults = randomIntBetween(30, 100);
int bufferSize = randomIntBetween(2, 200);
SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
SearchRequest request = randomSearchRequest();
int size = randomIntBetween(5, 10);
request.setBatchedReduceSize(bufferSize);
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> consumer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ protected SearchRequest createSearchRequest() throws IOException {
return super.createSearchRequest();
}
//clusterAlias and absoluteStartMillis do not have public getters/setters hence we randomize them only in this test specifically.
SearchRequest searchRequest = new SearchRequest(randomAlphaOfLengthBetween(5, 10), randomNonNegativeLong());
SearchRequest searchRequest = new SearchRequest(randomAlphaOfLengthBetween(5, 10), randomNonNegativeLong(), randomBoolean());
RandomSearchRequestGenerator.randomSearchRequest(searchRequest, this::createSearchSourceBuilder);
return searchRequest;
}

public void testClusterAliasValidation() {
expectThrows(NullPointerException.class, () -> new SearchRequest(null, 0));
expectThrows(IllegalArgumentException.class, () -> new SearchRequest("", -1));
SearchRequest searchRequest = new SearchRequest("", 0);
expectThrows(NullPointerException.class, () -> new SearchRequest(null, 0, randomBoolean()));
expectThrows(IllegalArgumentException.class, () -> new SearchRequest("", -1, randomBoolean()));
SearchRequest searchRequest = new SearchRequest("", 0, randomBoolean());
assertNull(searchRequest.validate());
}

Expand All @@ -79,9 +79,11 @@ public void testClusterAliasSerialization() throws IOException {
if (version.before(Version.V_6_7_0)) {
assertNull(deserializedRequest.getLocalClusterAlias());
assertAbsoluteStartMillisIsCurrentTime(deserializedRequest);
assertTrue(deserializedRequest.isFinalReduce());
} else {
assertEquals(searchRequest.getLocalClusterAlias(), deserializedRequest.getLocalClusterAlias());
assertEquals(searchRequest.getOrCreateAbsoluteStartMillis(), deserializedRequest.getOrCreateAbsoluteStartMillis());
assertEquals(searchRequest.isFinalReduce(), deserializedRequest.isFinalReduce());
}
}

Expand All @@ -94,6 +96,7 @@ public void testReadFromPre6_7_0() throws IOException {
assertArrayEquals(new String[]{"index"}, searchRequest.indices());
assertNull(searchRequest.getLocalClusterAlias());
assertAbsoluteStartMillisIsCurrentTime(searchRequest);
assertTrue(searchRequest.isFinalReduce());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,29 @@
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.LongTerms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ESSingleNodeTestCase;

public class TransportSearchActionSingleNodeTests extends ESSingleNodeTestCase {

public void testLocalClusterAlias() {
long nowInMillis = System.currentTimeMillis();
long nowInMillis = randomLongBetween(0, Long.MAX_VALUE);
IndexRequest indexRequest = new IndexRequest("test", "type", "1");
indexRequest.source("field", "value");
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
IndexResponse indexResponse = client().index(indexRequest).actionGet();
assertEquals(RestStatus.CREATED, indexResponse.status());

{
SearchRequest searchRequest = new SearchRequest("local", nowInMillis);
SearchRequest searchRequest = new SearchRequest("local", nowInMillis, randomBoolean());
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(1, searchResponse.getHits().getTotalHits());
SearchHit[] hits = searchResponse.getHits().getHits();
Expand All @@ -51,7 +56,7 @@ public void testLocalClusterAlias() {
assertEquals("1", hit.getId());
}
{
SearchRequest searchRequest = new SearchRequest("", nowInMillis);
SearchRequest searchRequest = new SearchRequest("", nowInMillis, randomBoolean());
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(1, searchResponse.getHits().getTotalHits());
SearchHit[] hits = searchResponse.getHits().getHits();
Expand Down Expand Up @@ -90,19 +95,19 @@ public void testAbsoluteStartMillis() {
assertEquals(0, searchResponse.getTotalShards());
}
{
SearchRequest searchRequest = new SearchRequest("", 0);
SearchRequest searchRequest = new SearchRequest("", 0, randomBoolean());
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(2, searchResponse.getHits().getTotalHits());
}
{
SearchRequest searchRequest = new SearchRequest("", 0);
SearchRequest searchRequest = new SearchRequest("", 0, randomBoolean());
searchRequest.indices("<test-{now/d}>");
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(1, searchResponse.getHits().getTotalHits());
assertEquals("test-1970.01.01", searchResponse.getHits().getHits()[0].getIndex());
}
{
SearchRequest searchRequest = new SearchRequest("", 0);
SearchRequest searchRequest = new SearchRequest("", 0, randomBoolean());
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
RangeQueryBuilder rangeQuery = new RangeQueryBuilder("date");
rangeQuery.gte("1970-01-01");
Expand All @@ -114,4 +119,46 @@ public void testAbsoluteStartMillis() {
assertEquals("test-1970.01.01", searchResponse.getHits().getHits()[0].getIndex());
}
}

public void testFinalReduce() {
long nowInMillis = randomLongBetween(0, Long.MAX_VALUE);
{
IndexRequest indexRequest = new IndexRequest("test", "type", "1");
indexRequest.source("price", 10);
IndexResponse indexResponse = client().index(indexRequest).actionGet();
assertEquals(RestStatus.CREATED, indexResponse.status());
}
{
IndexRequest indexRequest = new IndexRequest("test", "type", "2");
indexRequest.source("price", 100);
IndexResponse indexResponse = client().index(indexRequest).actionGet();
assertEquals(RestStatus.CREATED, indexResponse.status());
}
client().admin().indices().prepareRefresh("test").get();

SearchSourceBuilder source = new SearchSourceBuilder();
source.size(0);
TermsAggregationBuilder terms = new TermsAggregationBuilder("terms", ValueType.NUMERIC);
terms.field("price");
terms.size(1);
source.aggregation(terms);

{
SearchRequest searchRequest = randomBoolean() ? new SearchRequest().source(source)
: new SearchRequest("remote", nowInMillis, true).source(source);
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(2, searchResponse.getHits().getTotalHits());
Aggregations aggregations = searchResponse.getAggregations();
LongTerms longTerms = aggregations.get("terms");
assertEquals(1, longTerms.getBuckets().size());
}
{
SearchRequest searchRequest = new SearchRequest("remote", nowInMillis, false).source(source);
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(2, searchResponse.getHits().getTotalHits());
Aggregations aggregations = searchResponse.getAggregations();
LongTerms longTerms = aggregations.get("terms");
assertEquals(2, longTerms.getBuckets().size());
}
}
}

0 comments on commit ec98df8

Please sign in to comment.