Skip to content

Commit

Permalink
sql: evaluate correlated subqueries as routines
Browse files Browse the repository at this point in the history
Previously, the optimizer would error in rare cases when it was unable
to hoist correlated subqueries into apply-joins. Now, scalar, correlated
subqueries that aren't hoisted are executed successfully. Work remains
to apply the method used in this commit to `EXISTS` and `<op> ANY`
subqueries.

Hoisting correlated subqueries is not possible when a conditional
expression, like a `CASE`, wraps a subquery that is not leak-proof. One
of the effects of hoisting a subquery is that the subquery will be
unconditionally evaluated. For leak-proof subqueries, the worst case is
that unnecessary computation is performed. For non-leak-proof
subqueries, errors could originate from the subquery even though it should
never have been evaluated because the corresponding conditional expression
was never true. So, in order to support these cases, we must be able to
execute a correlated subquery.

A correlated subquery can be thought of as a relational expression with
parameters that need to be filled in with constant value arguments for
each invocation. It is essentially a user-defined function with a single
statement in the function body. So, the `tree.RoutineExpr` machinery
that powers UDFs is easily repurposed to facilitate evaluation of
correlated subqueries.

Fixes cockroachdb#71908
Fixes cockroachdb#73573
Fixes cockroachdb#80169

Release note (sql change): Some queries which previously resulted in the
error "could not decorrelate subquery" now succeed.
  • Loading branch information
mgartner committed Jan 13, 2023
1 parent 94493ff commit 90dab90
Show file tree
Hide file tree
Showing 3 changed files with 252 additions and 18 deletions.
99 changes: 99 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/subquery
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,105 @@ query I
SELECT col0 FROM tab4 WHERE (col0 <= 0 AND col4 <= 5.38) OR (col4 IN (SELECT col1 FROM tab4 WHERE col1 > 8.27)) AND (col3 <= 5 AND (col3 BETWEEN 7 AND 9))
----

subtest correlated

statement ok
CREATE TABLE corr (
k INT PRIMARY KEY,
i INT
)

statement ok
INSERT INTO corr VALUES (1, 10), (2, 22), (3, 30), (4, 40), (5, 50);

query II
SELECT * FROM corr
WHERE CASE WHEN k < 5 THEN k*10 = (SELECT i FROM corr tmp WHERE k = corr.k) END
----
1 10
3 30
4 40

query III colnames
SELECT k, i, CASE WHEN k > 1 THEN (SELECT i FROM corr tmp WHERE k = corr.k-1) END AS prev_i
FROM corr
----
k i prev_i
1 10 NULL
2 22 10
3 30 22
4 40 30
5 50 40

query T
EXPLAIN (VERBOSE)
SELECT k,
CASE WHEN k > 1 THEN (SELECT i/1 FROM corr tmp WHERE i < corr.i ORDER BY i LIMIT 1) END
FROM corr
----
distribution: local
vectorized: true
·
• render
│ columns: (k, "case")
│ render case: CASE WHEN k > 1 THEN subquery(i) ELSE CAST(NULL AS DECIMAL) END
│ render k: k
└── • scan
columns: (k, i)
estimated row count: 1,000 (missing stats)
table: corr@corr_pkey
spans: FULL SCAN

# A test similar to the previous one, showing that the physical ordering
# requested by the ORDER BY is respected when re-optimizing the subquery.
query IIR colnames
SELECT k, i,
CASE WHEN k > 1 THEN (SELECT i/1 FROM corr tmp WHERE i < corr.i ORDER BY i DESC LIMIT 1) END prev_i
FROM corr
----
k i prev_i
1 10 NULL
2 22 10
3 30 22
4 40 30
5 50 40

# Correlated subqueries can reference outer with expressions.
query III colnames
WITH w AS MATERIALIZED (
(VALUES (1))
)
SELECT k, i,
CASE WHEN k > 0 THEN (SELECT i+corr.i FROM corr tmp UNION ALL SELECT * FROM w LIMIT 1) END i_plus_first_i
FROM corr
----
k i i_plus_first_i
1 10 20
2 22 32
3 30 40
4 40 50
5 50 60

