Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correct Handling of UNION Queries with Literals in the Vitess Planner #16227

Merged
merged 3 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go/vt/vtgate/planbuilder/operator_transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func transformDistinct(ctx *plancontext.PlanningContext, op *operators.Distinct)
return &engine.Distinct{
Source: src,
CheckCols: op.Columns,
Truncate: op.Truncate,
Truncate: op.ResultColumns,
}, nil
}

Expand Down Expand Up @@ -470,7 +470,7 @@ func transformFilter(ctx *plancontext.PlanningContext, op *operators.Filter) (en
Input: src,
Predicate: predicate,
ASTPredicate: ctx.SemTable.AndExpressions(op.Predicates...),
Truncate: op.Truncate,
Truncate: op.ResultColumns,
}, nil
}

Expand Down
40 changes: 34 additions & 6 deletions go/vt/vtgate/planbuilder/operators/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ type (
offsetPlanned bool

// Original will only be true for the original aggregator created from the AST
Original bool
Original bool

// ResultColumns signals how many columns will be produced by this operator
// This is used to truncate the columns in the final result
ResultColumns int

QP *QueryProjection
Expand Down Expand Up @@ -141,12 +144,24 @@ func (a *Aggregator) FindCol(ctx *plancontext.PlanningContext, in sqlparser.Expr

expr := a.DT.RewriteExpression(ctx, in)
if offset, found := canReuseColumn(ctx, a.Columns, expr, extractExpr); found {
a.checkOffset(offset)
return offset
}
return -1
}

func (a *Aggregator) AddColumn(ctx *plancontext.PlanningContext, reuse bool, groupBy bool, ae *sqlparser.AliasedExpr) int {
func (a *Aggregator) checkOffset(offset int) {
// if the offset is greater than the number of columns we expect to produce, we need to update the number of columns
// this is to make sure that the column is not truncated in the final result
if a.ResultColumns > 0 && a.ResultColumns <= offset {
a.ResultColumns = offset + 1
}
}

func (a *Aggregator) AddColumn(ctx *plancontext.PlanningContext, reuse bool, groupBy bool, ae *sqlparser.AliasedExpr) (offset int) {
defer func() {
a.checkOffset(offset)
}()
rewritten := a.DT.RewriteExpression(ctx, ae.Expr)

ae = &sqlparser.AliasedExpr{
Expand Down Expand Up @@ -180,7 +195,7 @@ func (a *Aggregator) AddColumn(ctx *plancontext.PlanningContext, reuse bool, gro
a.Aggregations = append(a.Aggregations, aggr)
}

offset := len(a.Columns)
offset = len(a.Columns)
a.Columns = append(a.Columns, ae)
incomingOffset := a.Source.AddColumn(ctx, false, groupBy, ae)

Expand Down Expand Up @@ -246,6 +261,7 @@ func (a *Aggregator) AddWSColumn(ctx *plancontext.PlanningContext, offset int, u
// TODO: we could handle this case by adding a projection on under the aggregator to make the columns line up
panic(errFailedToPlan(wsAe))
}
a.checkOffset(wsOffset)
return wsOffset
}

Expand Down Expand Up @@ -281,9 +297,9 @@ func isDerived(op Operator) bool {
}
}

func (a *Aggregator) GetColumns(ctx *plancontext.PlanningContext) []*sqlparser.AliasedExpr {
func (a *Aggregator) GetColumns(ctx *plancontext.PlanningContext) (res []*sqlparser.AliasedExpr) {
if isDerived(a.Source) {
return a.Columns
return truncate(a, a.Columns)
}

// we update the incoming columns, so we know about any new columns that have been added
Expand All @@ -296,7 +312,7 @@ func (a *Aggregator) GetColumns(ctx *plancontext.PlanningContext) []*sqlparser.A
a.Columns = append(a.Columns, columns[len(a.Columns):]...)
}

return a.Columns
return truncate(a, a.Columns)
}

func (a *Aggregator) GetSelectExprs(ctx *plancontext.PlanningContext) sqlparser.SelectExprs {
Expand All @@ -315,6 +331,9 @@ func (a *Aggregator) ShortDescription() string {
if a.Original {
org = "ORG "
}
if a.ResultColumns > 0 {
org += fmt.Sprintf(":%d ", a.ResultColumns)
}

if len(a.Grouping) == 0 {
return fmt.Sprintf("%s%s", org, strings.Join(columns, ", "))
Expand Down Expand Up @@ -511,7 +530,16 @@ func (a *Aggregator) setTruncateColumnCount(offset int) {
a.ResultColumns = offset
}

func (a *Aggregator) getTruncateColumnCount() int {
return a.ResultColumns
}

func (a *Aggregator) internalAddColumn(ctx *plancontext.PlanningContext, aliasedExpr *sqlparser.AliasedExpr, addToGroupBy bool) int {
if a.ResultColumns == 0 {
// if we need to use `internalAddColumn`, it means we are adding columns that are not part of the original list,
// so we need to set the ResultColumns to the current length of the columns list
a.ResultColumns = len(a.Columns)
}
offset := a.Source.AddColumn(ctx, true, addToGroupBy, aliasedExpr)

if offset == len(a.Columns) {
Expand Down
14 changes: 9 additions & 5 deletions go/vt/vtgate/planbuilder/operators/distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type (
// This is only filled in during offset planning
Columns []engine.CheckCol

Truncate int
ResultColumns int
}
)

Expand Down Expand Up @@ -72,7 +72,7 @@ func (d *Distinct) Clone(inputs []Operator) Operator {
Columns: slices.Clone(d.Columns),
QP: d.QP,
PushedPerformance: d.PushedPerformance,
Truncate: d.Truncate,
ResultColumns: d.ResultColumns,
}
}

Expand Down Expand Up @@ -101,11 +101,11 @@ func (d *Distinct) FindCol(ctx *plancontext.PlanningContext, expr sqlparser.Expr
}

func (d *Distinct) GetColumns(ctx *plancontext.PlanningContext) []*sqlparser.AliasedExpr {
return d.Source.GetColumns(ctx)
return truncate(d, d.Source.GetColumns(ctx))
}

func (d *Distinct) GetSelectExprs(ctx *plancontext.PlanningContext) sqlparser.SelectExprs {
return d.Source.GetSelectExprs(ctx)
return truncate(d, d.Source.GetSelectExprs(ctx))
}

func (d *Distinct) ShortDescription() string {
Expand All @@ -120,5 +120,9 @@ func (d *Distinct) GetOrdering(ctx *plancontext.PlanningContext) []OrderBy {
}

func (d *Distinct) setTruncateColumnCount(offset int) {
d.Truncate = offset
d.ResultColumns = offset
}

func (d *Distinct) getTruncateColumnCount() int {
return d.ResultColumns
}
14 changes: 9 additions & 5 deletions go/vt/vtgate/planbuilder/operators/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Filter struct {
// It contains the ANDed predicates in Predicates, with ColName:s replaced by Offset:s
PredicateWithOffsets evalengine.Expr

Truncate int
ResultColumns int
}

func newFilterSinglePredicate(op Operator, expr sqlparser.Expr) Operator {
Expand All @@ -55,7 +55,7 @@ func (f *Filter) Clone(inputs []Operator) Operator {
Source: inputs[0],
Predicates: slices.Clone(f.Predicates),
PredicateWithOffsets: f.PredicateWithOffsets,
Truncate: f.Truncate,
ResultColumns: f.ResultColumns,
}
}

Expand Down Expand Up @@ -100,11 +100,11 @@ func (f *Filter) AddWSColumn(ctx *plancontext.PlanningContext, offset int, under
}

func (f *Filter) GetColumns(ctx *plancontext.PlanningContext) []*sqlparser.AliasedExpr {
return f.Source.GetColumns(ctx)
return truncate(f, f.Source.GetColumns(ctx))
}

func (f *Filter) GetSelectExprs(ctx *plancontext.PlanningContext) sqlparser.SelectExprs {
return f.Source.GetSelectExprs(ctx)
return truncate(f, f.Source.GetSelectExprs(ctx))
}

func (f *Filter) GetOrdering(ctx *plancontext.PlanningContext) []OrderBy {
Expand Down Expand Up @@ -151,5 +151,9 @@ func (f *Filter) ShortDescription() string {
}

func (f *Filter) setTruncateColumnCount(offset int) {
f.Truncate = offset
f.ResultColumns = offset
}

func (f *Filter) getTruncateColumnCount() int {
return f.ResultColumns
}
8 changes: 6 additions & 2 deletions go/vt/vtgate/planbuilder/operators/ordering.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ func (o *Ordering) FindCol(ctx *plancontext.PlanningContext, expr sqlparser.Expr
}

func (o *Ordering) GetColumns(ctx *plancontext.PlanningContext) []*sqlparser.AliasedExpr {
return o.Source.GetColumns(ctx)
return truncate(o, o.Source.GetColumns(ctx))
}

func (o *Ordering) GetSelectExprs(ctx *plancontext.PlanningContext) sqlparser.SelectExprs {
return o.Source.GetSelectExprs(ctx)
return truncate(o, o.Source.GetSelectExprs(ctx))
}

func (o *Ordering) GetOrdering(*plancontext.PlanningContext) []OrderBy {
Expand Down Expand Up @@ -108,3 +108,7 @@ func (o *Ordering) ShortDescription() string {
func (o *Ordering) setTruncateColumnCount(offset int) {
o.ResultColumns = offset
}

func (o *Ordering) getTruncateColumnCount() int {
return o.ResultColumns
}
24 changes: 20 additions & 4 deletions go/vt/vtgate/planbuilder/operators/plan_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,21 @@ func (noPredicates) AddPredicate(*plancontext.PlanningContext, sqlparser.Expr) O
panic(vterrors.VT13001("the noColumns operator cannot accept predicates"))
}

// tryTruncateColumnsAt will see if we can truncate the columns by just asking the operator to do it for us
func tryTruncateColumnsAt(op Operator, truncateAt int) bool {
type columnTruncator interface {
setTruncateColumnCount(offset int)
// columnTruncator is an interface that allows an operator to truncate its columns to a certain length
type columnTruncator interface {
setTruncateColumnCount(offset int)
getTruncateColumnCount() int
}

func truncate[K any](op columnTruncator, slice []K) []K {
if op.getTruncateColumnCount() == 0 {
return slice
}
return slice[:op.getTruncateColumnCount()]
}

// tryTruncateColumnsAt will see if we can truncate the columns by just asking the operator to do it for us
func tryTruncateColumnsAt(op Operator, truncateAt int) bool {
truncator, ok := op.(columnTruncator)
if ok {
truncator.setTruncateColumnCount(truncateAt)
Expand All @@ -160,6 +169,13 @@ func tryTruncateColumnsAt(op Operator, truncateAt int) bool {

func transformColumnsToSelectExprs(ctx *plancontext.PlanningContext, op Operator) sqlparser.SelectExprs {
columns := op.GetColumns(ctx)
if trunc, ok := op.(columnTruncator); ok {
count := trunc.getTruncateColumnCount()
if count > 0 {
columns = columns[:count]
}
}

selExprs := slice.Map(columns, func(from *sqlparser.AliasedExpr) sqlparser.SelectExpr {
return from
})
Expand Down
8 changes: 6 additions & 2 deletions go/vt/vtgate/planbuilder/operators/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,11 +749,11 @@ func (r *Route) FindCol(ctx *plancontext.PlanningContext, expr sqlparser.Expr, _
}

func (r *Route) GetColumns(ctx *plancontext.PlanningContext) []*sqlparser.AliasedExpr {
return r.Source.GetColumns(ctx)
return truncate(r, r.Source.GetColumns(ctx))
}

func (r *Route) GetSelectExprs(ctx *plancontext.PlanningContext) sqlparser.SelectExprs {
return r.Source.GetSelectExprs(ctx)
return truncate(r, r.Source.GetSelectExprs(ctx))
}

func (r *Route) GetOrdering(ctx *plancontext.PlanningContext) []OrderBy {
Expand Down Expand Up @@ -849,6 +849,10 @@ func (r *Route) setTruncateColumnCount(offset int) {
r.ResultColumns = offset
}

func (r *Route) getTruncateColumnCount() int {
return r.ResultColumns
}

func (r *Route) introducesTableID() semantics.TableSet {
id := semantics.EmptyTableSet()
for _, route := range r.MergedWith {
Expand Down
5 changes: 3 additions & 2 deletions go/vt/vtgate/planbuilder/operators/union.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ func (u *Union) addConstantToUnion(ctx *plancontext.PlanningContext, aexpr *sqlp
outputOffset = thisOffset
} else {
if thisOffset != outputOffset {
panic(vterrors.VT12001("argument offsets did not line up for UNION"))
tree := ToTree(u)
panic(vterrors.VT12001(fmt.Sprintf("argument offsets did not line up for UNION. Pushing %s - want %d got %d\n%s", sqlparser.String(aexpr), outputOffset, thisOffset, tree)))
}
}
}
Expand All @@ -224,7 +225,7 @@ func (u *Union) addWeightStringToOffset(ctx *plancontext.PlanningContext, argIdx
outputOffset = thisOffset
} else {
if thisOffset != outputOffset {
panic(vterrors.VT12001("weight_string offsets did not line up for UNION"))
panic(vterrors.VT13001("weight_string offsets did not line up for UNION"))
}
}
}
Expand Down
Loading
Loading