Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SQL: Enable the InnerAggregates inside PIVOT #65792

Merged
merged 6 commits into from
Dec 7, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions x-pack/plugin/sql/qa/server/src/main/resources/pivot.csv-spec
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,51 @@ null |10044 |Mingsen |F |1994-05-21
// end::sumWithoutSubquery
;

sumWithInnerAggregateSumOfSquares
schema::birth_date:ts|emp_no:i|first_name:s|gender:s|hire_date:ts|last_name:s|1:d|2:d
SELECT * FROM test_emp PIVOT (SUM_OF_SQUARES(salary) FOR languages IN (1, 2)) LIMIT 5;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to see a test with multiple arguments that get folded into compound aggs, such as sum / min / max.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean multiple aggregations inside the PIVOT? Currently there is a check that we can only have a single aggregation inside the PIVOT, so we can't have PIVOT(MIN(salary), MAX(salary) FOR ...).

This change only lifts the limitation on the type of that single aggregation within the PIVOT, so aggregations that are translated into an InnerAggregate will be allowed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any validation or check for this (now that PostOptimizerTest has been removed)? this goes beyond the scope of this ticket so please raise a ticket if this functionality is missing or needs to be further investigated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PostOptimizerTests checked that the type of the aggregation cannot be one that results in an InnerAggregate vs the number of the aggregations inside the PIVOT.

There is already a check in the LogicalPlanBuilder that only allows one aggregation inside the PIVOT:

throw new ParsingException(source(pivotClause.aggs), "PIVOT currently supports only one aggregation, found [{}]",

I cannot see it tested in unit tests, but it is there and works. Added: #65894

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This query fails for me in a REST test (using a REST client): it's only returning 3 rows, whereas it should have returned 5. We need to look into why the test passes and/or why the REST call with the same query fails. @palesz please, open an issue for this investigation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The JDBC integration tests fetch all the rows and follow the cursor (hence 5 rows shows up there). The tests are valid and successful, no issue there. Created #65982 to investigate this unexpected behaviour related to PIVOT+LIMIT.


birth_date | emp_no | first_name | gender | hire_date | last_name | 1 | 2
---------------------+---------------+---------------+---------------+------------------------+---------------+---------------+---------------
null |10041 |Uri |F |1989-11-12T00:00:00.000Z|Lenart |3.182652225E9 |null
null |10043 |Yishay |M |1990-10-20T00:00:00.000Z|Tzvieli |1.179304281E9 |null
null |10044 |Mingsen |F |1994-05-21T00:00:00.000Z|Casley |1.578313984E9 |null
1952-04-19T00:00:00Z |10009 |Sumant |F |1985-02-18T00:00:00.000Z|Peac |4.378998276E9 |null
1953-01-07T00:00:00Z |10067 |Claudi |M |1987-03-04T00:00:00.000Z|Stavenow |null |2.708577936E9
;

sumWithInnerAggregateSumOfSquaresRound
schema::birth_date:ts|emp_no:i|first_name:s|gender:s|hire_date:ts|last_name:s|1:d|2:d
SELECT * FROM test_emp PIVOT (ROUND(SUM_OF_SQUARES(salary)/1E6, 2) FOR languages IN (1, 2)) LIMIT 5;

birth_date | emp_no | first_name | gender | hire_date | last_name | 1 | 2
---------------------+---------------+---------------+---------------+------------------------+---------------+---------------+---------------
null |10041 |Uri |F |1989-11-12T00:00:00.000Z|Lenart |3182.65 |null
null |10043 |Yishay |M |1990-10-20T00:00:00.000Z|Tzvieli |1179.30 |null
null |10044 |Mingsen |F |1994-05-21T00:00:00.000Z|Casley |1578.31 |null
1952-04-19T00:00:00Z |10009 |Sumant |F |1985-02-18T00:00:00.000Z|Peac |4379.00 |null
1953-01-07T00:00:00Z |10067 |Claudi |M |1987-03-04T00:00:00.000Z|Stavenow |null |2708.58
;

sumWithInnerAggregateKurtosis
schema::client_port:i|'OK':d|'Error':d
SELECT * FROM (SELECT client_port, status, bytes_in FROM logs WHERE client_port IS NULL) PIVOT (KURTOSIS(bytes_in) FOR status IN ('OK', 'Error')) LIMIT 10;

client_port | 'OK' | 'Error'
---------------+------------------+---------------
null |2.0016153277578916|NaN
;

sumWithInnerAggregateKurtosisRound
schema::client_port:i|'OK':d|'Error':d
SELECT * FROM (SELECT client_port, status, bytes_in FROM logs WHERE client_port IS NULL) PIVOT (ROUND(KURTOSIS(bytes_in), 3) FOR status IN ('OK', 'Error')) LIMIT 10;

client_port | 'OK' | 'Error'
---------------+------------------+---------------
null |2.002 |-0.0
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created #65863

;


averageWithOneValueAndMath
schema::languages:bt|'F':d
SELECT * FROM (SELECT languages, gender, salary FROM test_emp) PIVOT (ROUND(AVG(salary) / 2) FOR gender IN ('F'));
Expand All @@ -214,3 +259,4 @@ null |31070.0
4 |24646.0
5 |23353.0
;

Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@
package org.elasticsearch.xpack.sql.planner;

import org.elasticsearch.xpack.ql.common.Failure;
import org.elasticsearch.xpack.ql.expression.function.aggregate.InnerAggregate;
import org.elasticsearch.xpack.sql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.sql.plan.physical.PivotExec;
import org.elasticsearch.xpack.sql.plan.physical.Unexecutable;
import org.elasticsearch.xpack.sql.plan.physical.UnplannedExec;

Expand All @@ -32,22 +30,10 @@ static List<Failure> verifyMappingPlan(PhysicalPlan plan) {
}
});
});
// verify Pivot
checkInnerAggsPivot(plan, failures);

