Merge #38826
38826: exec: Add support for vectorized engine to use builtin functions. r=jordanlewis a=rohany

Add support for the vectorized engine to use all builtin functions that are provided by distsql, but in a relatively slow manner: each row's arguments are converted to datums and passed to the existing row-oriented implementations. Future work can specialize some functions for improved performance.

Co-authored-by: Rohan Yadav <[email protected]>
craig[bot] and rohany committed Jul 17, 2019
2 parents 9ef0f13 + 996cf44 commit 3d27643
Showing 8 changed files with 436 additions and 43 deletions.
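
Before the per-file diffs, a note on the approach: the "relatively slow manner" above comes from falling back to the existing row-oriented builtin implementations. The sketch below is a minimal, self-contained illustration of that pattern using toy types only; it is not the exec package's real Operator/coldata API (the actual implementation is in pkg/sql/exec/builtin_funcs.go further down).

package main

import "fmt"

// Toy model of the fallback: for every row of a columnar batch, box the
// physical values into datums, call the row-oriented builtin, and unbox the
// result into an appended output column. One function call per row is what
// makes this path comparatively slow.
func main() {
	col := []int64{-3, 7, -11} // one physical input column

	// Row-oriented "builtin" working on boxed values, as distsql provides it.
	absBuiltin := func(args ...interface{}) (interface{}, error) {
		v := args[0].(int64)
		if v < 0 {
			v = -v
		}
		return v, nil
	}

	out := make([]int64, len(col)) // output column appended to the batch
	for rowIdx := range col {
		datum := interface{}(col[rowIdx]) // physical value -> datum
		res, err := absBuiltin(datum)     // one call per row
		if err != nil {
			panic(err)
		}
		out[rowIdx] = res.(int64) // datum -> physical value
	}
	fmt.Println(out) // [3 7 11]
}
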
24 changes: 24 additions & 0 deletions pkg/sql/distsqlrun/column_exec_setup.go
@@ -647,6 +647,30 @@ func planProjectionOperators(
		return planProjectionExpr(ctx, t.Operator, t.TypedLeft(), t.TypedRight(), columnTypes, input)
	case *tree.BinaryExpr:
		return planProjectionExpr(ctx, t.Operator, t.TypedLeft(), t.TypedRight(), columnTypes, input)
	case *tree.FuncExpr:
		var (
			inputCols     []int
			projectionMem int
		)
		ct = columnTypes
		op = input
		for _, e := range t.Exprs {
			var err error
			// TODO(rohany): This could be done better, especially in the case of
			// constant arguments, because the vectorized engine right now
			// creates a new column full of the constant value.
			op, resultIdx, ct, projectionMem, err = planProjectionOperators(ctx, e.(tree.TypedExpr), ct, op)
			if err != nil {
				return nil, resultIdx, nil, memUsed, err
			}
			inputCols = append(inputCols, resultIdx)
			memUsed += projectionMem
		}
		funcOutputType := t.ResolvedType()
		resultIdx = len(ct)
		ct = append(ct, *funcOutputType)
		op = exec.NewBuiltinFunctionOperator(ctx, t, ct, inputCols, resultIdx, op)
		return op, resultIdx, ct, memUsed, nil
	case tree.Datum:
		datumType := t.ResolvedType()
		ct := columnTypes
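
To make the column bookkeeping in the new *tree.FuncExpr case above concrete: each argument expression is planned into its own output column, the indices of those columns are collected in inputCols, and the builtin's result is appended as a fresh column at index len(ct). The following is a small self-contained sketch of that accounting for an expression like abs(@1 + @2), using toy types rather than the real planner state.

package main

import "fmt"

// Toy model of the bookkeeping in planProjectionOperators' FuncExpr case:
// planning the single argument @1 + @2 appends one column, and the builtin's
// output is appended after it.
func main() {
	ct := []string{"int", "int"} // columns already flowing through the input operator (@1, @2)

	// Plan the argument @1 + @2: its result lands in a new column.
	argIdx := len(ct)          // 2
	ct = append(ct, "int")     // column written by the projection of @1 + @2
	inputCols := []int{argIdx} // columns the builtin operator will read

	// The builtin's own output column comes after all argument columns.
	resultIdx := len(ct)   // 3
	ct = append(ct, "int") // abs() returns an int

	fmt.Println(inputCols, resultIdx, ct) // [2] 3 [int int int int]
}
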
45 changes: 2 additions & 43 deletions pkg/sql/distsqlrun/materializer.go
@@ -12,17 +12,13 @@ package distsqlrun

import (
	"context"
	"fmt"
	"unsafe"

	"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
	"github.com/cockroachdb/cockroach/pkg/sql/exec"
	"github.com/cockroachdb/cockroach/pkg/sql/exec/coldata"
	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
	"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
	"github.com/cockroachdb/cockroach/pkg/sql/types"
	"github.com/cockroachdb/cockroach/pkg/util/timeutil/pgdate"
	"github.com/lib/pq/oid"
)

// materializer converts an exec.Operator input into a RowSource.
@@ -140,46 +136,9 @@ func (m *materializer) Next() (sqlbase.EncDatumRow, *distsqlpb.ProducerMetadata)
					continue
				}
			}

			ct := typs[outIdx]
			switch ct.Family() {
			case types.BoolFamily:
				if col.Bool()[rowIdx] {
					m.row[outIdx].Datum = tree.DBoolTrue
				} else {
					m.row[outIdx].Datum = tree.DBoolFalse
				}
			case types.IntFamily:
				switch ct.Width() {
				case 8:
					m.row[outIdx].Datum = m.da.NewDInt(tree.DInt(col.Int8()[rowIdx]))
				case 16:
					m.row[outIdx].Datum = m.da.NewDInt(tree.DInt(col.Int16()[rowIdx]))
				case 32:
					m.row[outIdx].Datum = m.da.NewDInt(tree.DInt(col.Int32()[rowIdx]))
				default:
					m.row[outIdx].Datum = m.da.NewDInt(tree.DInt(col.Int64()[rowIdx]))
				}
			case types.FloatFamily:
				m.row[outIdx].Datum = m.da.NewDFloat(tree.DFloat(col.Float64()[rowIdx]))
			case types.DecimalFamily:
				m.row[outIdx].Datum = m.da.NewDDecimal(tree.DDecimal{Decimal: col.Decimal()[rowIdx]})
			case types.DateFamily:
				m.row[outIdx].Datum = tree.NewDDate(pgdate.MakeCompatibleDateFromDisk(col.Int64()[rowIdx]))
			case types.StringFamily:
				b := col.Bytes()[rowIdx]
				if ct.Oid() == oid.T_name {
					m.row[outIdx].Datum = m.da.NewDString(tree.DString(*(*string)(unsafe.Pointer(&b))))
				} else {
					m.row[outIdx].Datum = m.da.NewDName(tree.DString(*(*string)(unsafe.Pointer(&b))))
				}
			case types.BytesFamily:
				m.row[outIdx].Datum = m.da.NewDBytes(tree.DBytes(col.Bytes()[rowIdx]))
			case types.OidFamily:
				m.row[outIdx].Datum = m.da.NewDOid(tree.MakeDOid(tree.DInt(col.Int64()[rowIdx])))
			default:
				panic(fmt.Sprintf("Unsupported column type %s", ct.String()))
			}

			m.row[outIdx].Datum = exec.PhysicalTypeColElemToDatum(col, rowIdx, m.da, ct)
		}
		return m.ProcessRowHelper(m.row), nil
	}
127 changes: 127 additions & 0 deletions pkg/sql/exec/builtin_funcs.go
@@ -0,0 +1,127 @@
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package exec

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/sql/exec/coldata"
	"github.com/cockroachdb/cockroach/pkg/sql/exec/types"
	"github.com/cockroachdb/cockroach/pkg/sql/exec/types/conv"
	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
	"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
	semtypes "github.com/cockroachdb/cockroach/pkg/sql/types"
)

type defaultBuiltinFuncOperator struct {
	input          Operator
	evalCtx        *tree.EvalContext
	funcExpr       *tree.FuncExpr
	columnTypes    []semtypes.T
	inputCols      []int
	outputIdx      int
	outputType     *semtypes.T
	outputPhysType types.T
	converter      func(tree.Datum) (interface{}, error)

	row tree.Datums
	da  sqlbase.DatumAlloc
}

func (b *defaultBuiltinFuncOperator) Init() {
	b.input.Init()
}

func (b *defaultBuiltinFuncOperator) Next(ctx context.Context) coldata.Batch {
	batch := b.input.Next(ctx)
	n := batch.Length()
	if n == 0 {
		return batch
	}

	if b.outputIdx == batch.Width() {
		batch.AppendCol(b.outputPhysType)
	}

	sel := batch.Selection()
	for i := uint16(0); i < n; i++ {
		rowIdx := i
		if sel != nil {
			rowIdx = sel[i]
		}

		hasNulls := false

		for j := range b.inputCols {
			col := batch.ColVec(b.inputCols[j])
			if col.MaybeHasNulls() && col.Nulls().NullAt(rowIdx) {
				hasNulls = true
				b.row[j] = tree.DNull
			} else {
				b.row[j] = PhysicalTypeColElemToDatum(col, rowIdx, b.da, b.columnTypes[b.inputCols[j]])
			}
		}

		var (
			res tree.Datum
			err error
		)
		// Some functions cannot handle null arguments.
		if hasNulls && !b.funcExpr.CanHandleNulls() {
			res = tree.DNull
		} else {
			res, err = b.funcExpr.ResolvedOverload().Fn(b.evalCtx, b.row)
			if err != nil {
				panic(err)
			}
		}

		// Convert the datum into a physical type and write it out.
		if res == tree.DNull {
			batch.ColVec(b.outputIdx).Nulls().SetNull(uint16(rowIdx))
		} else {
			converted, err := b.converter(res)
			if err != nil {
				panic(err)
			}
			coldata.SetValueAt(batch.ColVec(b.outputIdx), converted, rowIdx, b.outputPhysType)
		}
	}
	return batch
}

// NewBuiltinFunctionOperator returns an operator that applies builtin functions.
func NewBuiltinFunctionOperator(
	evalCtx *tree.EvalContext,
	funcExpr *tree.FuncExpr,
	columnTypes []semtypes.T,
	inputCols []int,
	outputIdx int,
	input Operator,
) Operator {

	outputType := funcExpr.ResolvedType()

	// For now, return the default builtin operator. Future work can specialize
	// out the operators to efficient implementations of specific builtins.
	return &defaultBuiltinFuncOperator{
		input:          input,
		evalCtx:        evalCtx,
		funcExpr:       funcExpr,
		outputIdx:      outputIdx,
		columnTypes:    columnTypes,
		outputType:     outputType,
		outputPhysType: conv.FromColumnType(outputType),
		converter:      conv.GetDatumToPhysicalFn(outputType),
		row:            make(tree.Datums, len(inputCols)),
		inputCols:      inputCols,
	}
}
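
The comment above leaves specialization as future work. As a rough illustration of what a specialized operator could buy (again with toy types rather than the exec package's Operator and coldata interfaces), an abs kernel for int64 columns can skip the per-row datum conversions and work on the column slice directly:

package main

import "fmt"

// Illustrative specialized kernel: no datum boxing, no per-row function
// dispatch, just a tight loop over the physical column.
func absInt64Column(col, out []int64) {
	for i := range col {
		v := col[i]
		if v < 0 {
			v = -v
		}
		out[i] = v
	}
}

func main() {
	col := []int64{-3, 7, -11}
	out := make([]int64, len(col))
	absInt64Column(col, out)
	fmt.Println(out) // [3 7 11]
}
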
165 changes: 165 additions & 0 deletions pkg/sql/exec/builtin_funcs_test.go
@@ -0,0 +1,165 @@
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package exec

import (
	"context"
	"fmt"
	"math/rand"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/sql/exec/coldata"
	"github.com/cockroachdb/cockroach/pkg/sql/exec/types"
	"github.com/cockroachdb/cockroach/pkg/sql/parser"
	"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
	semtypes "github.com/cockroachdb/cockroach/pkg/sql/types"
)

// Mock typing context for the typechecker.
type mockTypeContext struct {
	typs []semtypes.T
}

func (p *mockTypeContext) IndexedVarEval(idx int, ctx *tree.EvalContext) (tree.Datum, error) {
	return tree.DNull.Eval(ctx)
}

func (p *mockTypeContext) IndexedVarResolvedType(idx int) *semtypes.T {
	return &p.typs[idx]
}

func (p *mockTypeContext) IndexedVarNodeFormatter(idx int) tree.NodeFormatter {
	n := tree.Name(fmt.Sprintf("$%d", idx))
	return &n
}

func TestBasicBuiltinFunctions(t *testing.T) {
	// Trick to get the init() for the builtins package to run.
	_ = builtins.AllBuiltinNames

	testCases := []struct {
		desc         string
		expr         string
		inputCols    []int
		inputTuples  tuples
		inputTypes   []semtypes.T
		outputTypes  []semtypes.T
		outputTuples tuples
	}{
		{
			desc:         "Absolute value",
			expr:         "abs(@1)",
			inputCols:    []int{0},
			inputTuples:  tuples{{1}, {-1}},
			inputTypes:   []semtypes.T{*semtypes.Int},
			outputTuples: tuples{{1}, {1}},
			outputTypes:  []semtypes.T{*semtypes.Int, *semtypes.Int},
		},
		{
			desc:         "String length",
			expr:         "length(@1)",
			inputCols:    []int{0},
			inputTuples:  tuples{{"Hello"}, {"The"}},
			inputTypes:   []semtypes.T{*semtypes.String},
			outputTuples: tuples{{5}, {3}},
			outputTypes:  []semtypes.T{*semtypes.String, *semtypes.Int},
		},
	}

	tctx := tree.NewTestingEvalContext(cluster.MakeTestingClusterSettings())

	for _, tc := range testCases {
		t.Run(tc.desc, func(t *testing.T) {
			runTests(t, []tuples{tc.inputTuples}, tc.outputTuples, orderedVerifier, []int{1},
				func(input []Operator) (Operator, error) {
					expr, err := parser.ParseExpr(tc.expr)
					if err != nil {
						t.Fatal(err)
					}

					p := &mockTypeContext{typs: tc.inputTypes}
					typedExpr, err := tree.TypeCheck(expr, &tree.SemaContext{IVarContainer: p}, semtypes.Any)
					if err != nil {
						t.Fatal(err)
					}

					return NewBuiltinFunctionOperator(tctx, typedExpr.(*tree.FuncExpr), tc.outputTypes, tc.inputCols, 1, input[0]), nil
				})
		})
	}
}

func benchmarkBuiltinFunctions(b *testing.B, useSelectionVector bool, hasNulls bool) {
	ctx := context.Background()
	tctx := tree.NewTestingEvalContext(cluster.MakeTestingClusterSettings())

	batch := coldata.NewMemBatch([]types.T{types.Int64})
	col := batch.ColVec(0).Int64()

	for i := int64(0); i < coldata.BatchSize; i++ {
		if float64(i) < coldata.BatchSize*selectivity {
			col[i] = -1
		} else {
			col[i] = 1
		}
	}

	if hasNulls {
		for i := 0; i < coldata.BatchSize; i++ {
			if rand.Float64() < nullProbability {
				batch.ColVec(0).Nulls().SetNull(uint16(i))
			}
		}
	}

	batch.SetLength(coldata.BatchSize)

	if useSelectionVector {
		batch.SetSelection(true)
		sel := batch.Selection()
		for i := int64(0); i < coldata.BatchSize; i++ {
			sel[i] = uint16(i)
		}
	}

	source := NewRepeatableBatchSource(batch)
	source.Init()

	expr, err := parser.ParseExpr("abs(@1)")
	if err != nil {
		b.Fatal(err)
	}
	p := &mockTypeContext{typs: []semtypes.T{*semtypes.Int}}
	typedExpr, err := tree.TypeCheck(expr, &tree.SemaContext{IVarContainer: p}, semtypes.Any)
	if err != nil {
		b.Fatal(err)
	}
	op := NewBuiltinFunctionOperator(tctx, typedExpr.(*tree.FuncExpr), []semtypes.T{*semtypes.Int}, []int{0}, 1, source)

	b.SetBytes(int64(8 * coldata.BatchSize))
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		op.Next(ctx)
	}
}

func BenchmarkBuiltinFunctions(b *testing.B) {
	_ = builtins.AllBuiltinNames
	for _, useSel := range []bool{true, false} {
		for _, hasNulls := range []bool{true, false} {
			b.Run(fmt.Sprintf("useSel=%t,hasNulls=%t", useSel, hasNulls), func(b *testing.B) {
				benchmarkBuiltinFunctions(b, useSel, hasNulls)
			})
		}
	}
}