Skip to content

Commit

Permalink
SQL: Enable the InnerAggregates inside PIVOT (#65792)
Browse files Browse the repository at this point in the history
* Remove the limitation of not being able to use `InnerAggregate`
inside PIVOTs (aggregations using extended and matrix stats)
* The limitation was introduced as part of the original `PIVOT` 
implementation in #46489, but after #49693 it could be lifted.
* Test that the `PIVOT` results in the same query as the 
`GROUP BY`. This should hold across all the 
`AggregateFunction`s we have.
  • Loading branch information
Andras Palinkas authored Dec 7, 2020
1 parent 9b47889 commit 67704b0
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 95 deletions.
46 changes: 46 additions & 0 deletions x-pack/plugin/sql/qa/server/src/main/resources/pivot.csv-spec
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,51 @@ null |10044 |Mingsen |F |1994-05-21
// end::sumWithoutSubquery
;

sumWithInnerAggregateSumOfSquares
schema::birth_date:ts|emp_no:i|first_name:s|gender:s|hire_date:ts|last_name:s|1:d|2:d
SELECT * FROM test_emp PIVOT (SUM_OF_SQUARES(salary) FOR languages IN (1, 2)) LIMIT 5;

birth_date | emp_no | first_name | gender | hire_date | last_name | 1 | 2
---------------------+---------------+---------------+---------------+------------------------+---------------+---------------+---------------
null |10041 |Uri |F |1989-11-12T00:00:00.000Z|Lenart |3.182652225E9 |null
null |10043 |Yishay |M |1990-10-20T00:00:00.000Z|Tzvieli |1.179304281E9 |null
null |10044 |Mingsen |F |1994-05-21T00:00:00.000Z|Casley |1.578313984E9 |null
1952-04-19T00:00:00Z |10009 |Sumant |F |1985-02-18T00:00:00.000Z|Peac |4.378998276E9 |null
1953-01-07T00:00:00Z |10067 |Claudi |M |1987-03-04T00:00:00.000Z|Stavenow |null |2.708577936E9
;

sumWithInnerAggregateSumOfSquaresRound
schema::birth_date:ts|emp_no:i|first_name:s|gender:s|hire_date:ts|last_name:s|1:d|2:d
SELECT * FROM test_emp PIVOT (ROUND(SUM_OF_SQUARES(salary)/1E6, 2) FOR languages IN (1, 2)) LIMIT 5;

birth_date | emp_no | first_name | gender | hire_date | last_name | 1 | 2
---------------------+---------------+---------------+---------------+------------------------+---------------+---------------+---------------
null |10041 |Uri |F |1989-11-12T00:00:00.000Z|Lenart |3182.65 |null
null |10043 |Yishay |M |1990-10-20T00:00:00.000Z|Tzvieli |1179.30 |null
null |10044 |Mingsen |F |1994-05-21T00:00:00.000Z|Casley |1578.31 |null
1952-04-19T00:00:00Z |10009 |Sumant |F |1985-02-18T00:00:00.000Z|Peac |4379.00 |null
1953-01-07T00:00:00Z |10067 |Claudi |M |1987-03-04T00:00:00.000Z|Stavenow |null |2708.58
;

sumWithInnerAggregateKurtosis
schema::client_port:i|'OK':d|'Error':d
SELECT * FROM (SELECT client_port, status, bytes_in FROM logs WHERE client_port IS NULL) PIVOT (KURTOSIS(bytes_in) FOR status IN ('OK', 'Error')) LIMIT 10;

client_port | 'OK' | 'Error'
---------------+------------------+---------------
null |2.0016153277578916|NaN
;

sumWithInnerAggregateKurtosisRound
schema::client_port:i|'OK':d|'Error':d
SELECT * FROM (SELECT client_port, status, bytes_in FROM logs WHERE client_port IS NULL) PIVOT (ROUND(KURTOSIS(bytes_in), 3) FOR status IN ('OK', 'Error')) LIMIT 10;

client_port | 'OK' | 'Error'
---------------+------------------+---------------
null |2.002 |-0.0
;


averageWithOneValueAndMath
schema::languages:bt|'F':d
SELECT * FROM (SELECT languages, gender, salary FROM test_emp) PIVOT (ROUND(AVG(salary) / 2) FOR gender IN ('F'));
Expand All @@ -214,3 +259,4 @@ null |31070.0
4 |24646.0
5 |23353.0
;

Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@
package org.elasticsearch.xpack.sql.planner;

import org.elasticsearch.xpack.ql.common.Failure;
import org.elasticsearch.xpack.ql.expression.function.aggregate.InnerAggregate;
import org.elasticsearch.xpack.sql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.sql.plan.physical.PivotExec;
import org.elasticsearch.xpack.sql.plan.physical.Unexecutable;
import org.elasticsearch.xpack.sql.plan.physical.UnplannedExec;

Expand All @@ -32,22 +30,10 @@ static List<Failure> verifyMappingPlan(PhysicalPlan plan) {
}
});
});
// verify Pivot
checkInnerAggsPivot(plan, failures);

return failures;
}

private static void checkInnerAggsPivot(PhysicalPlan plan, List<Failure> failures) {
plan.forEachDown(p -> {
p.pivot().aggregates().forEach(agg -> agg.forEachDown(e -> {
if (e instanceof InnerAggregate) {
failures.add(fail(e, "Aggregation [{}] not supported (yet) by PIVOT", e.sourceText()));
}
}));
}, PivotExec.class);
}

