Add ability to suggest shard_size on coord node rewrite
With elastic#36997 and elastic#37000 we added the ability to provide a cluster alias with a SearchRequest and to perform a non-final reduction when a cluster alias is set on the incoming search request.

With CCS soon supporting local reduction on each remote cluster, we also need to prevent the shard_size of terms aggs from being adjusted on the data nodes. We may end up querying a single shard per cluster, but we will later have to reduce results coming from multiple clusters, hence we still have to collect more buckets than needed to guarantee some level of precision.
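
A minimal, self-contained sketch of that over-collection idea, assuming a simplified stand-in for BucketUtils.suggestShardSideQueueSize (the class name and the exact formula below are illustrative assumptions, not the production code):

public class BucketHeuristicSketch {
    // Assumed stand-in for BucketUtils.suggestShardSideQueueSize(requiredSize, singleShard):
    // when a single shard's results are final, the requested size is enough; otherwise ask
    // each shard (or cluster) for more buckets so the final reduce keeps reasonable precision.
    static int suggestShardSideQueueSize(int requiredSize, boolean singleShard) {
        return singleShard ? requiredSize : requiredSize * 3 / 2 + 10; // formula is an assumption
    }

    public static void main(String[] args) {
        System.out.println(suggestShardSideQueueSize(10, true));  // 10: single shard, counts are exact
        System.out.println(suggestShardSideQueueSize(10, false)); // 25: over-collect before the reduce
    }
}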

This is done by adding the ability to override shard_size on the coordinating node as part of the rewrite phase. This happens only when searching against multiple clusters, that is when using CCS with the alternate execution mode. Setting shard_size explicitly on the coordinating node means that it will not be overridden later on the data nodes. In all other cases (local search, or CCS with the ordinary execution mode) shard_size is still set on the data nodes as before.
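A hedged sketch of that coordinating-node rewrite follows; CoordRewriteSketch and its helper are hypothetical stand-ins for the new doRewrite overrides and QueryRewriteContext#isMultipleClusters, reusing the assumed heuristic from the sketch above:

public class CoordRewriteSketch {
    static int suggestShardSideQueueSize(int requiredSize, boolean singleShard) {
        return singleShard ? requiredSize : requiredSize * 3 / 2 + 10; // same assumed heuristic as above
    }

    // If the rewrite context reports multiple clusters and the user left shard_size unset (< 0),
    // pin it now with singleShard=false so the data nodes cannot later shrink it to the
    // single-shard value; otherwise leave it for the data nodes to resolve as before.
    static int rewriteShardSize(int shardSize, int requiredSize, boolean multipleClusters) {
        if (multipleClusters && shardSize < 0) {
            return suggestShardSideQueueSize(requiredSize, false);
        }
        return shardSize;
    }

    public static void main(String[] args) {
        System.out.println(rewriteShardSize(-1, 10, true));  // 25: pinned on the coordinating node
        System.out.println(rewriteShardSize(-1, 10, false)); // -1: resolved on the data nodes
    }
}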

 Relates to elastic#32125
javanna committed Dec 28, 2018
1 parent cb6bac3 commit 705f28c
Showing 25 changed files with 183 additions and 127 deletions.
@@ -39,6 +39,7 @@
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.ParsedQuery;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.QueryShardException;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.indices.IndexClosedException;
@@ -108,8 +109,8 @@ protected void doExecute(Task task, ValidateQueryRequest request, ActionListener
if (request.query() == null) {
rewriteListener.onResponse(request.query());
} else {
Rewriteable.rewriteAndFetch(request.query(), searchService.getRewriteContext(timeProvider),
rewriteListener);
QueryRewriteContext rewriteContext = searchService.getRewriteContext(timeProvider, false);
Rewriteable.rewriteAndFetch(request.query(), rewriteContext, rewriteListener);
}
}

@@ -40,6 +40,7 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchService;
@@ -214,8 +215,8 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<
if (searchRequest.source() == null) {
rewriteListener.onResponse(searchRequest.source());
} else {
Rewriteable.rewriteAndFetch(searchRequest.source(), searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),
rewriteListener);
QueryRewriteContext rewriteContext = searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis, false);
Rewriteable.rewriteAndFetch(searchRequest.source(), rewriteContext, rewriteListener);
}
}

@@ -36,18 +36,18 @@
public class QueryRewriteContext {
private final NamedXContentRegistry xContentRegistry;
private final NamedWriteableRegistry writeableRegistry;
private final boolean multipleClusters;
protected final Client client;
protected final LongSupplier nowInMillis;
private final List<BiConsumer<Client, ActionListener<?>>> asyncActions = new ArrayList<>();

public QueryRewriteContext(
NamedXContentRegistry xContentRegistry, NamedWriteableRegistry writeableRegistry,Client client,
LongSupplier nowInMillis) {

public QueryRewriteContext(NamedXContentRegistry xContentRegistry, NamedWriteableRegistry writeableRegistry,
Client client, LongSupplier nowInMillis, boolean multipleClusters) {
this.xContentRegistry = xContentRegistry;
this.writeableRegistry = writeableRegistry;
this.client = client;
this.nowInMillis = nowInMillis;
this.multipleClusters = multipleClusters;
}

/**
@@ -68,6 +68,14 @@ public NamedWriteableRegistry getWriteableRegistry() {
return writeableRegistry;
}

/**
* Returns whether the request being rewritten is part of a cross-cluster search request where each cluster performs
* its own local reduction, and the targeted clusters are more than one.
*/
public boolean isMultipleClusters() {
return multipleClusters;
}

/**
* Returns an instance of {@link QueryShardContext} if available of null otherwise
*/
@@ -110,7 +110,7 @@ public QueryShardContext(int shardId, IndexSettings indexSettings, BitsetFilterC
SimilarityService similarityService, ScriptService scriptService, NamedXContentRegistry xContentRegistry,
NamedWriteableRegistry namedWriteableRegistry, Client client, IndexReader reader, LongSupplier nowInMillis,
String clusterAlias) {
super(xContentRegistry, namedWriteableRegistry,client, nowInMillis);
super(xContentRegistry, namedWriteableRegistry,client, nowInMillis, false);
this.shardId = shardId;
this.similarityService = similarityService;
this.mapperService = mapperService;
@@ -1357,8 +1357,8 @@ public AliasFilter buildAliasFilter(ClusterState state, String index, String...
/**
* Returns a new {@link QueryRewriteContext} with the given {@code now} provider
*/
public QueryRewriteContext getRewriteContext(LongSupplier nowInMillis) {
return new QueryRewriteContext(xContentRegistry, namedWriteableRegistry, client, nowInMillis);
public QueryRewriteContext getRewriteContext(LongSupplier nowInMillis, boolean multipleClusters) {
return new QueryRewriteContext(xContentRegistry, namedWriteableRegistry, client, nowInMillis, multipleClusters);
}

/**
@@ -1097,14 +1097,14 @@ protected void doRun() {
// we also do rewrite on the coordinating node (TransportSearchService) but we also need to do it here for BWC as well as
// AliasFilters that might need to be rewritten. These are edge-cases but we are very efficient doing the rewrite here so it's not
// adding a lot of overhead
Rewriteable.rewriteAndFetch(request.getRewriteable(), indicesService.getRewriteContext(request::nowInMillis), actionListener);
Rewriteable.rewriteAndFetch(request.getRewriteable(), getRewriteContext(request::nowInMillis, false), actionListener);
}

/**
* Returns a new {@link QueryRewriteContext} with the given {@code now} provider
*/
public QueryRewriteContext getRewriteContext(LongSupplier nowInMillis) {
return indicesService.getRewriteContext(nowInMillis);
public QueryRewriteContext getRewriteContext(LongSupplier nowInMillis, boolean multipleClusters) {
return indicesService.getRewriteContext(nowInMillis, multipleClusters);
}

public IndicesService getIndicesService() {
@@ -125,7 +125,7 @@ public final AggregationBuilder rewrite(QueryRewriteContext context) throws IOEx
* identity reference must be returned otherwise the builder will be
* rewritten infinitely.
*/
protected AggregationBuilder doRewrite(QueryRewriteContext queryShardContext) throws IOException {
protected AggregationBuilder doRewrite(QueryRewriteContext queryRewriteContext) throws IOException {
return this;
}

@@ -84,8 +84,8 @@ protected void doWriteTo(StreamOutput out) throws IOException {
}

@Override
protected AggregationBuilder doRewrite(QueryRewriteContext queryShardContext) throws IOException {
QueryBuilder result = Rewriteable.rewrite(filter, queryShardContext);
protected AggregationBuilder doRewrite(QueryRewriteContext queryRewriteContext) throws IOException {
QueryBuilder result = Rewriteable.rewrite(filter, queryRewriteContext);
if (result != filter) {
return new FilterAggregationBuilder(getName(), result);
}
@@ -198,11 +198,11 @@ public String otherBucketKey() {
}

@Override
protected AggregationBuilder doRewrite(QueryRewriteContext queryShardContext) throws IOException {
protected AggregationBuilder doRewrite(QueryRewriteContext queryRewriteContext) throws IOException {
List<KeyedFilter> rewrittenFilters = new ArrayList<>(filters.size());
boolean changed = false;
for (KeyedFilter kf : filters) {
QueryBuilder result = Rewriteable.rewrite(kf.filter(), queryShardContext);
QueryBuilder result = Rewriteable.rewrite(kf.filter(), queryRewriteContext);
rewrittenFilters.add(new KeyedFilter(kf.key(), result));
if (result != kf.filter()) {
changed = true;
@@ -34,6 +34,7 @@
import org.elasticsearch.index.fielddata.MultiGeoPointValues;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
import org.elasticsearch.search.aggregations.AggregatorFactory;
@@ -90,7 +91,7 @@ protected GeoGridAggregationBuilder(GeoGridAggregationBuilder clone, Builder fac
}

@Override
protected AggregationBuilder shallowCopy(Builder factoriesBuilder, Map<String, Object> metaData) {
protected GeoGridAggregationBuilder shallowCopy(Builder factoriesBuilder, Map<String, Object> metaData) {
return new GeoGridAggregationBuilder(this, factoriesBuilder, metaData);
}

@@ -150,26 +151,41 @@ public int shardSize() {
protected ValuesSourceAggregatorFactory<ValuesSource.GeoPoint, ?> innerBuild(SearchContext context,
ValuesSourceConfig<ValuesSource.GeoPoint> config, AggregatorFactory<?> parent, Builder subFactoriesBuilder)
throws IOException {
int shardSize = this.shardSize;

int requiredSize = this.requiredSize;
int shardSize = suggestShardSize(name, this.shardSize, requiredSize, context.numberOfShards() == 1);
return new GeoHashGridAggregatorFactory(name, config, precision, requiredSize, shardSize, context, parent,
subFactoriesBuilder, metaData);
}

static int suggestShardSize(String name, int shardSize, int requiredSize, boolean singleShard) {
if (shardSize < 0) {
// Use default heuristic to avoid any wrong-ranking caused by
// distributed counting
shardSize = BucketUtils.suggestShardSideQueueSize(requiredSize, context.numberOfShards() == 1);
// Use default heuristic to avoid any wrong-ranking caused by distributed counting
shardSize = BucketUtils.suggestShardSideQueueSize(requiredSize, singleShard);
}

if (requiredSize <= 0 || shardSize <= 0) {
throw new ElasticsearchException(
"parameters [required_size] and [shard_size] must be >0 in geohash_grid aggregation [" + name + "].");
"parameters [required_size] and [shard_size] must be >0 in geohash_grid aggregation [" + name + "].");
}

if (shardSize < requiredSize) {
shardSize = requiredSize;
}
return new GeoHashGridAggregatorFactory(name, config, precision, requiredSize, shardSize, context, parent,
subFactoriesBuilder, metaData);
return shardSize;
}

@Override
protected AggregationBuilder doRewrite(QueryRewriteContext queryRewriteContext) throws IOException {
if (queryRewriteContext.isMultipleClusters()) {
assert queryRewriteContext.convertToShardContext() == null;
// We are coordinating a cross-cluster search request across more than one cluster with local reduction on each remote cluster.
// We need to set shard_size explicitly to make sure that we don't optimize later for a single shard; although we may end up
// searching on a single shard per cluster, we will reduce buckets coming from multiple shards as we have multiple clusters.
int updatedShardSize = suggestShardSize(name, shardSize, requiredSize, false);
GeoGridAggregationBuilder aggregationBuilder = shallowCopy(factoriesBuilder, metaData);
aggregationBuilder.shardSize(updatedShardSize);
return aggregationBuilder;
}
return super.doRewrite(queryRewriteContext);
}

@Override
@@ -24,12 +24,13 @@
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ParseFieldRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.bucket.BucketUtils;
import org.elasticsearch.search.aggregations.bucket.MultiBucketAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.JLHScore;
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristic;
@@ -100,12 +101,8 @@ public static Aggregator.Parser getParser(ParseFieldRegistry<SignificanceHeurist
},
new ParseField(name));
}
return new Aggregator.Parser() {
@Override
public AggregationBuilder parse(String aggregationName, XContentParser parser) throws IOException {
return aggregationParser.parse(parser, new SignificantTermsAggregationBuilder(aggregationName, null), null);
}
};
return (aggregationName, parser) -> aggregationParser.parse(parser,
new SignificantTermsAggregationBuilder(aggregationName, null), null);
}

private IncludeExclude includeExclude = null;
@@ -141,7 +138,7 @@ protected SignificantTermsAggregationBuilder(SignificantTermsAggregationBuilder
}

@Override
protected AggregationBuilder shallowCopy(Builder factoriesBuilder, Map<String, Object> metaData) {
protected SignificantTermsAggregationBuilder shallowCopy(Builder factoriesBuilder, Map<String, Object> metaData) {
return new SignificantTermsAggregationBuilder(this, factoriesBuilder, metaData);
}

@@ -175,6 +172,38 @@ public SignificantTermsAggregationBuilder bucketCountThresholds(TermsAggregator.
return this;
}

@Override
protected AggregationBuilder doRewrite(QueryRewriteContext queryRewriteContext) throws IOException {
if (queryRewriteContext.isMultipleClusters()) {
assert queryRewriteContext.convertToShardContext() == null;
// We are coordinating a cross-cluster search request across more than one cluster with local reduction on each remote cluster.
// We need to set shard_size explicitly to make sure that we don't optimize later for a single shard; although we may end up
// searching on a single shard per cluster, we will reduce buckets coming from multiple shards as we have multiple clusters.
BucketCountThresholds bucketCountThresholds = suggestShardSize(
DEFAULT_BUCKET_COUNT_THRESHOLDS.getShardSize(), this.bucketCountThresholds, false);
if (bucketCountThresholds != this.bucketCountThresholds) {
SignificantTermsAggregationBuilder aggregationBuilder = shallowCopy(factoriesBuilder, metaData);
aggregationBuilder.bucketCountThresholds = bucketCountThresholds;
return aggregationBuilder;
}
}
return super.doRewrite(queryRewriteContext);
}

static BucketCountThresholds suggestShardSize(int defaultShardSize, BucketCountThresholds bucketCountThresholds, boolean singleShard) {
// The user has not made a shardSize selection. Use default heuristic to avoid any wrong-ranking caused by distributed counting
// but request double the usual amount. We typically need more than the number of "top" terms requested by other aggregations
// as the significance algorithm is in less of a position to down-select at shard-level - some of the things we want to find have
// only one occurrence on each shard and as such are impossible to differentiate from non-significant terms at that early stage.
if (bucketCountThresholds.getShardSize() == defaultShardSize) {
int newShardSize = BucketUtils.suggestShardSideQueueSize(2 * bucketCountThresholds.getRequiredSize(), singleShard);
BucketCountThresholds updatedBucketCountThresholds = new BucketCountThresholds(bucketCountThresholds);
updatedBucketCountThresholds.setShardSize(newShardSize);
return updatedBucketCountThresholds;
}
return bucketCountThresholds;
}

/**
* Sets the size - indicating how many term buckets should be returned
* (defaults to 10)
@@ -43,7 +43,6 @@
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.NonCollectingAggregator;
import org.elasticsearch.search.aggregations.bucket.BucketUtils;
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristic;
import org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator;
@@ -181,36 +180,18 @@ protected Aggregator doCreateInternal(ValuesSource valuesSource, Aggregator pare
}

numberOfAggregatorsCreated++;
BucketCountThresholds bucketCountThresholds = new BucketCountThresholds(this.bucketCountThresholds);
if (bucketCountThresholds.getShardSize() == SignificantTermsAggregationBuilder.DEFAULT_BUCKET_COUNT_THRESHOLDS.getShardSize()) {
// The user has not made a shardSize selection .
// Use default heuristic to avoid any wrong-ranking caused by
// distributed counting
// but request double the usual amount.
// We typically need more than the number of "top" terms requested
// by other aggregations
// as the significance algorithm is in less of a position to
// down-select at shard-level -
// some of the things we want to find have only one occurrence on
// each shard and as
// such are impossible to differentiate from non-significant terms
// at that early stage.
bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(),
context.numberOfShards() == 1));
}

BucketCountThresholds bucketCountThresholds = SignificantTermsAggregationBuilder.suggestShardSize(
SignificantTermsAggregationBuilder.DEFAULT_BUCKET_COUNT_THRESHOLDS.getShardSize(),
this.bucketCountThresholds, context.numberOfShards() == 1);
if (valuesSource instanceof ValuesSource.Bytes) {
ExecutionMode execution = null;
final ExecutionMode execution;
if (executionHint != null) {
execution = ExecutionMode.fromString(executionHint, deprecationLogger);
}
if (valuesSource instanceof ValuesSource.Bytes.WithOrdinals == false) {
} else if (valuesSource instanceof ValuesSource.Bytes.WithOrdinals == false) {
execution = ExecutionMode.MAP;
}
if (execution == null) {
} else {
execution = ExecutionMode.GLOBAL_ORDINALS;
}
assert execution != null;

DocValueFormat format = config.format();
if ((includeExclude != null) && (includeExclude.isRegexBased()) && format != DocValueFormat.RAW) {
(diffs for the remaining changed files not loaded)
