Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: Handle ORDER BY inside derived tables #16353

Merged
merged 4 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ CREATE TABLE `t1`

CREATE TABLE `t2`
(
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
`t1_id` int unsigned NOT NULL,
PRIMARY KEY (`id`)
) ENGINE InnoDB,
Expand Down Expand Up @@ -38,7 +38,10 @@ values (1, 1),

insert into t3 (id, name)
values (1, 'A'),
(2, 'B');
(2, 'B'),
(3, 'B'),
(4, 'B'),
(5, 'B');

-- wait_authoritative t1
-- wait_authoritative t2
Expand All @@ -47,4 +50,11 @@ select group_concat(t3.name SEPARATOR ', ') as "Group Name"
from t1
join t2 on t1.id = t2.t1_id
left join t3 on t1.id = t3.id
group by t1.id;
group by t1.id;

select COUNT(*)
from (select 1 as one
FROM `t3`
WHERE `t3`.`name` = 'B'
ORDER BY id DESC LIMIT 25
OFFSET 0) subquery_for_count;
8 changes: 0 additions & 8 deletions go/vt/vtgate/planbuilder/operators/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,6 @@ func (a *Aggregator) addColumnWithoutPushing(ctx *plancontext.PlanningContext, e
return offset
}

func (a *Aggregator) addColumnsWithoutPushing(ctx *plancontext.PlanningContext, reuse bool, groupby []bool, exprs []*sqlparser.AliasedExpr) (offsets []int) {
for i, ae := range exprs {
offset := a.addColumnWithoutPushing(ctx, ae, groupby[i])
offsets = append(offsets, offset)
}
return
}

func (a *Aggregator) isDerived() bool {
return a.DT != nil
}
Expand Down
11 changes: 10 additions & 1 deletion go/vt/vtgate/planbuilder/operators/horizon_expanding.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,18 @@ func expandUnionHorizon(ctx *plancontext.PlanningContext, horizon *Horizon, unio
}