# Uncorrelated subqueries within correlated subqueries can reference outer with
# expressions.
query III colnames
WITH w AS MATERIALIZED (
(VALUES (1))
)
SELECT k, i,
CASE WHEN k > 0 THEN (SELECT i+corr.i FROM corr tmp WHERE k = (SELECT * FROM w)) END i_plus_first_i
FROM corr
----
k i i_plus_first_i
1 10 20
2 22 32
3 30 40
4 40 50
5 50 60

subtest regressions

statement ok
CREATE TABLE z (z INT PRIMARY KEY)

Expand Down
110 changes: 92 additions & 18 deletions pkg/sql/opt/exec/execbuilder/scalar.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/opt/exec"
"github.com/cockroachdb/cockroach/pkg/sql/opt/memo"
"github.com/cockroachdb/cockroach/pkg/sql/opt/norm"
"github.com/cockroachdb/cockroach/pkg/sql/opt/props/physical"
"github.com/cockroachdb/cockroach/pkg/sql/opt/xform"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins/builtinsregistry"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
Expand Down Expand Up @@ -540,6 +541,8 @@ func (b *Builder) buildItem(ctx *buildScalarCtx, scalar opt.ScalarExpr) (tree.Ty
func (b *Builder) buildAny(ctx *buildScalarCtx, scalar opt.ScalarExpr) (tree.TypedExpr, error) {
any := scalar.(*memo.AnyExpr)
// We cannot yet execute correlated ANY subqueries.
// TODO(mgartner): Plan correlated ANY subqueries using tree.RoutineExpr.
// See buildSubquery.
if !any.Input.Relational().OuterCols.Empty() {
return nil, b.decorrelationError()
}
Expand Down Expand Up @@ -581,6 +584,8 @@ func (b *Builder) buildExistsSubquery(
) (tree.TypedExpr, error) {
exists := scalar.(*memo.ExistsExpr)
// We cannot yet execute correlated EXISTS subqueries.
// TODO(mgartner): Plan correlated EXISTS subqueries using tree.RoutineExpr.
// See buildSubquery.
if !exists.Input.Relational().OuterCols.Empty() {
return nil, b.decorrelationError()
}
Expand Down Expand Up @@ -610,16 +615,49 @@ func (b *Builder) buildSubquery(
return nil, errors.Errorf("subquery input with multiple columns")
}

// We cannot execute correlated subqueries.
// TODO(mgartner): We can execute correlated subqueries by making them
// routines, like we do below.
if !input.Relational().OuterCols.Empty() {
return nil, b.decorrelationError()
// Build correlated subqueries as lazily-evaluated routines.
if outerCols := input.Relational().OuterCols; !outerCols.Empty() {
// The outer columns of the subquery become the parameters of the
// routine.
params := outerCols.ToList()

// The arguments of the routine are indexed variables representing the
// outer columns of the subquery.
args := make(tree.TypedExprs, len(params))
for i := range args {
args[i] = b.indexedVar(ctx, b.mem.Metadata(), params[i])
}

// Create a single-element RelListExpr representing the subquery.
outputCol := input.Relational().OutputCols.SingleColumn()
aliasedCol := opt.AliasedColumn{
Alias: b.mem.Metadata().ColumnMeta(outputCol).Alias,
ID: outputCol,
}
stmts := memo.RelListExpr{memo.RelRequiredPropsExpr{
RelExpr: input,
PhysProps: &physical.Required{
Presentation: physical.Presentation{aliasedCol},
},
}}

// Create a tree.RoutinePlanFn that can plan the single statement
// representing the subquery.
planFn := b.buildRoutinePlanFn(params, stmts, true /* allowOuterWithRefs */)
return tree.NewTypedRoutineExpr(
"subquery",
args,
planFn,
1, /* numStmts */
subquery.Typ,
false, /* enableStepping */
true, /* calledOnNullInput */
), nil
}

// Build lazily-evaluated, uncorrelated subqueries as routines.
if b.planLazySubqueries {
// Build lazily-evaluated subqueries as routines.
//
// Note: We reuse the optimizer and memo from the original expression
// because we don't need to optimize the subquery input any further.
// It's already been fully optimized because it is uncorrelated and has
Expand All @@ -629,11 +667,14 @@ func (b *Builder) buildSubquery(
// once. We should cache their result to avoid all this overhead for
// every invocation.
inputRowCount := int64(input.Relational().Statistics().RowCountIfAvailable())
withExprs := make([]builtWithExpr, len(b.withExprs))
copy(withExprs, b.withExprs)
planFn := func(
ctx context.Context, ref tree.RoutineExecFactory, stmtIdx int, args tree.Datums,
) (tree.RoutinePlan, error) {
ef := ref.(exec.Factory)
eb := New(ctx, ef, b.optimizer, b.mem, b.catalog, input, b.evalCtx, false /* allowAutoCommit */, b.IsANSIDML)
eb.withExprs = withExprs
eb.disableTelemetry = true
eb.planLazySubqueries = true
plan, err := eb.buildRelational(input)
Expand Down Expand Up @@ -723,7 +764,8 @@ func (b *Builder) buildUDF(ctx *buildScalarCtx, scalar opt.ScalarExpr) (tree.Typ
}

// Create a tree.RoutinePlanFn that can plan the statements in the UDF body.
planFn := b.buildRoutinePlanFn(udf.Params, udf.Body)
// TODO(mgartner): Add support for WITH expressions inside UDF bodies.
planFn := b.buildRoutinePlanFn(udf.Params, udf.Body, false /* allowOuterWithRefs */)

// Enable stepping for volatile functions so that statements within the UDF
// see mutations made by the invoking statement and by previous executed
Expand All @@ -742,9 +784,9 @@ func (b *Builder) buildUDF(ctx *buildScalarCtx, scalar opt.ScalarExpr) (tree.Typ
}

// buildRoutinePlanFn returns a tree.RoutinePlanFn that can plan the statements
// in a routine.
// in a routine that has one or more arguments.
func (b *Builder) buildRoutinePlanFn(
params opt.ColList, stmts memo.RelListExpr,
params opt.ColList, stmts memo.RelListExpr, allowOuterWithRefs bool,
) tree.RoutinePlanFn {
// argOrd returns the ordinal of the argument within the arguments list that
// can be substituted for each reference to the given function parameter
Expand All @@ -759,6 +801,13 @@ func (b *Builder) buildRoutinePlanFn(
return 0, false
}

// We will pre-populate the withExprs of the new execbuilder.
var withExprs []builtWithExpr
if allowOuterWithRefs {
withExprs = make([]builtWithExpr, len(b.withExprs))
copy(withExprs, b.withExprs)
}

// Plan the statements in a separate memo. We use an exec.Factory passed to
// the closure rather than b.factory to support executing plans that are
// generated with explain.Factory.
Expand All @@ -777,28 +826,53 @@ func (b *Builder) buildRoutinePlanFn(

// Copy the expression into a new memo. Replace parameter references
// with argument datums.
addedWithBindings := false
var replaceFn norm.ReplaceFunc
replaceFn = func(e opt.Expr) opt.Expr {
if v, ok := e.(*memo.VariableExpr); ok {
if ord, ok := argOrd(v.Col); ok {
return f.ConstructConstVal(args[ord], v.Typ)
switch t := e.(type) {
case *memo.VariableExpr:
if ord, ok := argOrd(t.Col); ok {
return f.ConstructConstVal(args[ord], t.Typ)
}

case *memo.WithScanExpr:
// Allow referring to "outer" With expressions, if
// allowOuterWithRefs is true. The bound expressions are not
// part of this Memo, but they are used only for their
// relational properties, which should be valid.
//
// We must add all With expressions to the metadata even if they
// aren't referred to directly because they might be referred to
// transitively through other With expressions. For example, if
// stmt refers to With expression &1, and &1 refers to With
// expression &2, we must include &2 in the metadata so that its
// relational properties are available. See #87733.
//
// We lazily add these With expressions to the metadata here
// because the call to Factory.CopyAndReplace below clears With
// expressions in the metadata.
if allowOuterWithRefs && !addedWithBindings {
b.mem.Metadata().ForEachWithBinding(func(id opt.WithID, expr opt.Expr) {
f.Metadata().AddWithBinding(id, expr)
})
addedWithBindings = true
}
// Fall through.
}
return f.CopyAndReplaceDefault(e, replaceFn)
}
f.CopyAndReplace(stmt, stmt.PhysProps, replaceFn)

// Optimize the memo.
newRightSide, err := o.Optimize()
optimizedExpr, err := o.Optimize()
if err != nil {
return nil, err
}

// Build the memo into a plan.
// TODO(mgartner): Add support for WITH expressions inside UDF bodies.
// TODO(mgartner): Add support for subqueries inside UDF bodies.
ef := ref.(exec.Factory)
eb := New(ctx, ef, &o, f.Memo(), b.catalog, newRightSide, b.evalCtx, false /* allowAutoCommit */, b.IsANSIDML)
eb := New(ctx, ef, &o, f.Memo(), b.catalog, optimizedExpr, b.evalCtx, false /* allowAutoCommit */, b.IsANSIDML)
eb.withExprs = withExprs
eb.disableTelemetry = true
eb.planLazySubqueries = true
plan, err := eb.Build()
Expand All @@ -808,7 +882,7 @@ func (b *Builder) buildRoutinePlanFn(
// inner expression.
fmtFlags := memo.ExprFmtHideQualifications | memo.ExprFmtHideScalars |
memo.ExprFmtHideTypes
explainOpt := o.FormatExpr(newRightSide, fmtFlags)
explainOpt := o.FormatExpr(optimizedExpr, fmtFlags)
err = errors.WithDetailf(err, "routineExpr:\n%s", explainOpt)
}
return nil, err
Expand Down
61 changes: 61 additions & 0 deletions pkg/sql/opt/exec/execbuilder/testdata/subquery
Original file line number Diff line number Diff line change
Expand Up @@ -451,3 +451,64 @@ vectorized: true
estimated row count: 1,000 (missing stats)
table: abc@abc_pkey
spans: FULL SCAN

statement ok
CREATE TABLE corr (
k INT PRIMARY KEY,
i INT,
FAMILY (k, i)
);
INSERT INTO corr VALUES (1, 10), (2, 22), (3, 30), (4, 40), (5, 50)

# Case where the subquery in a filter cannot be hoisted into an apply-join.
query T
EXPLAIN (VERBOSE)
SELECT * FROM corr
WHERE CASE WHEN k < 5 THEN k*10 = (SELECT i FROM corr tmp WHERE k = corr.k) END
----
distribution: local
vectorized: true
·
• filter
│ columns: (k, i)
│ estimated row count: 333 (missing stats)
│ filter: CASE WHEN k < 5 THEN (k * 10) = subquery(k) ELSE CAST(NULL AS BOOL) END
└── • scan
columns: (k, i)
estimated row count: 1,000 (missing stats)
table: corr@corr_pkey
spans: FULL SCAN

# Case where the subquery in a projection cannot be hoisted into an apply-join.
query T
EXPLAIN (VERBOSE)
SELECT k, i, CASE WHEN k > 1 THEN (SELECT i FROM corr tmp WHERE k = corr.k-1) ELSE 0 END AS prev_i
FROM corr
----
distribution: local
vectorized: true
·
• render
│ columns: (k, i, prev_i)
│ render prev_i: CASE WHEN k > 1 THEN subquery(k) ELSE 0 END
│ render k: k
│ render i: i
└── • scan
columns: (k, i)
estimated row count: 1,000 (missing stats)
table: corr@corr_pkey
spans: FULL SCAN

# Each invocation of the subquery is re-optimized, so the scans are constrained
# by constant values substituted for corr.k.
query T kvtrace
SELECT k, i, CASE WHEN k > 1 THEN (SELECT i FROM corr tmp WHERE k = corr.k-1) ELSE 0 END AS prev_i
FROM corr
----
Scan /Table/110/{1-2}
Scan /Table/110/1/1/0
Scan /Table/110/1/2/0
Scan /Table/110/1/3/0
Scan /Table/110/1/4/0

0 comments on commit 90dab90

Please sign in to comment.