Skip to content

Commit

Permalink
Correct Handling of UNION Queries with Literals in the Vitess Planner (
Browse files Browse the repository at this point in the history
…#16227)

Co-authored-by: Florent Poinsard <[email protected]>
  • Loading branch information
systay and frouioui authored Jun 20, 2024
1 parent c5c2e86 commit 7a737f4
Show file tree
Hide file tree
Showing 15 changed files with 183 additions and 41 deletions.
4 changes: 2 additions & 2 deletions go/vt/vtgate/planbuilder/operator_transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func transformDistinct(ctx *plancontext.PlanningContext, op *operators.Distinct)
return &engine.Distinct{
Source: src,
CheckCols: op.Columns,
Truncate: op.Truncate,
Truncate: op.ResultColumns,
}, nil
}

Expand Down Expand Up @@ -470,7 +470,7 @@ func transformFilter(ctx *plancontext.PlanningContext, op *operators.Filter) (en
Input: src,
Predicate: predicate,
ASTPredicate: ctx.SemTable.AndExpressions(op.Predicates...),
Truncate: op.Truncate,
Truncate: op.ResultColumns,
}, nil
}

Expand Down
40 changes: 34 additions & 6 deletions go/vt/vtgate/planbuilder/operators/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ type (
offsetPlanned bool

// Original will only be true for the original aggregator created from the AST
Original bool
Original bool

// ResultColumns signals how many columns will be produced by this operator
// This is used to truncate the columns in the final result
ResultColumns int

QP *QueryProjection
Expand Down Expand Up @@ -141,12 +144,24 @@ func (a *Aggregator) FindCol(ctx *plancontext.PlanningContext, in sqlparser.Expr

expr := a.DT.RewriteExpression(ctx, in)
if offset, found := canReuseColumn(ctx, a.Columns, expr, extractExpr); found {
a.checkOffset(offset)
return offset
}
return -1
}

func (a *Aggregator) AddColumn(ctx *plancontext.PlanningContext, reuse bool, groupBy bool, ae *sqlparser.AliasedExpr) int {
func (a *Aggregator) checkOffset(offset int) {
// if the offset is greater than the number of columns we expect to produce, we need to update the number of columns
// this is to make sure that the column is not truncated in the final result
if a.ResultColumns > 0 && a.ResultColumns <= offset {
a.ResultColumns = offset + 1
}
}

func (a *Aggregator) AddColumn(ctx *plancontext.PlanningContext, reuse bool, groupBy bool, ae *sqlparser.AliasedExpr) (offset int) {
defer func() {
a.checkOffset(offset)
}()
rewritten := a.DT.RewriteExpression(ctx, ae.Expr)

ae = &sqlparser.AliasedExpr{
Expand Down Expand Up @@ -180,7 +195,7 @@ func (a *Aggregator) AddColumn(ctx *plancontext.PlanningContext, reuse bool, gro
a.Aggregations = append(a.Aggregations, aggr)
}

offset := len(a.Columns)
offset = len(a.Columns)
a.Columns = append(a.Columns, ae)
incomingOffset := a.Source.AddColumn(ctx, false, groupBy, ae)

Expand Down Expand Up @@ -246,6 +261,7 @@ func (a *Aggregator) AddWSColumn(ctx *plancontext.PlanningContext, offset int, u
// TODO: we could handle this case by adding a projection on under the aggregator to make the columns line up
panic(errFailedToPlan(wsAe))
}
a.checkOffset(wsOffset)
return wsOffset
}

Expand Down Expand Up @@ -281,9 +297,9 @@ func isDerived(op Operator) bool {
}
}

func (a *Aggregator) GetColumns(ctx *plancontext.PlanningContext) []*sqlparser.AliasedExpr {
func (a *Aggregator) GetColumns(ctx *plancontext.PlanningContext) (res []*sqlparser.AliasedExpr) {
if isDerived(a.Source) {
return a.Columns
return truncate(a, a.Columns)
}

// we update the incoming columns, so we know about any new columns that have been added
Expand All @@ -296,7 +312,7 @@ func (a *Aggregator) GetColumns(ctx *plancontext.PlanningContext) []*sqlparser.A
a.Columns = append(a.Columns, columns[len(a.Columns):]...)
}

return a.Columns
return truncate(a, a.Columns)
}

func (a *Aggregator) GetSelectExprs(ctx *plancontext.PlanningContext) sqlparser.SelectExprs {
Expand All @@ -315,6 +331,9 @@ func (a *Aggregator) ShortDescription() string {
if a.Original {
org = "ORG "
}
if a.ResultColumns > 0 {
org += fmt.Sprintf(":%d ", a.ResultColumns)
}

if len(a.Grouping) == 0 {
return fmt.Sprintf("%s%s", org, strings.Join(columns, ", "))
Expand Down Expand Up @@ -511,7 +530,16 @@ func (a *Aggregator) setTruncateColumnCount(offset int) {
a.ResultColumns = offset
}

func (a *Aggregator) getTruncateColumnCount() int {
return a.ResultColumns
}

func (a *Aggregator) internalAddColumn(ctx *plancontext.PlanningContext, aliasedExpr *sqlparser.AliasedExpr, addToGroupBy bool) int {
if a.ResultColumns == 0 {
// if we need to use `internalAddColumn`, it means we are adding columns that are not part of the original list,
// so we need to set the ResultColumns to the current length of the columns list
a.ResultColumns = len(a.Columns)
}
offset := a.Source.AddColumn(ctx, true, addToGroupBy, aliasedExpr)

if offset == len(a.Columns) {
Expand Down
14 changes: 9 additions & 5 deletions go/vt/vtgate/planbuilder/operators/distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type (
// This is only filled in during offset planning
Columns []engine.CheckCol

Truncate int
ResultColumns int
}
)

Expand Down Expand Up @@ -72,7 +72,7 @@ func (d *Distinct) Clone(inputs []Operator) Operator {
Columns: slices.Clone(d.Columns),
QP: d.QP,
PushedPerformance: d.PushedPerformance,
Truncate: d.Truncate,
ResultColumns: d.ResultColumns,
}
}

Expand Down Expand Up @@ -101,11 +101,11 @@ func (d *Distinct) FindCol(ctx *plancontext.PlanningContext, expr sqlparser.Expr
}

func (d *Distinct) GetColumns(ctx *plancontext.PlanningContext) []*sqlparser.AliasedExpr {
return d.Source.GetColumns(ctx)
return truncate(d, d.Source.GetColumns(ctx))
}

func (d *Distinct) GetSelectExprs(ctx *plancontext.PlanningContext) sqlparser.SelectExprs {
return d.Source.GetSelectExprs(ctx)
return truncate(d, d.Source.GetSelectExprs(ctx))
}

func (d *Distinct) ShortDescription() string {
Expand All @@ -120,5 +120,9 @@ func (d *Distinct) GetOrdering(ctx *plancontext.PlanningContext) []OrderBy {
}

func (d *Distinct) setTruncateColumnCount(offset int) {
d.Truncate = offset
d.ResultColumns = offset
}

func (d *Distinct) getTruncateColumnCount() int {
return d.ResultColumns
}
14 changes: 9 additions & 5 deletions go/vt/vtgate/planbuilder/operators/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Filter struct {
// It contains the ANDed predicates in Predicates, with ColName:s replaced by Offset:s
PredicateWithOffsets evalengine.Expr

Truncate int
ResultColumns int
}

func newFilterSinglePredicate(op Operator, expr sqlparser.Expr) Operator {
Expand All @@ -55,7 +55,7 @@ func (f *Filter) Clone(inputs []Operator) Operator {
Source: inputs[0],
Predicates: slices.Clone(f.Predicates),
PredicateWithOffsets: f.PredicateWithOffsets,
Truncate: f.Truncate,
ResultColumns: f.ResultColumns,
}
}

Expand Down Expand Up @@ -100,11 +100,11 @@ func (f *Filter) AddWSColumn(ctx *plancontext.PlanningContext, offset int, under
}

func (f *Filter) GetColumns(ctx *plancontext.PlanningContext) []*sqlparser.AliasedExpr {
return f.Source.GetColumns(ctx)
return truncate(f, f.Source.GetColumns(ctx))
}

func (f *Filter) GetSelectExprs(ctx *plancontext.PlanningContext) sqlparser.SelectExprs {
return f.Source.GetSelectExprs(ctx)
return truncate(f, f.Source.GetSelectExprs(ctx))
}

func (f *Filter) GetOrdering(ctx *plancontext.PlanningContext) []OrderBy {
Expand Down Expand Up @@ -151,5 +151,9 @@ func (f *Filter) ShortDescription() string {
}

func (f *Filter) setTruncateColumnCount(offset int) {
f.Truncate = offset
f.ResultColumns = offset
}

func (f *Filter) getTruncateColumnCount() int {
return f.ResultColumns
}
8 changes: 6 additions & 2 deletions go/vt/vtgate/planbuilder/operators/ordering.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ func (o *Ordering) FindCol(ctx *plancontext.PlanningContext, expr sqlparser.Expr
}

func (o *Ordering) GetColumns(ctx *plancontext.PlanningContext) []*sqlparser.AliasedExpr {
return o.Source.GetColumns(ctx)
return truncate(o, o.Source.GetColumns(ctx))
}

func (o *Ordering) GetSelectExprs(ctx *plancontext.PlanningContext) sqlparser.SelectExprs {
return o.Source.GetSelectExprs(ctx)
return truncate(o, o.Source.GetSelectExprs(ctx))
}

func (o *Ordering) GetOrdering(*plancontext.PlanningContext) []OrderBy {
Expand Down Expand Up @@ -108,3 +108,7 @@ func (o *Ordering) ShortDescription() string {
func (o *Ordering) setTruncateColumnCount(offset int) {
o.ResultColumns = offset
}

func (o *Ordering) getTruncateColumnCount() int {
return o.ResultColumns
}
24 changes: 20 additions & 4 deletions go/vt/vtgate/planbuilder/operators/plan_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,21 @@ func (noPredicates) AddPredicate(*plancontext.PlanningContext, sqlparser.Expr) O
panic(vterrors.VT13001("the noColumns operator cannot accept predicates"))
}

// tryTruncateColumnsAt will see if we can truncate the columns by just asking the operator to do it for us
func tryTruncateColumnsAt(op Operator, truncateAt int) bool {
type columnTruncator interface {
setTruncateColumnCount(offset int)
// columnTruncator is an interface that allows an operator to truncate its columns to a certain length
type columnTruncator interface {
setTruncateColumnCount(offset int)
getTruncateColumnCount() int
}

func truncate[K any](op columnTruncator, slice []K) []K {
if op.getTruncateColumnCount() == 0 {
return slice
}
return slice[:op.getTruncateColumnCount()]
}

// tryTruncateColumnsAt will see if we can truncate the columns by just asking the operator to do it for us
func tryTruncateColumnsAt(op Operator, truncateAt int) bool {
truncator, ok := op.(columnTruncator)
if ok {
truncator.setTruncateColumnCount(truncateAt)
Expand All @@ -160,6 +169,13 @@ func tryTruncateColumnsAt(op Operator, truncateAt int) bool {

func transformColumnsToSelectExprs(ctx *plancontext.PlanningContext, op Operator) sqlparser.SelectExprs {
columns := op.GetColumns(ctx)
if trunc, ok := op.(columnTruncator); ok {
count := trunc.getTruncateColumnCount()
if count > 0 {
columns = columns[:count]
}
}

selExprs := slice.Map(columns, func(from *sqlparser.AliasedExpr) sqlparser.SelectExpr {
return from
})
Expand Down
8 changes: 6 additions & 2 deletions go/vt/vtgate/planbuilder/operators/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,11 +749,11 @@ func (r *Route) FindCol(ctx *plancontext.PlanningContext, expr sqlparser.Expr, _
}

func (r *Route) GetColumns(ctx *plancontext.PlanningContext) []*sqlparser.AliasedExpr {
return r.Source.GetColumns(ctx)
return truncate(r, r.Source.GetColumns(ctx))
}

func (r *Route) GetSelectExprs(ctx *plancontext.PlanningContext) sqlparser.SelectExprs {
return r.Source.GetSelectExprs(ctx)
return truncate(r, r.Source.GetSelectExprs(ctx))
}

func (r *Route) GetOrdering(ctx *plancontext.PlanningContext) []OrderBy {
Expand Down Expand Up @@ -849,6 +849,10 @@ func (r *Route) setTruncateColumnCount(offset int) {
r.ResultColumns = offset
}

func (r *Route) getTruncateColumnCount() int {
return r.ResultColumns
}

func (r *Route) introducesTableID() semantics.TableSet {
id := semantics.EmptyTableSet()
for _, route := range r.MergedWith {
Expand Down
5 changes: 3 additions & 2 deletions go/vt/vtgate/planbuilder/operators/union.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ func (u *Union) addConstantToUnion(ctx *plancontext.PlanningContext, aexpr *sqlp
outputOffset = thisOffset
} else {
if thisOffset != outputOffset {
panic(vterrors.VT12001("argument offsets did not line up for UNION"))
tree := ToTree(u)
panic(vterrors.VT12001(fmt.Sprintf("argument offsets did not line up for UNION. Pushing %s - want %d got %d\n%s", sqlparser.String(aexpr), outputOffset, thisOffset, tree)))
}
}
}
Expand All @@ -224,7 +225,7 @@ func (u *Union) addWeightStringToOffset(ctx *plancontext.PlanningContext, argIdx
outputOffset = thisOffset
} else {
if thisOffset != outputOffset {
panic(vterrors.VT12001("weight_string offsets did not line up for UNION"))
panic(vterrors.VT13001("weight_string offsets did not line up for UNION"))
}
}
}
Expand Down
Loading

0 comments on commit 7a737f4

Please sign in to comment.