return failures;
}

private static void checkInnerAggsPivot(PhysicalPlan plan, List<Failure> failures) {
plan.forEachDown(p -> {
p.pivot().aggregates().forEach(agg -> agg.forEachDown(e -> {
if (e instanceof InnerAggregate) {
failures.add(fail(e, "Aggregation [{}] not supported (yet) by PIVOT", e.sourceText()));
}
}));
}, PivotExec.class);
}

static List<Failure> verifyExecutingPlan(PhysicalPlan plan) {
List<Failure> failures = new ArrayList<>();

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,18 @@
import org.elasticsearch.xpack.sql.plan.physical.EsQueryExec;
import org.elasticsearch.xpack.sql.plan.physical.LocalExec;
import org.elasticsearch.xpack.sql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.sql.querydsl.container.QueryContainer;
import org.elasticsearch.xpack.sql.session.EmptyExecutable;
import org.elasticsearch.xpack.sql.session.SingletonExecutable;
import org.elasticsearch.xpack.sql.stats.Metrics;
import org.elasticsearch.xpack.sql.types.SqlTypesTests;
import org.junit.AfterClass;
import org.junit.BeforeClass;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

import static java.util.Arrays.asList;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.startsWith;
Expand Down Expand Up @@ -449,15 +451,15 @@ public void testSelectLiteralWithGroupBy() {
assertEquals(EsQueryExec.class, p.getClass());
EsQueryExec ee = (EsQueryExec) p;
assertEquals(2, ee.output().size());
assertEquals(Arrays.asList("1", "MAX(int)"), Expressions.names(ee.output()));
assertEquals(asList("1", "MAX(int)"), Expressions.names(ee.output()));
assertThat(ee.queryContainer().aggs().asAggBuilder().toString().replaceAll("\\s+", ""),
containsString("\"max\":{\"field\":\"int\""));

p = plan("SELECT 1, count(*) FROM test GROUP BY int");
assertEquals(EsQueryExec.class, p.getClass());
ee = (EsQueryExec) p;
assertEquals(2, ee.output().size());
assertEquals(Arrays.asList("1", "count(*)"), Expressions.names(ee.output()));
assertEquals(asList("1", "count(*)"), Expressions.names(ee.output()));
assertThat(ee.queryContainer().aggs().asAggBuilder().toString().replaceAll("\\s+", ""),
containsString("\"terms\":{\"field\":\"int\""));
}
Expand Down Expand Up @@ -495,14 +497,34 @@ public void testFoldingOfPivot() {
assertEquals(EsQueryExec.class, p.getClass());
EsQueryExec ee = (EsQueryExec) p;
assertEquals(3, ee.output().size());
assertEquals(Arrays.asList("bool", "'A'", "'B'"), Expressions.names(ee.output()));
assertEquals(asList("bool", "'A'", "'B'"), Expressions.names(ee.output()));
String q = ee.toString().replaceAll("\\s+", "");
assertThat(q, containsString("\"query\":{\"terms\":{\"keyword\":[\"A\",\"B\"]"));
String a = ee.queryContainer().aggs().asAggBuilder().toString().replaceAll("\\s+", "");
assertThat(a, containsString("\"terms\":{\"field\":\"bool\""));
assertThat(a, containsString("\"terms\":{\"field\":\"keyword\""));
assertThat(a, containsString("{\"avg\":{\"field\":\"int\"}"));
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to get the list from the Function registry so that all existing aggs and future ones are picked up without having to manually update this test (since it is unlikely it will happen).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Yep, that sounds like a better solution (was thinking about possible going with reflection, but this is better).

public void testPivotHasSameQueryAsGroupBy() {
List<String> aggregations = asList("FIRST(int)", "LAST(int)", "COUNT(int)", "AVG(int)",
"MIN(int)", "MAX(int)", "SUM(int)", "PERCENTILE(int, 0)", "PERCENTILE_RANK(int, 0)",
"SUM_OF_SQUARES(int)", "STDDEV_POP(int)", "STDDEV_SAMP(int)", "VAR_SAMP(int)", "VAR_POP(int)",
"SKEWNESS(int)", "MAD(int)", "KURTOSIS(int)");
for (String aggregationStr : aggregations) {
PhysicalPlan pivotPlan = plan("SELECT * FROM (SELECT some.dotted.field, bool, keyword, int FROM test) " +
"PIVOT(" + aggregationStr + " FOR keyword IN ('A', 'B'))");
PhysicalPlan groupByPlan = plan("SELECT some.dotted.field, bool, keyword, " + aggregationStr + " " +
"FROM test WHERE keyword IN ('A', 'B') GROUP BY some.dotted.field, bool, keyword");
assertEquals(EsQueryExec.class, pivotPlan.getClass());
assertEquals(EsQueryExec.class, groupByPlan.getClass());
QueryContainer pivotQueryContainer = ((EsQueryExec) pivotPlan).queryContainer();
QueryContainer groupByQueryContainer = ((EsQueryExec) groupByPlan).queryContainer();
assertEquals(pivotQueryContainer.query(), groupByQueryContainer.query());
assertEquals(pivotQueryContainer.aggs(), groupByQueryContainer.aggs());
assertEquals(pivotPlan.toString(), groupByPlan.toString());
}
}

private static String randomOrderByAndLimit(int noOfSelectArgs) {
return SqlTestUtils.randomOrderByAndLimit(noOfSelectArgs, random());
Expand Down