Skip to content

Commit

Permalink
ESQL: Pushdown count(field) to Lucene (#100122)
Browse files Browse the repository at this point in the history
Use the LuceneCountOperator also for ungrouped count(field) queries
Enhance the SearchStat class to indicate when a field is single or multi-value

Fix #99840
  • Loading branch information
costin authored Oct 3, 2023
1 parent 99f3bd3 commit 2e86d25
Show file tree
Hide file tree
Showing 15 changed files with 669 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ public byte[] min(String field, DataType dataType) {
public byte[] max(String field, DataType dataType) {
return null;
}

@Override
public boolean isSingleValue(String field) {
return false;
}
}

public static final TestSearchStats TEST_SEARCH_STATS = new TestSearchStats();
Expand Down
103 changes: 103 additions & 0 deletions x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats.csv-spec
Original file line number Diff line number Diff line change
Expand Up @@ -578,3 +578,106 @@ F
M
null
;

countFieldNoGrouping
from employees | where emp_no < 10050 | stats c = count(salary);

c:l
49
;

countFieldWithRenamingNoGrouping
from employees | rename emp_no as e, salary as s | where e < 10050 | stats c = count(s);

c:l
49
;


countFieldWithAliasNoGrouping
from employees | eval s = salary | rename s as sr | eval hidden_s = sr | rename emp_no as e | where e < 10050 | stats c = count(hidden_s);

c:l
49
;

countFieldWithGrouping
from employees | rename languages as l | where emp_no < 10050 | stats c = count(emp_no) by l | sort l;

c:l | l:i
9 | 1
7 | 2
6 | 3
9 | 4
8 | 5
10 | null
;

countFieldWithAliasWithGrouping
from employees | rename languages as l | eval e = emp_no | where emp_no < 10050 | stats c = count(e) by l | sort l;

c:l | l:i
9 | 1
7 | 2
6 | 3
9 | 4
8 | 5
10 | null
;

countEvalExpNoGrouping
from employees | eval e = case(emp_no < 10050, emp_no, null) | stats c = count(e);

c:l
49
;

countEvalExpWithGrouping
from employees | rename languages as l | eval e = case(emp_no < 10050, emp_no, null) | stats c = count(e) by l | sort l;

c:l | l:i
9 | 1
7 | 2
6 | 3
9 | 4
8 | 5
10 | null
;

countAllOnOrdinalField
from employees | stats ca = count() by gender | sort gender;

ca:l|gender:s
33 |F
57 |M
10 |null
;

countFieldOnOrdinalField
from employees | stats ca = count(gender) by gender | sort gender;

ca:l|gender:s
33 |F
57 |M
0 |null
;


countFieldVsAll
from employees | stats ca = count(), cn = count(null), cf = count(gender) by gender | sort gender;

ca:l|cn:l|cf:l|gender:s
33 |33 |33 |F
57 |57 |57 |M
10 |10 |0 |null
;

countMultiValue
from employees | where emp_no == 10010 | stats c = count(job_positions) by job_positions;

c:l | job_positions:s
4 |Architect
4 |Purchase Manager
4 |Reporting Analyst
4 |Tech Lead
;
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@
package org.elasticsearch.xpack.esql.optimizer;

import org.elasticsearch.xpack.esql.session.EsqlConfiguration;
import org.elasticsearch.xpack.esql.stats.SearchStats;

public record LocalPhysicalOptimizerContext(EsqlConfiguration configuration) {}
public record LocalPhysicalOptimizerContext(EsqlConfiguration configuration, SearchStats searchStats) {}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
import org.elasticsearch.xpack.esql.evaluator.predicate.operator.comparison.Equals;
import org.elasticsearch.xpack.esql.evaluator.predicate.operator.comparison.NotEquals;
Expand Down Expand Up @@ -65,6 +66,7 @@
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
Expand Down Expand Up @@ -260,8 +262,6 @@ private static boolean isAttributePushable(Expression expression, ScalarFunction
* this method is supposed to be used to define if a field can be used for exact push down (eg. sort or filter).
* "aggregatable" is the most accurate information we can have from field_caps as of now.
* Pushing down operations on fields that are not aggregatable would result in an error.
* @param f
* @return
*/
private static boolean isAggregatable(FieldAttribute f) {
return f.exactAttribute().field().isAggregatable();
Expand Down Expand Up @@ -320,13 +320,23 @@ private List<EsQueryExec.FieldSort> buildFieldSorts(List<Order> orders) {
/**
* Looks for the case where certain stats exist right before the query and thus can be pushed down.
*/
private static class PushStatsToSource extends OptimizerRule<AggregateExec> {
private static class PushStatsToSource extends PhysicalOptimizerRules.ParameterizedOptimizerRule<
AggregateExec,
LocalPhysicalOptimizerContext> {

@Override
protected PhysicalPlan rule(AggregateExec aggregateExec) {
protected PhysicalPlan rule(AggregateExec aggregateExec, LocalPhysicalOptimizerContext context) {
PhysicalPlan plan = aggregateExec;
if (aggregateExec.child() instanceof EsQueryExec queryExec) {
var tuple = pushableStats(aggregateExec);
var tuple = pushableStats(aggregateExec, context);

// for the moment support pushing count just for one field
List<Stat> stats = tuple.v2();
if (stats.size() > 1) {
if (stats.stream().map(Stat::name).collect(Collectors.toSet()).size() > 1) {
return aggregateExec;
}
}

// TODO: handle case where some aggs cannot be pushed down by breaking the aggs into two sources (regular + stats) + union
// use the stats since the attributes are larger in size (due to seen)
Expand All @@ -344,9 +354,9 @@ protected PhysicalPlan rule(AggregateExec aggregateExec) {
return plan;
}

private Tuple<List<Attribute>, List<Stat>> pushableStats(AggregateExec aggregate) {
private Tuple<List<Attribute>, List<Stat>> pushableStats(AggregateExec aggregate, LocalPhysicalOptimizerContext context) {
AttributeMap<Stat> stats = new AttributeMap<>();
Tuple<List<Attribute>, List<Stat>> tuple = new Tuple<>(new ArrayList<Attribute>(), new ArrayList<Stat>());
Tuple<List<Attribute>, List<Stat>> tuple = new Tuple<>(new ArrayList<>(), new ArrayList<>());

if (aggregate.groupings().isEmpty()) {
for (NamedExpression agg : aggregate.aggregates()) {
Expand All @@ -356,9 +366,24 @@ private Tuple<List<Attribute>, List<Stat>> pushableStats(AggregateExec aggregate
Expression child = as.child();
if (child instanceof Count count) {
var target = count.field();
String fieldName = null;
QueryBuilder query = null;
// TODO: add count over field (has to be field attribute)
if (target.foldable()) {
return new Stat(StringUtils.WILDCARD, COUNT);
fieldName = StringUtils.WILDCARD;
}
// check if regular field
else {
if (target instanceof FieldAttribute fa) {
var fName = fa.name();
if (context.searchStats().isSingleValue(fName)) {
fieldName = fa.name();
query = QueryBuilders.existsQuery(fieldName);
}
}
}
if (fieldName != null) {
return new Stat(fieldName, COUNT, query);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
import org.elasticsearch.xpack.ql.tree.NodeInfo;
import org.elasticsearch.xpack.ql.tree.NodeUtils;
import org.elasticsearch.xpack.ql.tree.Source;
import org.elasticsearch.xpack.ql.util.Queries;

import java.util.List;
import java.util.Objects;

import static java.util.Arrays.asList;

/**
* Specialized query class for retrieving statistics about the underlying data and not the actual documents.
* For that see {@link EsQueryExec}
Expand All @@ -29,10 +32,15 @@ public enum StatsType {
COUNT,
MIN,
MAX,
EXISTS;
EXISTS
}

public record Stat(String name, StatsType type) {};
public record Stat(String name, StatsType type, QueryBuilder query) {

public QueryBuilder filter(QueryBuilder sourceQuery) {
return query == null ? sourceQuery : Queries.combine(Queries.Clause.FILTER, asList(sourceQuery, query));
}
}

private final EsIndex index;
private final QueryBuilder query;
Expand Down Expand Up @@ -69,6 +77,10 @@ public QueryBuilder query() {
return query;
}

public List<Stat> stats() {
return stats;
}

@Override
public List<Attribute> output() {
return attrs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ public class FragmentExec extends LeafExec implements EstimatesRowSize {
* Estimate of the number of bytes that'll be loaded per position before
* the stream of pages is consumed.
*/
private final Integer estimatedRowSize;
private final int estimatedRowSize;

public FragmentExec(LogicalPlan fragment) {
this(fragment.source(), fragment, null, null);
this(fragment.source(), fragment, null, 0);
}

public FragmentExec(Source source, LogicalPlan fragment, QueryBuilder esFilter, Integer estimatedRowSize) {
public FragmentExec(Source source, LogicalPlan fragment, QueryBuilder esFilter, int estimatedRowSize) {
super(source);
this.fragment = fragment;
this.esFilter = esFilter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@
import org.elasticsearch.xpack.ql.util.Holder;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -99,6 +98,7 @@
import java.util.function.Function;
import java.util.stream.Stream;

import static java.util.Arrays.asList;
import static java.util.stream.Collectors.joining;
import static org.elasticsearch.compute.lucene.LuceneOperator.NO_LIMIT;
import static org.elasticsearch.compute.operator.LimitOperator.Factory;
Expand Down Expand Up @@ -239,9 +239,15 @@ private PhysicalOperation planEsStats(EsStatsQueryExec statsQuery, LocalExecutio
if (physicalOperationProviders instanceof EsPhysicalOperationProviders == false) {
throw new EsqlIllegalArgumentException("EsStatsQuery should only occur against a Lucene backend");
}
EsPhysicalOperationProviders esProvider = (EsPhysicalOperationProviders) physicalOperationProviders;
if (statsQuery.stats().size() > 1) {
throw new EsqlIllegalArgumentException("EsStatsQuery currently supports only one field statistic");
}

Function<SearchContext, Query> querySupplier = EsPhysicalOperationProviders.querySupplier(statsQuery.query());
// for now only one stat is supported
EsStatsQueryExec.Stat stat = statsQuery.stats().get(0);

EsPhysicalOperationProviders esProvider = (EsPhysicalOperationProviders) physicalOperationProviders;
Function<SearchContext, Query> querySupplier = EsPhysicalOperationProviders.querySupplier(stat.filter(statsQuery.query()));

Expression limitExp = statsQuery.limit();
int limit = limitExp != null ? (Integer) limitExp.fold() : NO_LIMIT;
Expand Down Expand Up @@ -411,8 +417,8 @@ private PhysicalOperation planTopN(TopNExec topNExec, LocalExecutionPlannerConte
return source.with(
new TopNOperatorFactory(
limit,
Arrays.asList(elementTypes),
Arrays.asList(encoders),
asList(elementTypes),
asList(encoders),
orders,
context.pageSize(2000 + topNExec.estimatedRowSize())
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public static PhysicalPlan localPlan(List<SearchContext> searchContexts, EsqlCon

public static PhysicalPlan localPlan(EsqlConfiguration configuration, PhysicalPlan plan, SearchStats searchStats) {
final var logicalOptimizer = new LocalLogicalPlanOptimizer(new LocalLogicalOptimizerContext(configuration, searchStats));
var physicalOptimizer = new LocalPhysicalPlanOptimizer(new LocalPhysicalOptimizerContext(configuration));
var physicalOptimizer = new LocalPhysicalPlanOptimizer(new LocalPhysicalOptimizerContext(configuration, searchStats));

return localPlan(plan, logicalOptimizer, physicalOptimizer);
}
Expand Down
Loading

0 comments on commit 2e86d25

Please sign in to comment.