From eada8e9a37e5277da67d9f61741dfb1d27d964a1 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Tue, 12 Jan 2021 18:57:57 -0800 Subject: [PATCH] sql: fix the dedup of aggregates during physical planning During the physical planning of the aggregates, we are performing a de-duplication of aggregate functions in order to not have redundant computations, and we handle the required "projection" via a `PlanToStreamColMap` (this is a separate projection from the one in the post-processing spec). As a result, a stage of aggregator processors might produce less columns than we expected it to because some duplicate functions were removed. Previously, this would result in an incorrectly computed output schema of the aggregators. The issue represented itself because we started relying on the recently added `ResultTypes` field of a processor spec in the vectorized engine, and I'm not sure whether it could result in an error in stable releases (the field is also present on the physical plan). The fix is rather simple - use the projection of the post-processing spec, so I think it'll be worth backporting this even if we don't have a repro of an issue on stable branches. Release note: None --- pkg/sql/distsql_physical_planner.go | 20 ++++--------------- .../logictest/testdata/logic_test/distsql_agg | 11 ++++++++++ 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 92cf9351dbc5..0a1d21252a50 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -1396,9 +1396,6 @@ func (dsp *DistSQLPlanner) addAggregators( var finalAggsSpec execinfrapb.AggregatorSpec var finalAggsPost execinfrapb.PostProcessSpec - // planToStreamMapSet keeps track of whether or not - // p.PlanToStreamColMap has been set to its desired mapping or not. - planToStreamMapSet := false if !multiStage { finalAggsSpec = execinfrapb.AggregatorSpec{ Type: aggType, @@ -1694,16 +1691,9 @@ func (dsp *DistSQLPlanner) addAggregators( } finalAggsPost.RenderExprs = renderExprs } else if len(finalAggs) < len(aggregations) { - // We want to ensure we map the streams properly now - // that we've potential reduced the number of final - // aggregation output streams. We use finalIdxMap to - // create a 1-1 mapping from the final aggregators to - // their corresponding column index in the map. - p.PlanToStreamColMap = p.PlanToStreamColMap[:0] - for _, idx := range finalIdxMap { - p.PlanToStreamColMap = append(p.PlanToStreamColMap, int(idx)) - } - planToStreamMapSet = true + // We have removed some duplicates, so we need to add a projection. + finalAggsPost.Projection = true + finalAggsPost.OutputColumns = finalIdxMap } } @@ -1729,9 +1719,7 @@ func (dsp *DistSQLPlanner) addAggregators( // Update p.PlanToStreamColMap; we will have a simple 1-to-1 mapping of // planNode columns to stream columns because the aggregator // has been programmed to produce the same columns as the groupNode. - if !planToStreamMapSet { - p.PlanToStreamColMap = identityMap(p.PlanToStreamColMap, len(aggregations)) - } + p.PlanToStreamColMap = identityMap(p.PlanToStreamColMap, len(aggregations)) if len(finalAggsSpec.GroupCols) == 0 || len(p.ResultRouters) == 1 { // No GROUP BY, or we have a single stream. Use a single final aggregator. diff --git a/pkg/sql/logictest/testdata/logic_test/distsql_agg b/pkg/sql/logictest/testdata/logic_test/distsql_agg index 65ceebb7f332..dfeefcbf2ec8 100644 --- a/pkg/sql/logictest/testdata/logic_test/distsql_agg +++ b/pkg/sql/logictest/testdata/logic_test/distsql_agg @@ -613,3 +613,14 @@ query RI SELECT corr(DISTINCT y, x), count(y) FROM t55837 ---- 0.866025403784439 4 + +# Regression test for incorrectly populating the type schema produced by the +# final stage of aggregators (#58683). +statement ok +CREATE TABLE table58683_1 (col1 INT8 PRIMARY KEY); +INSERT INTO table58683_1 SELECT i FROM generate_series(1, 5) AS g(i); +ALTER TABLE table58683_1 SPLIT AT SELECT i FROM generate_series(1, 5) AS g(i); +ALTER TABLE table58683_1 EXPERIMENTAL_RELOCATE SELECT ARRAY[i], i FROM generate_series(1, 5) AS g(i); +CREATE TABLE table58683_2 (col2 BOOL); +ALTER TABLE table58683_2 EXPERIMENTAL_RELOCATE SELECT ARRAY[2], 2; +SELECT every(col2) FROM table58683_1 JOIN table58683_2 ON col1 = (table58683_2.rowid)::INT8 GROUP BY col2 HAVING bool_and(col2);