static List<Failure> verifyExecutingPlan(PhysicalPlan plan) {
List<Failure> failures = new ArrayList<>();

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ql.expression.Expressions;
import org.elasticsearch.xpack.ql.expression.ReferenceAttribute;
import org.elasticsearch.xpack.ql.expression.function.aggregate.AggregateFunction;
import org.elasticsearch.xpack.ql.index.EsIndex;
import org.elasticsearch.xpack.ql.index.IndexResolution;
import org.elasticsearch.xpack.ql.type.EsField;
Expand All @@ -20,16 +21,19 @@
import org.elasticsearch.xpack.sql.plan.physical.EsQueryExec;
import org.elasticsearch.xpack.sql.plan.physical.LocalExec;
import org.elasticsearch.xpack.sql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.sql.querydsl.container.QueryContainer;
import org.elasticsearch.xpack.sql.session.EmptyExecutable;
import org.elasticsearch.xpack.sql.session.SingletonExecutable;
import org.elasticsearch.xpack.sql.stats.Metrics;
import org.elasticsearch.xpack.sql.types.SqlTypesTests;
import org.junit.AfterClass;
import org.junit.BeforeClass;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

import static java.util.Arrays.asList;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.startsWith;
Expand Down Expand Up @@ -449,15 +453,15 @@ public void testSelectLiteralWithGroupBy() {
assertEquals(EsQueryExec.class, p.getClass());
EsQueryExec ee = (EsQueryExec) p;
assertEquals(2, ee.output().size());
assertEquals(Arrays.asList("1", "MAX(int)"), Expressions.names(ee.output()));
assertEquals(asList("1", "MAX(int)"), Expressions.names(ee.output()));
assertThat(ee.queryContainer().aggs().asAggBuilder().toString().replaceAll("\\s+", ""),
containsString("\"max\":{\"field\":\"int\""));

p = plan("SELECT 1, count(*) FROM test GROUP BY int");
assertEquals(EsQueryExec.class, p.getClass());
ee = (EsQueryExec) p;
assertEquals(2, ee.output().size());
assertEquals(Arrays.asList("1", "count(*)"), Expressions.names(ee.output()));
assertEquals(asList("1", "count(*)"), Expressions.names(ee.output()));
assertThat(ee.queryContainer().aggs().asAggBuilder().toString().replaceAll("\\s+", ""),
containsString("\"terms\":{\"field\":\"int\""));
}
Expand Down Expand Up @@ -495,14 +499,39 @@ public void testFoldingOfPivot() {
assertEquals(EsQueryExec.class, p.getClass());
EsQueryExec ee = (EsQueryExec) p;
assertEquals(3, ee.output().size());
assertEquals(Arrays.asList("bool", "'A'", "'B'"), Expressions.names(ee.output()));
assertEquals(asList("bool", "'A'", "'B'"), Expressions.names(ee.output()));
String q = ee.toString().replaceAll("\\s+", "");
assertThat(q, containsString("\"query\":{\"terms\":{\"keyword\":[\"A\",\"B\"]"));
String a = ee.queryContainer().aggs().asAggBuilder().toString().replaceAll("\\s+", "");
assertThat(a, containsString("\"terms\":{\"field\":\"bool\""));
assertThat(a, containsString("\"terms\":{\"field\":\"keyword\""));
assertThat(a, containsString("{\"avg\":{\"field\":\"int\"}"));
}

public void testPivotHasSameQueryAsGroupBy() {
final Map<String, String> aggFnsWithMultipleArguments = Map.of(
"PERCENTILE", "PERCENTILE(int, 0)",
"PERCENTILE_RANK", "PERCENTILE_RANK(int, 0)"
);
List<String> aggregations = new SqlFunctionRegistry().listFunctions()
.stream()
.filter(def -> AggregateFunction.class.isAssignableFrom(def.clazz()))
.map(def -> aggFnsWithMultipleArguments.getOrDefault(def.name(), def.name() + "(int)"))
.collect(toList());
for (String aggregationStr : aggregations) {
PhysicalPlan pivotPlan = plan("SELECT * FROM (SELECT some.dotted.field, bool, keyword, int FROM test) " +
"PIVOT(" + aggregationStr + " FOR keyword IN ('A', 'B'))");
PhysicalPlan groupByPlan = plan("SELECT some.dotted.field, bool, keyword, " + aggregationStr + " " +
"FROM test WHERE keyword IN ('A', 'B') GROUP BY some.dotted.field, bool, keyword");
assertEquals(EsQueryExec.class, pivotPlan.getClass());
assertEquals(EsQueryExec.class, groupByPlan.getClass());
QueryContainer pivotQueryContainer = ((EsQueryExec) pivotPlan).queryContainer();
QueryContainer groupByQueryContainer = ((EsQueryExec) groupByPlan).queryContainer();
assertEquals(pivotQueryContainer.query(), groupByQueryContainer.query());
assertEquals(pivotQueryContainer.aggs(), groupByQueryContainer.aggs());
assertEquals(pivotPlan.toString(), groupByPlan.toString());
}
}

private static String randomOrderByAndLimit(int noOfSelectArgs) {
return SqlTestUtils.randomOrderByAndLimit(noOfSelectArgs, random());
Expand Down

0 comments on commit 67704b0

Please sign in to comment.