func expandSelectHorizon(ctx *plancontext.PlanningContext, horizon *Horizon, sel *sqlparser.Select) (Operator, *ApplyResult) {
op := createProjectionFromSelect(ctx, horizon)
qp := horizon.getQP(ctx)
var extracted []string

if horizon.IsDerived() {
// if we are dealing with a derived table, we need to make sure that the ordering columns
// are available outside the derived table
for _, order := range horizon.Query.GetOrderBy() {
qp.addColumn(ctx, order.Expr)
}
}

op := createProjectionFromSelect(ctx, horizon)
if qp.HasAggr {
extracted = append(extracted, "Aggregation")
} else {
Expand Down
9 changes: 0 additions & 9 deletions go/vt/vtgate/planbuilder/operators/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,15 +308,6 @@ func (p *Projection) addColumnWithoutPushing(ctx *plancontext.PlanningContext, e
return p.addColumn(ctx, true, false, expr, false)
}

func (p *Projection) addColumnsWithoutPushing(ctx *plancontext.PlanningContext, reuse bool, _ []bool, exprs []*sqlparser.AliasedExpr) []int {
offsets := make([]int, len(exprs))
for idx, expr := range exprs {
offset := p.addColumn(ctx, reuse, false, expr, false)
offsets[idx] = offset
}
return offsets
}

func (p *Projection) AddWSColumn(ctx *plancontext.PlanningContext, offset int, underRoute bool) int {
cols, aliased := p.Columns.(AliasedProjections)
if !aliased {
Expand Down
17 changes: 17 additions & 0 deletions go/vt/vtgate/planbuilder/operators/queryprojection.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,23 @@ func (qp *QueryProjection) useGroupingOverDistinct(ctx *plancontext.PlanningCont
return true
}

// addColumn adds a column to the QueryProjection if it is not already present
func (qp *QueryProjection) addColumn(ctx *plancontext.PlanningContext, expr sqlparser.Expr) {
for _, selectExpr := range qp.SelectExprs {
getExpr, err := selectExpr.GetExpr()
if err != nil {
continue
}
if ctx.SemTable.EqualsExprWithDeps(getExpr, expr) {
return
}
}
qp.SelectExprs = append(qp.SelectExprs, SelectExpr{
Col: aeWrap(expr),
Aggr: ctx.ContainsAggr(expr),
})
}

func checkForInvalidGroupingExpressions(ctx *plancontext.PlanningContext, expr sqlparser.Expr) {
_ = sqlparser.Walk(func(node sqlparser.SQLNode) (bool, error) {
if ctx.IsAggr(node) {
Expand Down
100 changes: 46 additions & 54 deletions go/vt/vtgate/planbuilder/operators/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,96 +587,87 @@ func (r *Route) AddColumn(ctx *plancontext.PlanningContext, reuse bool, gb bool,

// if at least one column is not already present, we check if we can easily find a projection
// or aggregation in our source that we can add to
derived, op, ok, offsets := addMultipleColumnsToInput(ctx, r.Source, reuse, []bool{gb}, []*sqlparser.AliasedExpr{expr})
r.Source = op
if ok {
return offsets[0]
derived, op, offset := addColumnToInput(ctx, r.Source, expr, reuse, gb)
if op != nil {
r.Source = op
}
if offset >= 0 {
return offset
}

// If no-one could be found, we probably don't have one yet, so we add one here
src := createProjection(ctx, r.Source, derived)
r.Source = src

offsets = src.addColumnsWithoutPushing(ctx, reuse, []bool{gb}, []*sqlparser.AliasedExpr{expr})
return offsets[0]
return src.addColumnWithoutPushing(ctx, expr, gb)
}

type selectExpressions interface {
Operator
addColumnWithoutPushing(ctx *plancontext.PlanningContext, expr *sqlparser.AliasedExpr, addToGroupBy bool) int
addColumnsWithoutPushing(ctx *plancontext.PlanningContext, reuse bool, addToGroupBy []bool, exprs []*sqlparser.AliasedExpr) []int
derivedName() string
}

// addColumnToInput adds columns to an operator without pushing them down
func addMultipleColumnsToInput(
func addColumnToInput(
ctx *plancontext.PlanningContext,
operator Operator,
reuse bool,
addToGroupBy []bool,
exprs []*sqlparser.AliasedExpr,
) (derivedName string, // if we found a derived table, this will contain its name
expr *sqlparser.AliasedExpr,
reuse, addToGroupBy bool,
) (
derivedName string, // if we found a derived table, this will contain its name
projection Operator, // if an operator needed to be built, it will be returned here
found bool, // whether a matching op was found or not
offsets []int, // the offsets the expressions received
offset int, // the offset of the expression, -1 if not found
) {
var src Operator
var updateSrc func(Operator)
switch op := operator.(type) {
case *SubQuery:
derivedName, src, added, offset := addMultipleColumnsToInput(ctx, op.Outer, reuse, addToGroupBy, exprs)
if added {
op.Outer = src
}
return derivedName, op, added, offset

// Pass through operators - we can just add the columns to their source
case *SubQuery:
src, updateSrc = op.Outer, func(newSrc Operator) { op.Outer = newSrc }
case *Distinct:
derivedName, src, added, offset := addMultipleColumnsToInput(ctx, op.Source, reuse, addToGroupBy, exprs)
if added {
op.Source = src
}
return derivedName, op, added, offset

src, updateSrc = op.Source, func(newSrc Operator) { op.Source = newSrc }
case *Limit:
derivedName, src, added, offset := addMultipleColumnsToInput(ctx, op.Source, reuse, addToGroupBy, exprs)
if added {
op.Source = src
}
return derivedName, op, added, offset

src, updateSrc = op.Source, func(newSrc Operator) { op.Source = newSrc }
case *Ordering:
derivedName, src, added, offset := addMultipleColumnsToInput(ctx, op.Source, reuse, addToGroupBy, exprs)
if added {
op.Source = src
}
return derivedName, op, added, offset

src, updateSrc = op.Source, func(newSrc Operator) { op.Source = newSrc }
case *LockAndComment:
derivedName, src, added, offset := addMultipleColumnsToInput(ctx, op.Source, reuse, addToGroupBy, exprs)
if added {
op.Source = src
src, updateSrc = op.Source, func(newSrc Operator) { op.Source = newSrc }

// Union needs special handling, we can't really add new columns to all inputs
case *Union:
proj := wrapInDerivedProjection(ctx, op)
dtName, newOp, offset := addColumnToInput(ctx, proj, expr, reuse, addToGroupBy)
if newOp == nil {
newOp = proj
}
return derivedName, op, added, offset
return dtName, newOp, offset

// Horizon is another one of these - we can't really add new columns to it
case *Horizon:
// if the horizon has an alias, then it is a derived table,
// we have to add a new projection and can't build on this one
return op.Alias, op, false, nil
return op.Alias, nil, -1

case selectExpressions:
name := op.derivedName()
if name != "" {
// if the only thing we can push to is a derived table,
// we have to add a new projection and can't build on this one
return name, op, false, nil
return name, nil, -1
}
offset := op.addColumnsWithoutPushing(ctx, reuse, addToGroupBy, exprs)
return "", op, true, offset
offset := op.addColumnWithoutPushing(ctx, expr, addToGroupBy)
return "", nil, offset

case *Union:
proj := addDerivedProj(ctx, op)
return addMultipleColumnsToInput(ctx, proj, reuse, addToGroupBy, exprs)
default:
return "", op, false, nil
return "", nil, -1
}

// Handle the case where we have a pass-through operator
derivedName, src, offset = addColumnToInput(ctx, src, expr, reuse, addToGroupBy)
if src != nil {
updateSrc(src)
}
return derivedName, nil, offset
}

func (r *Route) AddWSColumn(ctx *plancontext.PlanningContext, offset int, _ bool) int {
Expand All @@ -691,7 +682,7 @@ func (r *Route) AddWSColumn(ctx *plancontext.PlanningContext, offset int, _ bool

ok, foundOffset := addWSColumnToInput(ctx, r.Source, offset)
if !ok {
src := addDerivedProj(ctx, r.Source)
src := wrapInDerivedProjection(ctx, r.Source)
r.Source = src
return src.AddWSColumn(ctx, offset, true)
}
Expand All @@ -714,7 +705,8 @@ func addWSColumnToInput(ctx *plancontext.PlanningContext, source Operator, offse
return false, -1
}

func addDerivedProj(
// wrapInDerivedProjection wraps the input in a derived table projection named "dt"
func wrapInDerivedProjection(
ctx *plancontext.PlanningContext,
op Operator,
) (projection *Projection) {
Expand Down
43 changes: 43 additions & 0 deletions go/vt/vtgate/planbuilder/testdata/aggr_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -7144,6 +7144,49 @@
]
}
},
{
"comment": "Aggregation over a ORDER BY/LIMIT inside a derived table",
"query": "SELECT COUNT(*) FROM (SELECT 1 AS one FROM `user` WHERE `user`.`is_not_deleted` = true ORDER BY id DESC LIMIT 25 OFFSET 0) subquery_for_count",
"plan": {
"QueryType": "SELECT",
"Original": "SELECT COUNT(*) FROM (SELECT 1 AS one FROM `user` WHERE `user`.`is_not_deleted` = true ORDER BY id DESC LIMIT 25 OFFSET 0) subquery_for_count",
"Instructions": {
"OperatorType": "Aggregate",
"Variant": "Scalar",
"Aggregates": "count_star(0) AS count(*)",
"Inputs": [
{
"OperatorType": "SimpleProjection",
"Columns": "2",
"Inputs": [
{
"OperatorType": "Limit",
"Count": "25",
"Offset": "0",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select subquery_for_count.one, subquery_for_count.id, 1, weight_string(subquery_for_count.id) from (select 1 as one, id from `user` where 1 != 1) as subquery_for_count where 1 != 1",
"OrderBy": "(1|3) DESC",
"Query": "select subquery_for_count.one, subquery_for_count.id, 1, weight_string(subquery_for_count.id) from (select 1 as one, id from `user` where `user`.is_not_deleted = true) as subquery_for_count order by id desc limit 25",
"Table": "`user`"
}
]
}
]
}
]
},
"TablesUsed": [
"user.user"
]
}
},
{
"comment": "should be able to push down aggregation",
"query": "select sum(user.type) from user join user_extra on user.team_id = user_extra.id group by user_extra.id order by user_extra.id",
Expand Down
Loading