Skip to content

Commit

Permalink
ESQL: Pushdown count(field) to Lucene
Browse files Browse the repository at this point in the history
Use the LuceneCountOperator also for ungrouped count(field) queries

Fix #99840
  • Loading branch information
costin committed Oct 1, 2023
1 parent dbb8b7d commit f29cf83
Show file tree
Hide file tree
Showing 9 changed files with 377 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -578,3 +578,68 @@ F
M
null
;

countFieldNoGrouping
from employees | where emp_no < 10050 | stats c = count(salary);

c:l
49
;

countFieldWithRenamingNoGrouping
from employees | rename emp_no as e, salary as s | where e < 10050 | stats c = count(s);

c:l
49
;


countFieldWithAliasNoGrouping
from employees | eval s = salary | rename s as sr | eval hidden_s = sr | rename emp_no as e | where e < 10050 | stats c = count(hidden_s);

c:l
49
;

countFieldWithGrouping
from employees | rename languages as l | where emp_no < 10050 | stats c = count(emp_no) by l | sort l;

c:l | l:i
9 | 1
7 | 2
6 | 3
9 | 4
8 | 5
10 | null
;

countFieldWithAliasWithGrouping
from employees | rename languages as l | eval e = emp_no | where emp_no < 10050 | stats c = count(e) by l | sort l;

c:l | l:i
9 | 1
7 | 2
6 | 3
9 | 4
8 | 5
10 | null
;

countEvalExpNoGrouping
from employees | eval e = case(emp_no < 10050, emp_no, null) | stats c = count(e);

c:l
49
;

countEvalExpWithGrouping
from employees | rename languages as l | eval e = case(emp_no < 10050, emp_no, null) | stats c = count(e) by l | sort l;

c:l | l:i
9 | 1
7 | 2
6 | 3
9 | 4
8 | 5
10 | null
;
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
import org.elasticsearch.xpack.esql.evaluator.predicate.operator.comparison.Equals;
import org.elasticsearch.xpack.esql.evaluator.predicate.operator.comparison.NotEquals;
Expand Down Expand Up @@ -346,7 +347,7 @@ protected PhysicalPlan rule(AggregateExec aggregateExec) {

private Tuple<List<Attribute>, List<Stat>> pushableStats(AggregateExec aggregate) {
AttributeMap<Stat> stats = new AttributeMap<>();
Tuple<List<Attribute>, List<Stat>> tuple = new Tuple<>(new ArrayList<Attribute>(), new ArrayList<Stat>());
Tuple<List<Attribute>, List<Stat>> tuple = new Tuple<>(new ArrayList<>(), new ArrayList<>());

if (aggregate.groupings().isEmpty()) {
for (NamedExpression agg : aggregate.aggregates()) {
Expand All @@ -356,9 +357,21 @@ private Tuple<List<Attribute>, List<Stat>> pushableStats(AggregateExec aggregate
Expression child = as.child();
if (child instanceof Count count) {
var target = count.field();
String fieldName = null;
QueryBuilder query = null;
// TODO: add count over field (has to be field attribute)
if (target.foldable()) {
return new Stat(StringUtils.WILDCARD, COUNT);
fieldName = StringUtils.WILDCARD;
}
// check if regular field
else {
if (target instanceof FieldAttribute fa) {
fieldName = fa.name();
query = QueryBuilders.existsQuery(fieldName);
}
}
if (fieldName != null) {
return new Stat(fieldName, COUNT, query);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
import org.elasticsearch.xpack.ql.tree.NodeInfo;
import org.elasticsearch.xpack.ql.tree.NodeUtils;
import org.elasticsearch.xpack.ql.tree.Source;
import org.elasticsearch.xpack.ql.util.Queries;

import java.util.List;
import java.util.Objects;

import static java.util.Arrays.asList;

/**
* Specialized query class for retrieving statistics about the underlying data and not the actual documents.
* For that see {@link EsQueryExec}
Expand All @@ -29,10 +32,15 @@ public enum StatsType {
COUNT,
MIN,
MAX,
EXISTS;
EXISTS
}

public record Stat(String name, StatsType type) {};
public record Stat(String name, StatsType type, QueryBuilder query) {

public QueryBuilder filter(QueryBuilder sourceQuery) {
return query == null ? sourceQuery : Queries.combine(Queries.Clause.FILTER, asList(sourceQuery, query));
}
}

private final EsIndex index;
private final QueryBuilder query;
Expand Down Expand Up @@ -69,6 +77,10 @@ public QueryBuilder query() {
return query;
}

public List<Stat> stats() {
return stats;
}

@Override
public List<Attribute> output() {
return attrs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ public class FragmentExec extends LeafExec implements EstimatesRowSize {
* Estimate of the number of bytes that'll be loaded per position before
* the stream of pages is consumed.
*/
private final Integer estimatedRowSize;
private final int estimatedRowSize;

public FragmentExec(LogicalPlan fragment) {
this(fragment.source(), fragment, null, null);
this(fragment.source(), fragment, null, 0);
}

public FragmentExec(Source source, LogicalPlan fragment, QueryBuilder esFilter, Integer estimatedRowSize) {
public FragmentExec(Source source, LogicalPlan fragment, QueryBuilder esFilter, int estimatedRowSize) {
super(source);
this.fragment = fragment;
this.esFilter = esFilter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@
import org.elasticsearch.xpack.ql.util.Holder;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -99,6 +98,7 @@
import java.util.function.Function;
import java.util.stream.Stream;

import static java.util.Arrays.asList;
import static java.util.stream.Collectors.joining;
import static org.elasticsearch.compute.lucene.LuceneOperator.NO_LIMIT;
import static org.elasticsearch.compute.operator.LimitOperator.Factory;
Expand Down Expand Up @@ -239,9 +239,15 @@ private PhysicalOperation planEsStats(EsStatsQueryExec statsQuery, LocalExecutio
if (physicalOperationProviders instanceof EsPhysicalOperationProviders == false) {
throw new EsqlIllegalArgumentException("EsStatsQuery should only occur against a Lucene backend");
}
EsPhysicalOperationProviders esProvider = (EsPhysicalOperationProviders) physicalOperationProviders;
if (statsQuery.stats().size() > 1) {
throw new EsqlIllegalArgumentException("EsStatsQuery currently supports only one field statistic");
}

Function<SearchContext, Query> querySupplier = EsPhysicalOperationProviders.querySupplier(statsQuery.query());
// for now only one stat is supported
EsStatsQueryExec.Stat stat = statsQuery.stats().get(0);

EsPhysicalOperationProviders esProvider = (EsPhysicalOperationProviders) physicalOperationProviders;
Function<SearchContext, Query> querySupplier = EsPhysicalOperationProviders.querySupplier(stat.filter(statsQuery.query()));

Expression limitExp = statsQuery.limit();
int limit = limitExp != null ? (Integer) limitExp.fold() : NO_LIMIT;
Expand Down Expand Up @@ -411,8 +417,8 @@ private PhysicalOperation planTopN(TopNExec topNExec, LocalExecutionPlannerConte
return source.with(
new TopNOperatorFactory(
limit,
Arrays.asList(elementTypes),
Arrays.asList(encoders),
asList(elementTypes),
asList(encoders),
orders,
context.pageSize(2000 + topNExec.estimatedRowSize())
),
Expand Down
Loading

0 comments on commit f29cf83

Please sign in to comment.