Skip to content

Commit

Permalink
colexec: add native support for COALESCE, IF, NULLIF
Browse files Browse the repository at this point in the history
This commit adds the native vectorized support for `CoalesceExpr`,
`IfExpr`, and `NullIfExpr` by planning the equivalent CASE expressions.
Namely, for `CoalesceExpr` we do
```
CASE
  WHEN CoalesceExpr.Exprs[0] IS DISTINCT FROM NULL THEN CoalesceExpr.Exprs[0]
  WHEN CoalesceExpr.Exprs[1] IS DISTINCT FROM NULL THEN CoalesceExpr.Exprs[1]
  ...
END
```
for `IfExpr` we do
```
CASE WHEN IfExpr.Cond THEN IfExpr.True ELSE IfExpr.Else END
```
and for `NullIfExpr` we do
```
CASE WHEN Expr1 == Expr2 THEN NULL ELSE Expr1 END
```

This commit additionally introduces some unit tests for these newly
supported expressions while extracting out some testing facilities from
the caseOp tests.

Release note: None

Release justification: low risk, high benefit change to existing
functionality.
  • Loading branch information
yuzefovich committed Mar 14, 2022
1 parent c927c31 commit 9ffe468
Show file tree
Hide file tree
Showing 7 changed files with 489 additions and 51 deletions.
3 changes: 3 additions & 0 deletions pkg/sql/colexec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ go_test(
"buffer_test.go",
"builtin_funcs_test.go",
"case_test.go",
"coalesce_test.go",
"columnarizer_test.go",
"count_test.go",
"crossjoiner_test.go",
Expand All @@ -109,6 +110,7 @@ go_test(
"external_sort_test.go",
"hash_aggregator_test.go",
"hashjoiner_test.go",
"if_expr_test.go",
"inject_setup_test.go",
"is_null_ops_test.go",
"joiner_utils_test.go",
Expand All @@ -120,6 +122,7 @@ go_test(
"offset_test.go",
"ordered_synchronizer_test.go",
"parallel_unordered_synchronizer_test.go",
"proj_utils_test.go",
"rowstovec_test.go",
"select_in_test.go",
"serial_unordered_synchronizer_test.go",
Expand Down
55 changes: 6 additions & 49 deletions pkg/sql/colexec/case_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/col/typeconv"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecargs"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecbase"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexectestutils"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
Expand All @@ -30,7 +28,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/stretchr/testify/require"
)

func TestCaseOp(t *testing.T) {
Expand Down Expand Up @@ -113,24 +110,10 @@ func TestCaseOpRandomized(t *testing.T) {
},
}

var da tree.DatumAlloc
rng, _ := randutil.NewTestRand()

numWhenArms := 1 + rng.Intn(5)
hasElseArm := rng.Float64() < 0.5

// Pick a random type to be used as the output type for the CASE expression.
// We're giving more weight to the types natively supported by the
// vectorized engine because all datum-backed types have the same backing
// datumVec which would occur disproportionally often without adjusting the
// weights.
caseOutputType := randgen.RandType(rng)
for retry := 0; retry < 3; retry++ {
if typeconv.TypeFamilyToCanonicalTypeFamily(caseOutputType.Family()) != typeconv.DatumVecCanonicalTypeFamily {
break
}
caseOutputType = randgen.RandType(rng)
}
outputType := getRandomTypeFavorNative(rng)

// Construct such a CASE expression that the first column from the input is
// used as the "partitioning" column (used by WHEN arms for matching), the
Expand Down Expand Up @@ -163,11 +146,11 @@ func TestCaseOpRandomized(t *testing.T) {
partitionIdx := rng.Intn(numPartitions)
inputRow[0].Datum = tree.NewDInt(tree.DInt(partitionIdx))
for j := 1; j < numInputCols; j++ {
inputRow[j] = rowenc.DatumToEncDatum(caseOutputType, randgen.RandDatum(rng, caseOutputType, true /* nullOk */))
inputRow[j] = rowenc.DatumToEncDatum(outputType, randgen.RandDatum(rng, outputType, true /* nullOk */))
}
inputRows[i] = inputRow
if !hasElseArm && partitionIdx == numWhenArms {
expectedOutput[i] = rowenc.DatumToEncDatum(caseOutputType, tree.DNull)
expectedOutput[i] = rowenc.DatumToEncDatum(outputType, tree.DNull)
} else {
expectedOutput[i] = inputRow[partitionIdx+1]
}
Expand All @@ -176,35 +159,9 @@ func TestCaseOpRandomized(t *testing.T) {
inputTypes := make([]*types.T, numInputCols)
inputTypes[0] = types.Int
for i := 1; i < numInputCols; i++ {
inputTypes[i] = caseOutputType
inputTypes[i] = outputType
}
input := execinfra.NewRepeatableRowSource(inputTypes, inputRows)
columnarizer := NewBufferingColumnarizer(testAllocator, flowCtx, 1 /* processorID */, input)
caseOp, err := colexectestutils.CreateTestProjectingOperator(
ctx, flowCtx, columnarizer, inputTypes, caseExpr, testMemAcc,
assertProjOpAgainstRowByRow(
t, flowCtx, &evalCtx, caseExpr, inputTypes, inputRows, expectedOutput, outputType,
)
require.NoError(t, err)
// We will project out all input columns while keeping only the output
// column of the case operator, for simplicity.
op := colexecbase.NewSimpleProjectOp(caseOp, numInputCols+1, []uint32{uint32(numInputCols)})
materializer := NewMaterializer(
flowCtx,
1, /* processorID */
colexecargs.OpWithMetaInfo{Root: op},
[]*types.T{caseOutputType},
)

materializer.Start(ctx)
for _, expectedDatum := range expectedOutput {
actualRow, meta := materializer.Next()
require.Nil(t, meta)
require.Equal(t, 1, len(actualRow))
cmp, err := expectedDatum.Compare(caseOutputType, &da, &evalCtx, &actualRow[0])
require.NoError(t, err)
require.Equal(t, 0, cmp)
}
// The materializer must have been fully exhausted now.
row, meta := materializer.Next()
require.Nil(t, row)
require.Nil(t, meta)
}
164 changes: 164 additions & 0 deletions pkg/sql/colexec/coalesce_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package colexec

import (
"context"
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecbase"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexectestutils"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/randgen"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
)

func TestCoalesceBasic(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
st := cluster.MakeTestingClusterSettings()
evalCtx := tree.MakeTestingEvalContext(st)
defer evalCtx.Stop(ctx)
flowCtx := &execinfra.FlowCtx{
EvalCtx: &evalCtx,
Cfg: &execinfra.ServerConfig{
Settings: st,
},
}

for _, tc := range []struct {
tuples colexectestutils.Tuples
renderExpr string
expected colexectestutils.Tuples
inputTypes []*types.T
skipAllNullsInjection bool
}{
{
// Basic test.
tuples: colexectestutils.Tuples{{1, -1}, {2, nil}, {nil, nil}, {nil, -4}, {nil, nil}, {nil, -6}, {7, nil}},
renderExpr: "COALESCE(@1, @2, 0)",
expected: colexectestutils.Tuples{{1}, {2}, {0}, {-4}, {0}, {-6}, {7}},
inputTypes: types.TwoIntCols,
},
{
// Test the short-circuiting behavior.
tuples: colexectestutils.Tuples{{1, 1, 0}},
renderExpr: "COALESCE(@1, @2 // @3)",
expected: colexectestutils.Tuples{{1}},
inputTypes: types.ThreeIntCols,
},
{
// Test the scenario when all expressions only project NULLs.
tuples: colexectestutils.Tuples{{nil, 1}, {nil, 2}},
renderExpr: "COALESCE(@1, @2 + NULL, @1 + @2)",
expected: colexectestutils.Tuples{{nil}, {nil}},
inputTypes: types.TwoIntCols,
// The output here contains only nulls, so we inject all nulls into
// the input, it won't change.
skipAllNullsInjection: true,
},
} {
runTests := colexectestutils.RunTestsWithTyps
if tc.skipAllNullsInjection {
runTests = colexectestutils.RunTestsWithoutAllNullsInjection
}
runTests(
t, testAllocator, []colexectestutils.Tuples{tc.tuples}, [][]*types.T{tc.inputTypes},
tc.expected, colexectestutils.OrderedVerifier,
func(inputs []colexecop.Operator) (colexecop.Operator, error) {
coalesceOp, err := colexectestutils.CreateTestProjectingOperator(
ctx, flowCtx, inputs[0], tc.inputTypes, tc.renderExpr, testMemAcc,
)
if err != nil {
return nil, err
}
// We will project out the input columns in order to have test
// cases be less verbose.
return colexecbase.NewSimpleProjectOp(coalesceOp, len(tc.inputTypes)+1, []uint32{uint32(len(tc.inputTypes))}), nil
})
}
}

func TestCoalesceRandomized(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
st := cluster.MakeTestingClusterSettings()
evalCtx := tree.MakeTestingEvalContext(st)
defer evalCtx.Stop(ctx)
flowCtx := &execinfra.FlowCtx{
EvalCtx: &evalCtx,
Cfg: &execinfra.ServerConfig{
Settings: st,
},
}

rng, _ := randutil.NewTestRand()
numExprs := 1 + rng.Intn(5)
outputType := getRandomTypeFavorNative(rng)

// Construct the COALESCE expression of the following form:
// COALESCE(IF(@1 = 0, @2, NULL), IF(@1 = 1, @3, NULL), ...).
// We will use the first column from the input as the "partitioning" column,
// and the following numExprs columns will be the projection columns.
coalesceExpr := "COALESCE("
for i := 0; i < numExprs; i++ {
if i > 0 {
coalesceExpr += ", "
}
coalesceExpr += fmt.Sprintf("IF(@1 = %d, @%d, NULL)", i, i+2)
}
coalesceExpr += ")"

numInputCols := 1 + numExprs
numInputRows := 1 + rng.Intn(coldata.BatchSize()) + coldata.BatchSize()*rng.Intn(5)
inputRows := make(rowenc.EncDatumRows, numInputRows)
// We always have an extra partition, regardless of whether we use an ELSE
// projection or not (if we don't, the ELSE arm will project all NULLs).
numPartitions := numExprs + 1
// We will populate the expected output at the same time as we're generating
// the input data set. Note that all input columns will be projected out, so
// we memorize only the output column of the COALESCE expression.
expectedOutput := make([]rowenc.EncDatum, numInputRows)
for i := range inputRows {
inputRow := make(rowenc.EncDatumRow, numInputCols)
partitionIdx := rng.Intn(numPartitions)
inputRow[0].Datum = tree.NewDInt(tree.DInt(partitionIdx))
for j := 1; j < numInputCols; j++ {
inputRow[j] = rowenc.DatumToEncDatum(outputType, randgen.RandDatum(rng, outputType, true /* nullOk */))
}
inputRows[i] = inputRow
if partitionIdx == numExprs {
expectedOutput[i] = rowenc.DatumToEncDatum(outputType, tree.DNull)
} else {
expectedOutput[i] = inputRow[partitionIdx+1]
}
}

inputTypes := make([]*types.T, numInputCols)
inputTypes[0] = types.Int
for i := 1; i < numInputCols; i++ {
inputTypes[i] = outputType
}
assertProjOpAgainstRowByRow(
t, flowCtx, &evalCtx, coalesceExpr, inputTypes, inputRows, expectedOutput, outputType,
)
}
68 changes: 67 additions & 1 deletion pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -2103,6 +2103,35 @@ func planProjectionOperators(
}
op, resultIdx, typs, err = planCastOperator(ctx, acc, typs, op, resultIdx, expr.ResolvedType(), t.ResolvedType(), factory, evalCtx)
return op, resultIdx, typs, err
case *tree.CoalesceExpr:
// We handle CoalesceExpr by planning the equivalent CASE expression,
// namely
// CASE
// WHEN CoalesceExpr.Exprs[0] IS DISTINCT FROM NULL THEN CoalesceExpr.Exprs[0]
// WHEN CoalesceExpr.Exprs[1] IS DISTINCT FROM NULL THEN CoalesceExpr.Exprs[1]
// ...
// END
whens := make([]*tree.When, len(t.Exprs))
for i := range whens {
whens[i] = &tree.When{
Cond: tree.NewTypedComparisonExpr(
treecmp.MakeComparisonOperator(treecmp.IsDistinctFrom),
t.Exprs[i].(tree.TypedExpr),
tree.DNull,
),
Val: t.Exprs[i],
}
}
caseExpr, err := tree.NewTypedCaseExpr(
nil, /* expr */
whens,
nil, /* elseStmt */
t.ResolvedType(),
)
if err != nil {
return nil, resultIdx, typs, err
}
return planProjectionOperators(ctx, evalCtx, caseExpr, columnTypes, input, acc, factory, releasables)
case *tree.ComparisonExpr:
return planProjectionExpr(
ctx, evalCtx, t.Operator, t.ResolvedType(), t.TypedLeft(), t.TypedRight(),
Expand Down Expand Up @@ -2138,6 +2167,19 @@ func planProjectionOperators(
}
typs = appendOneType(typs, t.ResolvedType())
return op, resultIdx, typs, err
case *tree.IfExpr:
// We handle IfExpr by planning the equivalent CASE expression, namely
// CASE WHEN IfExpr.Cond THEN IfExpr.True ELSE IfExpr.Else END.
caseExpr, err := tree.NewTypedCaseExpr(
nil, /* expr */
[]*tree.When{{Cond: t.Cond, Val: t.True}},
t.TypedElseExpr(),
t.ResolvedType(),
)
if err != nil {
return nil, resultIdx, typs, err
}
return planProjectionOperators(ctx, evalCtx, caseExpr, columnTypes, input, acc, factory, releasables)
case *tree.IndexedVar:
return input, t.Idx, columnTypes, nil
case *tree.IsNotNullExpr:
Expand All @@ -2160,6 +2202,27 @@ func planProjectionOperators(
}
typs = appendOneType(typs, t.ResolvedType())
return op, outputIdx, typs, nil
case *tree.NullIfExpr:
// We handle NullIfExpr by planning the equivalent CASE expression,
// namely
// CASE WHEN Expr1 == Expr2 THEN NULL ELSE Expr1 END.
caseExpr, err := tree.NewTypedCaseExpr(
nil, /* expr */
[]*tree.When{{
Cond: tree.NewTypedComparisonExpr(
treecmp.MakeComparisonOperator(treecmp.EQ),
t.Expr1.(tree.TypedExpr),
t.Expr2.(tree.TypedExpr),
),
Val: tree.DNull,
}},
t.Expr1.(tree.TypedExpr),
t.ResolvedType(),
)
if err != nil {
return nil, resultIdx, typs, err
}
return planProjectionOperators(ctx, evalCtx, caseExpr, columnTypes, input, acc, factory, releasables)
case *tree.OrExpr:
return planLogicalProjectionOp(ctx, evalCtx, expr, columnTypes, input, acc, factory, releasables)
case *tree.Tuple:
Expand Down Expand Up @@ -2197,7 +2260,10 @@ func planProjectionOperators(
func checkSupportedProjectionExpr(left, right tree.TypedExpr) error {
leftTyp := left.ResolvedType()
rightTyp := right.ResolvedType()
if leftTyp.Equivalent(rightTyp) {
if leftTyp.Equivalent(rightTyp) || leftTyp.Family() == types.UnknownFamily || rightTyp.Family() == types.UnknownFamily {
// If either type is of an Unknown family, then the corresponding vector
// will only contain NULL values, so we won't run into the mixed-type
// issues.
return nil
}

Expand Down
Loading

0 comments on commit 9ffe468

Please sign in to comment.