diff --git a/go/test/endtoend/vtgate/vitess_tester/aggregation/aggregation.test b/go/test/endtoend/vtgate/vitess_tester/aggregation/aggregation.test index 8861b9672b8..3f89d867ff8 100644 --- a/go/test/endtoend/vtgate/vitess_tester/aggregation/aggregation.test +++ b/go/test/endtoend/vtgate/vitess_tester/aggregation/aggregation.test @@ -9,7 +9,7 @@ CREATE TABLE `t1` CREATE TABLE `t2` ( - `id` bigint unsigned NOT NULL AUTO_INCREMENT, + `id` bigint unsigned NOT NULL AUTO_INCREMENT, `t1_id` int unsigned NOT NULL, PRIMARY KEY (`id`) ) ENGINE InnoDB, @@ -38,7 +38,10 @@ values (1, 1), insert into t3 (id, name) values (1, 'A'), - (2, 'B'); + (2, 'B'), + (3, 'B'), + (4, 'B'), + (5, 'B'); -- wait_authoritative t1 -- wait_authoritative t2 @@ -47,4 +50,11 @@ select group_concat(t3.name SEPARATOR ', ') as "Group Name" from t1 join t2 on t1.id = t2.t1_id left join t3 on t1.id = t3.id -group by t1.id; \ No newline at end of file +group by t1.id; + +select COUNT(*) +from (select 1 as one + FROM `t3` + WHERE `t3`.`name` = 'B' + ORDER BY id DESC LIMIT 25 + OFFSET 0) subquery_for_count; \ No newline at end of file diff --git a/go/vt/vtgate/planbuilder/operators/aggregator.go b/go/vt/vtgate/planbuilder/operators/aggregator.go index fd9fca30110..36cb0b1a771 100644 --- a/go/vt/vtgate/planbuilder/operators/aggregator.go +++ b/go/vt/vtgate/planbuilder/operators/aggregator.go @@ -115,14 +115,6 @@ func (a *Aggregator) addColumnWithoutPushing(ctx *plancontext.PlanningContext, e return offset } -func (a *Aggregator) addColumnsWithoutPushing(ctx *plancontext.PlanningContext, reuse bool, groupby []bool, exprs []*sqlparser.AliasedExpr) (offsets []int) { - for i, ae := range exprs { - offset := a.addColumnWithoutPushing(ctx, ae, groupby[i]) - offsets = append(offsets, offset) - } - return -} - func (a *Aggregator) isDerived() bool { return a.DT != nil } diff --git a/go/vt/vtgate/planbuilder/operators/horizon_expanding.go b/go/vt/vtgate/planbuilder/operators/horizon_expanding.go index 3f7700eed9d..3b934752a00 100644 --- a/go/vt/vtgate/planbuilder/operators/horizon_expanding.go +++ b/go/vt/vtgate/planbuilder/operators/horizon_expanding.go @@ -75,9 +75,18 @@ func expandUnionHorizon(ctx *plancontext.PlanningContext, horizon *Horizon, unio } func expandSelectHorizon(ctx *plancontext.PlanningContext, horizon *Horizon, sel *sqlparser.Select) (Operator, *ApplyResult) { - op := createProjectionFromSelect(ctx, horizon) qp := horizon.getQP(ctx) var extracted []string + + if horizon.IsDerived() { + // if we are dealing with a derived table, we need to make sure that the ordering columns + // are available outside the derived table + for _, order := range horizon.Query.GetOrderBy() { + qp.addColumn(ctx, order.Expr) + } + } + + op := createProjectionFromSelect(ctx, horizon) if qp.HasAggr { extracted = append(extracted, "Aggregation") } else { diff --git a/go/vt/vtgate/planbuilder/operators/projection.go b/go/vt/vtgate/planbuilder/operators/projection.go index 527991cba26..d8ede63b612 100644 --- a/go/vt/vtgate/planbuilder/operators/projection.go +++ b/go/vt/vtgate/planbuilder/operators/projection.go @@ -308,15 +308,6 @@ func (p *Projection) addColumnWithoutPushing(ctx *plancontext.PlanningContext, e return p.addColumn(ctx, true, false, expr, false) } -func (p *Projection) addColumnsWithoutPushing(ctx *plancontext.PlanningContext, reuse bool, _ []bool, exprs []*sqlparser.AliasedExpr) []int { - offsets := make([]int, len(exprs)) - for idx, expr := range exprs { - offset := p.addColumn(ctx, reuse, false, expr, false) - offsets[idx] = offset - } - return offsets -} - func (p *Projection) AddWSColumn(ctx *plancontext.PlanningContext, offset int, underRoute bool) int { cols, aliased := p.Columns.(AliasedProjections) if !aliased { diff --git a/go/vt/vtgate/planbuilder/operators/queryprojection.go b/go/vt/vtgate/planbuilder/operators/queryprojection.go index 8ad8a6efe1e..c747870f5d2 100644 --- a/go/vt/vtgate/planbuilder/operators/queryprojection.go +++ b/go/vt/vtgate/planbuilder/operators/queryprojection.go @@ -664,6 +664,23 @@ func (qp *QueryProjection) useGroupingOverDistinct(ctx *plancontext.PlanningCont return true } +// addColumn adds a column to the QueryProjection if it is not already present +func (qp *QueryProjection) addColumn(ctx *plancontext.PlanningContext, expr sqlparser.Expr) { + for _, selectExpr := range qp.SelectExprs { + getExpr, err := selectExpr.GetExpr() + if err != nil { + continue + } + if ctx.SemTable.EqualsExprWithDeps(getExpr, expr) { + return + } + } + qp.SelectExprs = append(qp.SelectExprs, SelectExpr{ + Col: aeWrap(expr), + Aggr: ctx.ContainsAggr(expr), + }) +} + func checkForInvalidGroupingExpressions(ctx *plancontext.PlanningContext, expr sqlparser.Expr) { _ = sqlparser.Walk(func(node sqlparser.SQLNode) (bool, error) { if ctx.IsAggr(node) { diff --git a/go/vt/vtgate/planbuilder/operators/route.go b/go/vt/vtgate/planbuilder/operators/route.go index 62d6aad6a97..1c91077c2e4 100644 --- a/go/vt/vtgate/planbuilder/operators/route.go +++ b/go/vt/vtgate/planbuilder/operators/route.go @@ -587,96 +587,87 @@ func (r *Route) AddColumn(ctx *plancontext.PlanningContext, reuse bool, gb bool, // if at least one column is not already present, we check if we can easily find a projection // or aggregation in our source that we can add to - derived, op, ok, offsets := addMultipleColumnsToInput(ctx, r.Source, reuse, []bool{gb}, []*sqlparser.AliasedExpr{expr}) - r.Source = op - if ok { - return offsets[0] + derived, op, offset := addColumnToInput(ctx, r.Source, expr, reuse, gb) + if op != nil { + r.Source = op + } + if offset >= 0 { + return offset } // If no-one could be found, we probably don't have one yet, so we add one here src := createProjection(ctx, r.Source, derived) r.Source = src - offsets = src.addColumnsWithoutPushing(ctx, reuse, []bool{gb}, []*sqlparser.AliasedExpr{expr}) - return offsets[0] + return src.addColumnWithoutPushing(ctx, expr, gb) } type selectExpressions interface { Operator addColumnWithoutPushing(ctx *plancontext.PlanningContext, expr *sqlparser.AliasedExpr, addToGroupBy bool) int - addColumnsWithoutPushing(ctx *plancontext.PlanningContext, reuse bool, addToGroupBy []bool, exprs []*sqlparser.AliasedExpr) []int derivedName() string } // addColumnToInput adds columns to an operator without pushing them down -func addMultipleColumnsToInput( +func addColumnToInput( ctx *plancontext.PlanningContext, operator Operator, - reuse bool, - addToGroupBy []bool, - exprs []*sqlparser.AliasedExpr, -) (derivedName string, // if we found a derived table, this will contain its name + expr *sqlparser.AliasedExpr, + reuse, addToGroupBy bool, +) ( + derivedName string, // if we found a derived table, this will contain its name projection Operator, // if an operator needed to be built, it will be returned here - found bool, // whether a matching op was found or not - offsets []int, // the offsets the expressions received + offset int, // the offset of the expression, -1 if not found ) { + var src Operator + var updateSrc func(Operator) switch op := operator.(type) { - case *SubQuery: - derivedName, src, added, offset := addMultipleColumnsToInput(ctx, op.Outer, reuse, addToGroupBy, exprs) - if added { - op.Outer = src - } - return derivedName, op, added, offset + // Pass through operators - we can just add the columns to their source + case *SubQuery: + src, updateSrc = op.Outer, func(newSrc Operator) { op.Outer = newSrc } case *Distinct: - derivedName, src, added, offset := addMultipleColumnsToInput(ctx, op.Source, reuse, addToGroupBy, exprs) - if added { - op.Source = src - } - return derivedName, op, added, offset - + src, updateSrc = op.Source, func(newSrc Operator) { op.Source = newSrc } case *Limit: - derivedName, src, added, offset := addMultipleColumnsToInput(ctx, op.Source, reuse, addToGroupBy, exprs) - if added { - op.Source = src - } - return derivedName, op, added, offset - + src, updateSrc = op.Source, func(newSrc Operator) { op.Source = newSrc } case *Ordering: - derivedName, src, added, offset := addMultipleColumnsToInput(ctx, op.Source, reuse, addToGroupBy, exprs) - if added { - op.Source = src - } - return derivedName, op, added, offset - + src, updateSrc = op.Source, func(newSrc Operator) { op.Source = newSrc } case *LockAndComment: - derivedName, src, added, offset := addMultipleColumnsToInput(ctx, op.Source, reuse, addToGroupBy, exprs) - if added { - op.Source = src + src, updateSrc = op.Source, func(newSrc Operator) { op.Source = newSrc } + + // Union needs special handling, we can't really add new columns to all inputs + case *Union: + proj := wrapInDerivedProjection(ctx, op) + dtName, newOp, offset := addColumnToInput(ctx, proj, expr, reuse, addToGroupBy) + if newOp == nil { + newOp = proj } - return derivedName, op, added, offset + return dtName, newOp, offset + // Horizon is another one of these - we can't really add new columns to it case *Horizon: - // if the horizon has an alias, then it is a derived table, - // we have to add a new projection and can't build on this one - return op.Alias, op, false, nil + return op.Alias, nil, -1 case selectExpressions: name := op.derivedName() if name != "" { // if the only thing we can push to is a derived table, // we have to add a new projection and can't build on this one - return name, op, false, nil + return name, nil, -1 } - offset := op.addColumnsWithoutPushing(ctx, reuse, addToGroupBy, exprs) - return "", op, true, offset + offset := op.addColumnWithoutPushing(ctx, expr, addToGroupBy) + return "", nil, offset - case *Union: - proj := addDerivedProj(ctx, op) - return addMultipleColumnsToInput(ctx, proj, reuse, addToGroupBy, exprs) default: - return "", op, false, nil + return "", nil, -1 + } + + // Handle the case where we have a pass-through operator + derivedName, src, offset = addColumnToInput(ctx, src, expr, reuse, addToGroupBy) + if src != nil { + updateSrc(src) } + return derivedName, nil, offset } func (r *Route) AddWSColumn(ctx *plancontext.PlanningContext, offset int, _ bool) int { @@ -691,7 +682,7 @@ func (r *Route) AddWSColumn(ctx *plancontext.PlanningContext, offset int, _ bool ok, foundOffset := addWSColumnToInput(ctx, r.Source, offset) if !ok { - src := addDerivedProj(ctx, r.Source) + src := wrapInDerivedProjection(ctx, r.Source) r.Source = src return src.AddWSColumn(ctx, offset, true) } @@ -714,7 +705,8 @@ func addWSColumnToInput(ctx *plancontext.PlanningContext, source Operator, offse return false, -1 } -func addDerivedProj( +// wrapInDerivedProjection wraps the input in a derived table projection named "dt" +func wrapInDerivedProjection( ctx *plancontext.PlanningContext, op Operator, ) (projection *Projection) { diff --git a/go/vt/vtgate/planbuilder/testdata/aggr_cases.json b/go/vt/vtgate/planbuilder/testdata/aggr_cases.json index b124e8f2b50..a2155ee0700 100644 --- a/go/vt/vtgate/planbuilder/testdata/aggr_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/aggr_cases.json @@ -7144,6 +7144,49 @@ ] } }, + { + "comment": "Aggregation over a ORDER BY/LIMIT inside a derived table", + "query": "SELECT COUNT(*) FROM (SELECT 1 AS one FROM `user` WHERE `user`.`is_not_deleted` = true ORDER BY id DESC LIMIT 25 OFFSET 0) subquery_for_count", + "plan": { + "QueryType": "SELECT", + "Original": "SELECT COUNT(*) FROM (SELECT 1 AS one FROM `user` WHERE `user`.`is_not_deleted` = true ORDER BY id DESC LIMIT 25 OFFSET 0) subquery_for_count", + "Instructions": { + "OperatorType": "Aggregate", + "Variant": "Scalar", + "Aggregates": "count_star(0) AS count(*)", + "Inputs": [ + { + "OperatorType": "SimpleProjection", + "Columns": "2", + "Inputs": [ + { + "OperatorType": "Limit", + "Count": "25", + "Offset": "0", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select subquery_for_count.one, subquery_for_count.id, 1, weight_string(subquery_for_count.id) from (select 1 as one, id from `user` where 1 != 1) as subquery_for_count where 1 != 1", + "OrderBy": "(1|3) DESC", + "Query": "select subquery_for_count.one, subquery_for_count.id, 1, weight_string(subquery_for_count.id) from (select 1 as one, id from `user` where `user`.is_not_deleted = true) as subquery_for_count order by id desc limit 25", + "Table": "`user`" + } + ] + } + ] + } + ] + }, + "TablesUsed": [ + "user.user" + ] + } + }, { "comment": "should be able to push down aggregation", "query": "select sum(user.type) from user join user_extra on user.team_id = user_extra.id group by user_extra.id order by user_extra.id",