Skip to content

Commit

Permalink
SQL: Allow sorting of groups by aggregates
Browse files Browse the repository at this point in the history
Introduce client-side, custom sorting of groups based on aggregate
functions. To allow this, the Analyzer has been extended to push down
to underlying Aggregate, aggregate function and the Querier has been
extended to identify the case and consume the results in order and sort
them based on the given columns.
The underlying QueryContainer has been slightly modified to allow a view
of the underlying values being extracted as the columns used for sorting
might not be requested by the user.

The PR also adds minor tweaks, mainly related to tree output.

Close elastic#35118
  • Loading branch information
costin committed Jan 30, 2019
1 parent ecbaa38 commit e694c10
Show file tree
Hide file tree
Showing 55 changed files with 1,217 additions and 370 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public void testExplainBasic() throws IOException {
assertThat(readLine(), startsWith("----------"));
assertThat(readLine(), startsWith("With[{}]"));
assertThat(readLine(), startsWith("\\_Project[[?*]]"));
assertThat(readLine(), startsWith(" \\_UnresolvedRelation[[][index=test],null,Unknown index [test]]"));
assertThat(readLine(), startsWith(" \\_UnresolvedRelation[test]"));
assertEquals("", readLine());

assertThat(command("EXPLAIN " + (randomBoolean() ? "" : "(PLAN ANALYZED) ") + "SELECT * FROM test"), containsString("plan"));
Expand Down Expand Up @@ -64,22 +64,22 @@ public void testExplainWithWhere() throws IOException {
assertThat(readLine(), startsWith("----------"));
assertThat(readLine(), startsWith("With[{}]"));
assertThat(readLine(), startsWith("\\_Project[[?*]]"));
assertThat(readLine(), startsWith(" \\_Filter[i = 2#"));
assertThat(readLine(), startsWith(" \\_UnresolvedRelation[[][index=test],null,Unknown index [test]]"));
assertThat(readLine(), startsWith(" \\_Filter[Equals[?i"));
assertThat(readLine(), startsWith(" \\_UnresolvedRelation[test]"));
assertEquals("", readLine());

assertThat(command("EXPLAIN " + (randomBoolean() ? "" : "(PLAN ANALYZED) ") + "SELECT * FROM test WHERE i = 2"),
containsString("plan"));
assertThat(readLine(), startsWith("----------"));
assertThat(readLine(), startsWith("Project[[i{f}#"));
assertThat(readLine(), startsWith("\\_Filter[i = 2#"));
assertThat(readLine(), startsWith("\\_Filter[Equals[i"));
assertThat(readLine(), startsWith(" \\_EsRelation[test][i{f}#"));
assertEquals("", readLine());

assertThat(command("EXPLAIN (PLAN OPTIMIZED) SELECT * FROM test WHERE i = 2"), containsString("plan"));
assertThat(readLine(), startsWith("----------"));
assertThat(readLine(), startsWith("Project[[i{f}#"));
assertThat(readLine(), startsWith("\\_Filter[i = 2#"));
assertThat(readLine(), startsWith("\\_Filter[Equals[i"));
assertThat(readLine(), startsWith(" \\_EsRelation[test][i{f}#"));
assertEquals("", readLine());

Expand Down Expand Up @@ -123,20 +123,20 @@ public void testExplainWithCount() throws IOException {
assertThat(command("EXPLAIN (PLAN PARSED) SELECT COUNT(*) FROM test"), containsString("plan"));
assertThat(readLine(), startsWith("----------"));
assertThat(readLine(), startsWith("With[{}]"));
assertThat(readLine(), startsWith("\\_Project[[?COUNT(*)]]"));
assertThat(readLine(), startsWith(" \\_UnresolvedRelation[[][index=test],null,Unknown index [test]]"));
assertThat(readLine(), startsWith("\\_Project[[?COUNT[?*]]]"));
assertThat(readLine(), startsWith(" \\_UnresolvedRelation[test]"));
assertEquals("", readLine());

assertThat(command("EXPLAIN " + (randomBoolean() ? "" : "(PLAN ANALYZED) ") + "SELECT COUNT(*) FROM test"),
containsString("plan"));
assertThat(readLine(), startsWith("----------"));
assertThat(readLine(), startsWith("Aggregate[[],[COUNT(*)#"));
assertThat(readLine(), startsWith("Aggregate[[],[Count[*=1"));
assertThat(readLine(), startsWith("\\_EsRelation[test][i{f}#"));
assertEquals("", readLine());

assertThat(command("EXPLAIN (PLAN OPTIMIZED) SELECT COUNT(*) FROM test"), containsString("plan"));
assertThat(readLine(), startsWith("----------"));
assertThat(readLine(), startsWith("Aggregate[[],[COUNT(*)#"));
assertThat(readLine(), startsWith("Aggregate[[],[Count[*=1"));
assertThat(readLine(), startsWith("\\_EsRelation[test][i{f}#"));
assertEquals("", readLine());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public static List<Object[]> readScriptSpec() throws Exception {
tests.addAll(readScriptSpec("/datetime.sql-spec", parser));
tests.addAll(readScriptSpec("/math.sql-spec", parser));
tests.addAll(readScriptSpec("/agg.sql-spec", parser));
tests.addAll(readScriptSpec("/agg-ordering.sql-spec", parser));
tests.addAll(readScriptSpec("/arithmetic.sql-spec", parser));
tests.addAll(readScriptSpec("/string-functions.sql-spec", parser));
tests.addAll(readScriptSpec("/case-functions.sql-spec", parser));
Expand Down
51 changes: 51 additions & 0 deletions x-pack/plugin/sql/qa/src/main/resources/agg-ordering.sql-spec
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
//
// Custom sorting/ordering on aggregates
//

countWithImplicitGroupBy
SELECT MAX(salary) AS m FROM test_emp ORDER BY COUNT(*);

countWithImplicitGroupByWithHaving
SELECT MAX(salary) AS m FROM test_emp HAVING MIN(salary) > 1 ORDER BY COUNT(*);

countAndMaxWithImplicitGroupBy
SELECT MAX(salary) AS m FROM test_emp ORDER BY MAX(salary), COUNT(*);

maxWithAliasWithImplicitGroupBy
SELECT MAX(salary) AS m FROM test_emp ORDER BY m;

maxWithAliasWithImplicitGroupByAndHaving
SELECT MAX(salary) AS m FROM test_emp HAVING COUNT(*) > 1 ORDER BY m;

aggWithoutAlias
SELECT MAX(salary) AS max FROM test_emp GROUP BY gender ORDER BY MAX(salary);

aggWithAlias
SELECT MAX(salary) AS m FROM test_emp GROUP BY gender ORDER BY m;

multipleAggsThatGetRewrittenWithoutAlias
SELECT MAX(salary) AS max, MIN(salary) AS min FROM test_emp GROUP BY gender ORDER BY MAX(salary);

multipleAggsThatGetRewrittenWithAlias
SELECT MAX(salary) AS max, MIN(salary) AS min FROM test_emp GROUP BY gender ORDER BY max;

aggNotSpecifiedInTheAggregate
SELECT MIN(salary) AS min, COUNT(*) AS c FROM test_emp GROUP BY gender ORDER BY MAX(salary);

aggNotSpecifiedInTheAggregateWithHaving
SELECT MIN(salary) AS min, COUNT(*) AS c FROM test_emp GROUP BY gender HAVING c > 1 ORDER BY MAX(salary);

multipleAggsThatGetRewrittenWithAliasOnAMediumGroupBy
SELECT languages, MAX(salary) AS max, MIN(salary) AS min FROM test_emp GROUP BY languages ORDER BY max;

multipleAggsThatGetRewrittenWithAliasOnALargeGroupBy
SELECT emp_no, MAX(salary) AS max, MIN(salary) AS min FROM test_emp GROUP BY emp_no ORDER BY max;

multipleAggsThatGetRewrittenWithAliasOnAMediumGroupByWithHaving
SELECT languages, MAX(salary) AS max, MIN(salary) AS min FROM test_emp GROUP BY languages HAVING min BETWEEN 1000 AND 99999 ORDER BY max;

aggNotSpecifiedInTheAggregatemultipleAggsThatGetRewrittenWithAliasOnALargeGroupBy
SELECT emp_no, MIN(salary) AS min FROM test_emp GROUP BY emp_no ORDER BY MAX(salary);

aggNotSpecifiedWithHavingOnLargeGroupBy
SELECT MAX(salary) AS max FROM test_emp GROUP BY emp_no HAVING AVG(salary) > 1000 ORDER BY MIN(salary);
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.elasticsearch.xpack.sql.type.DataTypes;
import org.elasticsearch.xpack.sql.type.InvalidMappedField;
import org.elasticsearch.xpack.sql.type.UnsupportedEsField;
import org.elasticsearch.xpack.sql.util.CollectionUtils;

import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -62,6 +63,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
Expand Down Expand Up @@ -106,7 +108,8 @@ protected Iterable<RuleExecutor<LogicalPlan>.Batch> batches() {
new ResolveFunctions(),
new ResolveAliases(),
new ProjectedAggregations(),
new ResolveAggsInHaving()
new ResolveAggsInHaving(),
new ResolveAggsInOrderBy()
//new ImplicitCasting()
);
Batch finish = new Batch("Finish Analysis",
Expand Down Expand Up @@ -926,62 +929,57 @@ protected LogicalPlan rule(Project p) {
// Handle aggs in HAVING. To help folding any aggs not found in Aggregation
// will be pushed down to the Aggregate and then projected. This also simplifies the Verifier's job.
//
private class ResolveAggsInHaving extends AnalyzeRule<LogicalPlan> {
private class ResolveAggsInHaving extends AnalyzeRule<Filter> {

@Override
protected boolean skipResolved() {
return false;
}

@Override
protected LogicalPlan rule(LogicalPlan plan) {
protected LogicalPlan rule(Filter f) {
// HAVING = Filter followed by an Agg
if (plan instanceof Filter) {
Filter f = (Filter) plan;
if (f.child() instanceof Aggregate && f.child().resolved()) {
Aggregate agg = (Aggregate) f.child();
if (f.child() instanceof Aggregate && f.child().resolved()) {
Aggregate agg = (Aggregate) f.child();

Set<NamedExpression> missing = null;
Expression condition = f.condition();
Set<NamedExpression> missing = null;
Expression condition = f.condition();

// the condition might contain an agg (AVG(salary)) that could have been resolved
// (salary cannot be pushed down to Aggregate since there's no grouping and thus the function wasn't resolved either)
// the condition might contain an agg (AVG(salary)) that could have been resolved
// (salary cannot be pushed down to Aggregate since there's no grouping and thus the function wasn't resolved either)

// so try resolving the condition in one go through a 'dummy' aggregate
if (!condition.resolved()) {
// that's why try to resolve the condition
Aggregate tryResolvingCondition = new Aggregate(agg.source(), agg.child(), agg.groupings(),
combine(agg.aggregates(), new Alias(f.source(), ".having", condition)));
// so try resolving the condition in one go through a 'dummy' aggregate
if (!condition.resolved()) {
// that's why try to resolve the condition
Aggregate tryResolvingCondition = new Aggregate(agg.source(), agg.child(), agg.groupings(),
combine(agg.aggregates(), new Alias(f.source(), ".having", condition)));

tryResolvingCondition = (Aggregate) analyze(tryResolvingCondition, false);
tryResolvingCondition = (Aggregate) analyze(tryResolvingCondition, false);

// if it got resolved
if (tryResolvingCondition.resolved()) {
// replace the condition with the resolved one
condition = ((Alias) tryResolvingCondition.aggregates()
.get(tryResolvingCondition.aggregates().size() - 1)).child();
} else {
// else bail out
return plan;
}
// if it got resolved
if (tryResolvingCondition.resolved()) {
// replace the condition with the resolved one
condition = ((Alias) tryResolvingCondition.aggregates()
.get(tryResolvingCondition.aggregates().size() - 1)).child();
} else {
// else bail out
return f;
}
}

missing = findMissingAggregate(agg, condition);

if (!missing.isEmpty()) {
Aggregate newAgg = new Aggregate(agg.source(), agg.child(), agg.groupings(),
combine(agg.aggregates(), missing));
Filter newFilter = new Filter(f.source(), newAgg, condition);
// preserve old output
return new Project(f.source(), newFilter, f.output());
}
missing = findMissingAggregate(agg, condition);

return new Filter(f.source(), f.child(), condition);
if (!missing.isEmpty()) {
Aggregate newAgg = new Aggregate(agg.source(), agg.child(), agg.groupings(),
combine(agg.aggregates(), missing));
Filter newFilter = new Filter(f.source(), newAgg, condition);
// preserve old output
return new Project(f.source(), newFilter, f.output());
}
return plan;
}

return plan;
return new Filter(f.source(), f.child(), condition);
}
return f;
}

private Set<NamedExpression> findMissingAggregate(Aggregate target, Expression from) {
Expand All @@ -1001,6 +999,67 @@ private Set<NamedExpression> findMissingAggregate(Aggregate target, Expression f
}
}


//
// Handle aggs in ORDER BY. To help folding any aggs not found in Aggregation
// will be pushed down to the Aggregate and then projected. This also simplifies the Verifier's job.
// Similar to Having however using a different matching pattern since HAVING is always Filter with Agg,
// while an OrderBy can have multiple intermediate nodes (Filter,Project, etc...)
//
private static class ResolveAggsInOrderBy extends AnalyzeRule<OrderBy> {

@Override
protected boolean skipResolved() {
return false;
}

@Override
protected LogicalPlan rule(OrderBy ob) {
List<Order> orders = ob.order();

// 1. collect aggs inside an order by
List<NamedExpression> aggs = new ArrayList<>();
for (Order order : orders) {
if (Functions.isAggregate(order.child())) {
aggs.add(Expressions.wrapAsNamed(order.child()));
}
}
if (aggs.isEmpty()) {
return ob;
}

// 2. find first Aggregate child and update it

final AtomicBoolean found = new AtomicBoolean(false);

LogicalPlan plan = ob.transformDown(a -> {
if (found.get() == false) {
found.set(true);

List<NamedExpression> missing = new ArrayList<>();

for (NamedExpression orderedAgg : aggs) {
if (Expressions.anyMatch(a.aggregates(), e -> Expressions.equalsAsAttribute(e, orderedAgg)) == false) {
missing.add(orderedAgg);
}
}
// agg already contains all aggs
if (missing.isEmpty() == false) {
// save aggregates
return new Aggregate(a.source(), a.child(), a.groupings(), CollectionUtils.combine(a.aggregates(), missing));
}
}
return a;
}, Aggregate.class);

// if the plan was updated, project the initial aggregates
if (plan != ob) {
return new Project(ob.source(), plan, ob.output());
}
return ob;
}
}

private class PruneDuplicateFunctions extends AnalyzeRule<LogicalPlan> {

@Override
Expand Down
Loading

0 comments on commit e694c10

Please sign in to comment.