diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 92cf9351dbc5..0a1d21252a50 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -1396,9 +1396,6 @@ func (dsp *DistSQLPlanner) addAggregators( var finalAggsSpec execinfrapb.AggregatorSpec var finalAggsPost execinfrapb.PostProcessSpec - // planToStreamMapSet keeps track of whether or not - // p.PlanToStreamColMap has been set to its desired mapping or not. - planToStreamMapSet := false if !multiStage { finalAggsSpec = execinfrapb.AggregatorSpec{ Type: aggType, @@ -1694,16 +1691,9 @@ func (dsp *DistSQLPlanner) addAggregators( } finalAggsPost.RenderExprs = renderExprs } else if len(finalAggs) < len(aggregations) { - // We want to ensure we map the streams properly now - // that we've potential reduced the number of final - // aggregation output streams. We use finalIdxMap to - // create a 1-1 mapping from the final aggregators to - // their corresponding column index in the map. - p.PlanToStreamColMap = p.PlanToStreamColMap[:0] - for _, idx := range finalIdxMap { - p.PlanToStreamColMap = append(p.PlanToStreamColMap, int(idx)) - } - planToStreamMapSet = true + // We have removed some duplicates, so we need to add a projection. + finalAggsPost.Projection = true + finalAggsPost.OutputColumns = finalIdxMap } } @@ -1729,9 +1719,7 @@ func (dsp *DistSQLPlanner) addAggregators( // Update p.PlanToStreamColMap; we will have a simple 1-to-1 mapping of // planNode columns to stream columns because the aggregator // has been programmed to produce the same columns as the groupNode. - if !planToStreamMapSet { - p.PlanToStreamColMap = identityMap(p.PlanToStreamColMap, len(aggregations)) - } + p.PlanToStreamColMap = identityMap(p.PlanToStreamColMap, len(aggregations)) if len(finalAggsSpec.GroupCols) == 0 || len(p.ResultRouters) == 1 { // No GROUP BY, or we have a single stream. Use a single final aggregator. diff --git a/pkg/sql/logictest/testdata/logic_test/distsql_agg b/pkg/sql/logictest/testdata/logic_test/distsql_agg index 65ceebb7f332..dfeefcbf2ec8 100644 --- a/pkg/sql/logictest/testdata/logic_test/distsql_agg +++ b/pkg/sql/logictest/testdata/logic_test/distsql_agg @@ -613,3 +613,14 @@ query RI SELECT corr(DISTINCT y, x), count(y) FROM t55837 ---- 0.866025403784439 4 + +# Regression test for incorrectly populating the type schema produced by the +# final stage of aggregators (#58683). +statement ok +CREATE TABLE table58683_1 (col1 INT8 PRIMARY KEY); +INSERT INTO table58683_1 SELECT i FROM generate_series(1, 5) AS g(i); +ALTER TABLE table58683_1 SPLIT AT SELECT i FROM generate_series(1, 5) AS g(i); +ALTER TABLE table58683_1 EXPERIMENTAL_RELOCATE SELECT ARRAY[i], i FROM generate_series(1, 5) AS g(i); +CREATE TABLE table58683_2 (col2 BOOL); +ALTER TABLE table58683_2 EXPERIMENTAL_RELOCATE SELECT ARRAY[2], 2; +SELECT every(col2) FROM table58683_1 JOIN table58683_2 ON col1 = (table58683_2.rowid)::INT8 GROUP BY col2 HAVING bool_and(col2);