Skip to content

Commit

Permalink
sql: fix the dedup of aggregates during physical planning
Browse files Browse the repository at this point in the history
During the physical planning of the aggregates, we are performing
a de-duplication of aggregate functions in order to not have redundant
computations, and we handle the required "projection" via
a `PlanToStreamColMap` (this is a separate projection from the one in
the post-processing spec). As a result, a stage of aggregator
processors might produce fewer columns than we expected it to because
some duplicate functions were removed. Previously, this would result in
an incorrectly computed output schema of the aggregators.

The issue manifested itself because we started relying on the recently
added `ResultTypes` field of a processor spec in the vectorized engine,
and I'm not sure whether it could result in an error in stable releases
(the field is also present on the physical plan).

The fix is rather simple - use the projection of the post-processing
spec, so I think it'll be worth backporting this even if we don't have
a repro of an issue on stable branches.

Release note: None
  • Loading branch information
yuzefovich committed Jan 14, 2021
1 parent 5197d58 commit eada8e9
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 16 deletions.
20 changes: 4 additions & 16 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -1396,9 +1396,6 @@ func (dsp *DistSQLPlanner) addAggregators(
var finalAggsSpec execinfrapb.AggregatorSpec
var finalAggsPost execinfrapb.PostProcessSpec

// planToStreamMapSet keeps track of whether or not
// p.PlanToStreamColMap has been set to its desired mapping or not.
planToStreamMapSet := false
if !multiStage {
finalAggsSpec = execinfrapb.AggregatorSpec{
Type: aggType,
Expand Down Expand Up @@ -1694,16 +1691,9 @@ func (dsp *DistSQLPlanner) addAggregators(
}
finalAggsPost.RenderExprs = renderExprs
} else if len(finalAggs) < len(aggregations) {
// We want to ensure we map the streams properly now
// that we've potential reduced the number of final
// aggregation output streams. We use finalIdxMap to
// create a 1-1 mapping from the final aggregators to
// their corresponding column index in the map.
p.PlanToStreamColMap = p.PlanToStreamColMap[:0]
for _, idx := range finalIdxMap {
p.PlanToStreamColMap = append(p.PlanToStreamColMap, int(idx))
}
planToStreamMapSet = true
// We have removed some duplicates, so we need to add a projection.
finalAggsPost.Projection = true
finalAggsPost.OutputColumns = finalIdxMap
}
}

Expand All @@ -1729,9 +1719,7 @@ func (dsp *DistSQLPlanner) addAggregators(
// Update p.PlanToStreamColMap; we will have a simple 1-to-1 mapping of
// planNode columns to stream columns because the aggregator
// has been programmed to produce the same columns as the groupNode.
if !planToStreamMapSet {
p.PlanToStreamColMap = identityMap(p.PlanToStreamColMap, len(aggregations))
}
p.PlanToStreamColMap = identityMap(p.PlanToStreamColMap, len(aggregations))

if len(finalAggsSpec.GroupCols) == 0 || len(p.ResultRouters) == 1 {
// No GROUP BY, or we have a single stream. Use a single final aggregator.
Expand Down
11 changes: 11 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/distsql_agg
Original file line number Diff line number Diff line change
Expand Up @@ -613,3 +613,14 @@ query RI
SELECT corr(DISTINCT y, x), count(y) FROM t55837
----
0.866025403784439 4

# Regression test for incorrectly populating the type schema produced by the
# final stage of aggregators (#58683).
statement ok
CREATE TABLE table58683_1 (col1 INT8 PRIMARY KEY);
INSERT INTO table58683_1 SELECT i FROM generate_series(1, 5) AS g(i);
ALTER TABLE table58683_1 SPLIT AT SELECT i FROM generate_series(1, 5) AS g(i);
ALTER TABLE table58683_1 EXPERIMENTAL_RELOCATE SELECT ARRAY[i], i FROM generate_series(1, 5) AS g(i);
CREATE TABLE table58683_2 (col2 BOOL);
ALTER TABLE table58683_2 EXPERIMENTAL_RELOCATE SELECT ARRAY[2], 2;
SELECT every(col2) FROM table58683_1 JOIN table58683_2 ON col1 = (table58683_2.rowid)::INT8 GROUP BY col2 HAVING bool_and(col2);

0 comments on commit eada8e9

Please sign in to comment.