Skip to content

Commit

Permalink
sql: refactor tree.RoutineExpr
Browse files Browse the repository at this point in the history
`tree.RoutineExpr` no longer tracks the number of statements in the
routine. Instead, it has a `tree.RoutinePlanGenerator` that generates a
plan for each statement in the routine, given a list of arguments, and
invokes a given callback on the plan.

Release note: None
  • Loading branch information
mgartner committed Jan 26, 2023
1 parent 831dbcc commit 08ac8fd
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 153 deletions.
8 changes: 4 additions & 4 deletions pkg/sql/logictest/testdata/logic_test/udf
Original file line number Diff line number Diff line change
Expand Up @@ -2157,15 +2157,15 @@ SELECT trace_fn(a) FROM trace_tab
statement ok
SET tracing = off

query T
query T rowsort
SELECT message FROM [SHOW TRACE FOR SESSION] WHERE message ~ 'udf'
----
=== SPAN START: udf-stmt-trace_fn-0 ===
=== SPAN START: udf-stmt-trace_fn-1 ===
=== SPAN START: udf-stmt-trace_fn-0 ===
=== SPAN START: udf-stmt-trace_fn-2 ===
=== SPAN START: udf-stmt-trace_fn-1 ===
=== SPAN START: udf-stmt-trace_fn-0 ===
=== SPAN START: udf-stmt-trace_fn-2 ===
=== SPAN START: udf-stmt-trace_fn-1 ===
=== SPAN START: udf-stmt-trace_fn-2 ===


subtest args
Expand Down
204 changes: 108 additions & 96 deletions pkg/sql/opt/exec/execbuilder/scalar.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,12 +651,11 @@ func (b *Builder) buildSubquery(

// Create a tree.RoutinePlanFn that can plan the single statement
// representing the subquery.
planFn := b.buildRoutinePlanFn(params, stmts, true /* allowOuterWithRefs */)
planGen := b.buildRoutinePlanGenerator(params, stmts, true /* allowOuterWithRefs */)
return tree.NewTypedRoutineExpr(
"subquery",
args,
planFn,
1, /* numStmts */
planGen,
subquery.Typ,
false, /* enableStepping */
), nil
Expand All @@ -675,36 +674,43 @@ func (b *Builder) buildSubquery(
inputRowCount := int64(input.Relational().Statistics().RowCountIfAvailable())
withExprs := make([]builtWithExpr, len(b.withExprs))
copy(withExprs, b.withExprs)
planFn := func(
ctx context.Context, ref tree.RoutineExecFactory, stmtIdx int, args tree.Datums,
) (tree.RoutinePlan, error) {
planGen := func(
ctx context.Context, ref tree.RoutineExecFactory, args tree.Datums, fn tree.RoutinePlanGeneratedFunc,
) error {
ef := ref.(exec.Factory)
eb := New(ctx, ef, b.optimizer, b.mem, b.catalog, input, b.evalCtx, false /* allowAutoCommit */, b.IsANSIDML)
eb.withExprs = withExprs
eb.disableTelemetry = true
eb.planLazySubqueries = true
plan, err := eb.buildRelational(input)
ePlan, err := eb.buildRelational(input)
if err != nil {
return nil, err
return err
}
if len(eb.subqueries) > 0 {
return nil, expectedLazyRoutineError("subquery")
return expectedLazyRoutineError("subquery")
}
if len(eb.cascades) > 0 {
return nil, expectedLazyRoutineError("cascade")
return expectedLazyRoutineError("cascade")
}
if len(eb.checks) > 0 {
return nil, expectedLazyRoutineError("check")
return expectedLazyRoutineError("check")
}
return b.factory.ConstructPlan(
plan.root, nil /* subqueries */, nil /* cascades */, nil /* checks */, inputRowCount,
plan, err := b.factory.ConstructPlan(
ePlan.root, nil /* subqueries */, nil /* cascades */, nil /* checks */, inputRowCount,
)
if err != nil {
return err
}
err = fn(plan, true /* isFinalPlan */)
if err != nil {
return err
}
return nil
}
return tree.NewTypedRoutineExpr(
"subquery",
nil, /* args */
planFn,
1, /* numStmts */
planGen,
subquery.Typ,
false, /* enableStepping */
), nil
Expand Down Expand Up @@ -770,7 +776,7 @@ func (b *Builder) buildUDF(ctx *buildScalarCtx, scalar opt.ScalarExpr) (tree.Typ

// Create a tree.RoutinePlanFn that can plan the statements in the UDF body.
// TODO(mgartner): Add support for WITH expressions inside UDF bodies.
planFn := b.buildRoutinePlanFn(udf.Params, udf.Body, false /* allowOuterWithRefs */)
planGen := b.buildRoutinePlanGenerator(udf.Params, udf.Body, false /* allowOuterWithRefs */)

// Enable stepping for volatile functions so that statements within the UDF
// see mutations made by the invoking statement and by previous executed
Expand All @@ -780,18 +786,17 @@ func (b *Builder) buildUDF(ctx *buildScalarCtx, scalar opt.ScalarExpr) (tree.Typ
return tree.NewTypedRoutineExpr(
udf.Name,
args,
planFn,
len(udf.Body),
planGen,
udf.Typ,
enableStepping,
), nil
}

// buildRoutinePlanFn returns a tree.RoutinePlanFn that can plan the statements
// buildRoutinePlanGenerator returns a tree.RoutinePlanFn that can plan the statements
// in a routine that has one or more arguments.
func (b *Builder) buildRoutinePlanFn(
func (b *Builder) buildRoutinePlanGenerator(
params opt.ColList, stmts memo.RelListExpr, allowOuterWithRefs bool,
) tree.RoutinePlanFn {
) tree.RoutinePlanGenerator {
// argOrd returns the ordinal of the argument within the arguments list that
// can be substituted for each reference to the given function parameter
// column. If the given column does not represent a function parameter,
Expand Down Expand Up @@ -821,88 +826,95 @@ func (b *Builder) buildRoutinePlanFn(
//
// Note: we put o outside of the function so we allocate it only once.
var o xform.Optimizer
planFn := func(
ctx context.Context, ref tree.RoutineExecFactory, stmtIdx int, args tree.Datums,
) (tree.RoutinePlan, error) {
o.Init(ctx, b.evalCtx, b.catalog)
f := o.Factory()
stmt := stmts[stmtIdx]

// Copy the expression into a new memo. Replace parameter references
// with argument datums.
addedWithBindings := false
var replaceFn norm.ReplaceFunc
replaceFn = func(e opt.Expr) opt.Expr {
switch t := e.(type) {
case *memo.VariableExpr:
if ord, ok := argOrd(t.Col); ok {
return f.ConstructConstVal(args[ord], t.Typ)
}

case *memo.WithScanExpr:
// Allow referring to "outer" With expressions, if
// allowOuterWithRefs is true. The bound expressions are not
// part of this Memo, but they are used only for their
// relational properties, which should be valid.
//
// We must add all With expressions to the metadata even if they
// aren't referred to directly because they might be referred to
// transitively through other With expressions. For example, if
// stmt refers to With expression &1, and &1 refers to With
// expression &2, we must include &2 in the metadata so that its
// relational properties are available. See #87733.
//
// We lazily add these With expressions to the metadata here
// because the call to Factory.CopyAndReplace below clears With
// expressions in the metadata.
if allowOuterWithRefs && !addedWithBindings {
b.mem.Metadata().ForEachWithBinding(func(id opt.WithID, expr opt.Expr) {
f.Metadata().AddWithBinding(id, expr)
})
addedWithBindings = true
planGen := func(
ctx context.Context, ref tree.RoutineExecFactory, args tree.Datums, fn tree.RoutinePlanGeneratedFunc,
) error {
for i := range stmts {
stmt := stmts[i]
o.Init(ctx, b.evalCtx, b.catalog)
f := o.Factory()

// Copy the expression into a new memo. Replace parameter references
// with argument datums.
addedWithBindings := false
var replaceFn norm.ReplaceFunc
replaceFn = func(e opt.Expr) opt.Expr {
switch t := e.(type) {
case *memo.VariableExpr:
if ord, ok := argOrd(t.Col); ok {
return f.ConstructConstVal(args[ord], t.Typ)
}

case *memo.WithScanExpr:
// Allow referring to "outer" With expressions, if
// allowOuterWithRefs is true. The bound expressions are not
// part of this Memo, but they are used only for their
// relational properties, which should be valid.
//
// We must add all With expressions to the metadata even if they
// aren't referred to directly because they might be referred to
// transitively through other With expressions. For example, if
// stmt refers to With expression &1, and &1 refers to With
// expression &2, we must include &2 in the metadata so that its
// relational properties are available. See #87733.
//
// We lazily add these With expressions to the metadata here
// because the call to Factory.CopyAndReplace below clears With
// expressions in the metadata.
if allowOuterWithRefs && !addedWithBindings {
b.mem.Metadata().ForEachWithBinding(func(id opt.WithID, expr opt.Expr) {
f.Metadata().AddWithBinding(id, expr)
})
addedWithBindings = true
}
// Fall through.
}
// Fall through.
return f.CopyAndReplaceDefault(e, replaceFn)
}
return f.CopyAndReplaceDefault(e, replaceFn)
}
f.CopyAndReplace(stmt, stmt.PhysProps, replaceFn)
f.CopyAndReplace(stmt, stmt.PhysProps, replaceFn)

// Optimize the memo.
optimizedExpr, err := o.Optimize()
if err != nil {
return nil, err
}
// Optimize the memo.
optimizedExpr, err := o.Optimize()
if err != nil {
return err
}

// Build the memo into a plan.
ef := ref.(exec.Factory)
eb := New(ctx, ef, &o, f.Memo(), b.catalog, optimizedExpr, b.evalCtx, false /* allowAutoCommit */, b.IsANSIDML)
eb.withExprs = withExprs
eb.disableTelemetry = true
eb.planLazySubqueries = true
plan, err := eb.Build()
if err != nil {
if errors.IsAssertionFailure(err) {
// Enhance the error with the EXPLAIN (OPT, VERBOSE) of the
// inner expression.
fmtFlags := memo.ExprFmtHideQualifications | memo.ExprFmtHideScalars |
memo.ExprFmtHideTypes
explainOpt := o.FormatExpr(optimizedExpr, fmtFlags)
err = errors.WithDetailf(err, "routineExpr:\n%s", explainOpt)
// Build the memo into a plan.
ef := ref.(exec.Factory)
eb := New(ctx, ef, &o, f.Memo(), b.catalog, optimizedExpr, b.evalCtx, false /* allowAutoCommit */, b.IsANSIDML)
eb.withExprs = withExprs
eb.disableTelemetry = true
eb.planLazySubqueries = true
plan, err := eb.Build()
if err != nil {
if errors.IsAssertionFailure(err) {
// Enhance the error with the EXPLAIN (OPT, VERBOSE) of the
// inner expression.
fmtFlags := memo.ExprFmtHideQualifications | memo.ExprFmtHideScalars |
memo.ExprFmtHideTypes
explainOpt := o.FormatExpr(optimizedExpr, fmtFlags)
err = errors.WithDetailf(err, "routineExpr:\n%s", explainOpt)
}
return err
}
if len(eb.subqueries) > 0 {
return expectedLazyRoutineError("subquery")
}
if len(eb.cascades) > 0 {
return expectedLazyRoutineError("cascade")
}
if len(eb.checks) > 0 {
return expectedLazyRoutineError("check")
}
isFinalPlan := i == len(stmts)-1
err = fn(plan, isFinalPlan)
if err != nil {
return err
}
return nil, err
}
if len(eb.subqueries) > 0 {
return nil, expectedLazyRoutineError("subquery")
}
if len(eb.cascades) > 0 {
return nil, expectedLazyRoutineError("cascade")
}
if len(eb.checks) > 0 {
return nil, expectedLazyRoutineError("check")
}
return plan, nil
return nil
}
return planFn
return planGen
}

func expectedLazyRoutineError(typ string) error {
Expand Down
65 changes: 30 additions & 35 deletions pkg/sql/routine.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
// runs the plans. The resulting value of the last statement in the routine is
// returned.
func (p *planner) EvalRoutineExpr(
ctx context.Context, expr *tree.RoutineExpr, input tree.Datums,
ctx context.Context, expr *tree.RoutineExpr, args tree.Datums,
) (result tree.Datum, err error) {
// Return the cached result if it exists.
if expr.CachedResult != nil {
Expand Down Expand Up @@ -59,46 +59,41 @@ func (p *planner) EvalRoutineExpr(
}

// Execute each statement in the routine sequentially.
stmtIdx := 0
ef := newExecFactory(ctx, p)
for i := 0; i < expr.NumStmts; i++ {
if err := func() error {
opName := "udf-stmt-" + expr.Name + "-" + strconv.Itoa(i)
ctx, sp := tracing.ChildSpan(ctx, opName)
defer sp.Finish()

// Generate a plan for executing the ith statement.
plan, err := expr.PlanFn(ctx, ef, i, input)
if err != nil {
return err
}

// If this is the last statement, use the rowResultWriter created above.
// Otherwise, use a rowResultWriter that drops all rows added to it.
var w rowResultWriter
if i == expr.NumStmts-1 {
w = rrw
} else {
w = &droppingResultWriter{}
}

// Place a sequence point before each statement in the routine for
// volatile functions.
if expr.EnableStepping {
if err := txn.Step(ctx); err != nil {
return err
}
}
err = expr.ForEachPlan(ctx, ef, args, func(plan tree.RoutinePlan, isFinalPlan bool) error {
stmtIdx++
opName := "udf-stmt-" + expr.Name + "-" + strconv.Itoa(stmtIdx)
ctx, sp := tracing.ChildSpan(ctx, opName)
defer sp.Finish()

// If this is the last statement, use the rowResultWriter created above.
// Otherwise, use a rowResultWriter that drops all rows added to it.
var w rowResultWriter
if isFinalPlan {
w = rrw
} else {
w = &droppingResultWriter{}
}

// Run the plan.
err = runPlanInsidePlan(ctx, p.RunParams(ctx), plan.(*planComponents), w)
if err != nil {
// Place a sequence point before each statement in the routine for
// volatile functions.
if expr.EnableStepping {
if err := txn.Step(ctx); err != nil {
return err
}
}

return nil
}(); err != nil {
return nil, err
// Run the plan.
err = runPlanInsidePlan(ctx, p.RunParams(ctx), plan.(*planComponents), w)
if err != nil {
return err
}

return nil
})
if err != nil {
return nil, err
}

// Fetch the first row from the row container and return the first
Expand Down
Loading

0 comments on commit 08ac8fd

Please sign in to comment.