Skip to content

Commit

Permalink
exec: Add support for vectorized engine to use builtin functions.
Browse files Browse the repository at this point in the history
Added support for the vectorized engine to use all builtin
functions that are supported by distsql. The mechanism
for doing this is relatively ineffecient however -- it
converts each row in a batch back into a datum row,
and uses the expression evaluator on this row to perform
the builtin function. The goal is to in the future extend
this work to create faster(vectorized) implementations of
different builtin functions, and switch to those instead
of the default builtin here for improved performance. This
includes refactoring the materializer's batch to row code
into a separate, reusable function.

Release note: None
  • Loading branch information
rohany committed Jul 12, 2019
1 parent b560c88 commit e3ab7ce
Show file tree
Hide file tree
Showing 6 changed files with 397 additions and 43 deletions.
14 changes: 14 additions & 0 deletions pkg/sql/distsqlrun/column_exec_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,9 @@ func planProjectionOperators(
return planProjectionExpr(ctx, t.Operator, t.TypedLeft(), t.TypedRight(), columnTypes, input)
case *tree.BinaryExpr:
return planProjectionExpr(ctx, t.Operator, t.TypedLeft(), t.TypedRight(), columnTypes, input)
case *tree.FuncExpr:
op, resultIdx, ct = planBuiltinFunctionExpr(ctx, t, columnTypes, input)
return op, resultIdx, ct, nil
case tree.Datum:
datumType := t.ResolvedType()
ct := columnTypes
Expand All @@ -637,6 +640,17 @@ func planProjectionOperators(
}
}

func planBuiltinFunctionExpr(
ctx *tree.EvalContext, f *tree.FuncExpr, columnTypes []semtypes.T, input exec.Operator,
) (exec.Operator, int, []semtypes.T) {
funcOutputType := f.ResolvedType()
ct := columnTypes
resultIdx := len(ct)
ct = append(ct, *funcOutputType)
op := exec.NewBuiltinFunctionOperator(ctx, ct, input, f, resultIdx)
return op, resultIdx, ct
}

func planProjectionExpr(
ctx *tree.EvalContext,
binOp tree.Operator,
Expand Down
45 changes: 2 additions & 43 deletions pkg/sql/distsqlrun/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,13 @@ package distsqlrun

import (
"context"
"fmt"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
"github.com/cockroachdb/cockroach/pkg/sql/exec"
"github.com/cockroachdb/cockroach/pkg/sql/exec/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/timeutil/pgdate"
"github.com/lib/pq/oid"
)

// materializer converts an exec.Operator input into a RowSource.
Expand Down Expand Up @@ -140,46 +136,9 @@ func (m *materializer) Next() (sqlbase.EncDatumRow, *distsqlpb.ProducerMetadata)
continue
}
}

ct := typs[outIdx]
switch ct.Family() {
case types.BoolFamily:
if col.Bool()[rowIdx] {
m.row[outIdx].Datum = tree.DBoolTrue
} else {
m.row[outIdx].Datum = tree.DBoolFalse
}
case types.IntFamily:
switch ct.Width() {
case 8:
m.row[outIdx].Datum = m.da.NewDInt(tree.DInt(col.Int8()[rowIdx]))
case 16:
m.row[outIdx].Datum = m.da.NewDInt(tree.DInt(col.Int16()[rowIdx]))
case 32:
m.row[outIdx].Datum = m.da.NewDInt(tree.DInt(col.Int32()[rowIdx]))
default:
m.row[outIdx].Datum = m.da.NewDInt(tree.DInt(col.Int64()[rowIdx]))
}
case types.FloatFamily:
m.row[outIdx].Datum = m.da.NewDFloat(tree.DFloat(col.Float64()[rowIdx]))
case types.DecimalFamily:
m.row[outIdx].Datum = m.da.NewDDecimal(tree.DDecimal{Decimal: col.Decimal()[rowIdx]})
case types.DateFamily:
m.row[outIdx].Datum = tree.NewDDate(pgdate.MakeCompatibleDateFromDisk(col.Int64()[rowIdx]))
case types.StringFamily:
b := col.Bytes()[rowIdx]
if ct.Oid() == oid.T_name {
m.row[outIdx].Datum = m.da.NewDString(tree.DString(*(*string)(unsafe.Pointer(&b))))
} else {
m.row[outIdx].Datum = m.da.NewDName(tree.DString(*(*string)(unsafe.Pointer(&b))))
}
case types.BytesFamily:
m.row[outIdx].Datum = m.da.NewDBytes(tree.DBytes(col.Bytes()[rowIdx]))
case types.OidFamily:
m.row[outIdx].Datum = m.da.NewDOid(tree.MakeDOid(tree.DInt(col.Int64()[rowIdx])))
default:
panic(fmt.Sprintf("Unsupported column type %s", ct.String()))
}

m.row[outIdx].Datum = exec.PhysicalTypeColElemToDatum(col, rowIdx, m.da, ct)
}
return m.ProcessRowHelper(m.row), nil
}
Expand Down
185 changes: 185 additions & 0 deletions pkg/sql/exec/builtin_funcs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package exec

import (
"context"
"fmt"

"github.com/cockroachdb/apd"
"github.com/cockroachdb/cockroach/pkg/sql/exec/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/exec/types"
"github.com/cockroachdb/cockroach/pkg/sql/exec/types/conv"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
semtypes "github.com/cockroachdb/cockroach/pkg/sql/types"
)

type indexResolver struct {
row tree.Datums
colTypes []semtypes.T
}

var _ tree.IndexedVarContainer = &indexResolver{}

func (i *indexResolver) IndexedVarEval(idx int, ctx *tree.EvalContext) (tree.Datum, error) {
return i.row[idx].Eval(ctx)
}

func (i *indexResolver) IndexedVarResolvedType(idx int) *semtypes.T {
return &i.colTypes[idx]
}

func (i *indexResolver) IndexedVarNodeFormatter(idx int) tree.NodeFormatter {
n := tree.Name(fmt.Sprintf("$%d", idx))
return &n
}

// defaultBuiltinOperator wraps around a builtin function and
// uses it to compute the function one row at a time. This allows
// the vectorized engine to execute all builtin functions that
// distsql can do. However, it repeatedly encodes and decodes
// rows to and from datums, so it should be swapped out with an
// efficient (vectorized) implementation for builtins that are
// important for performance.
type defaultBuiltinOperator struct {
input Operator

evalCtx *tree.EvalContext
funcExpr *tree.FuncExpr
outputIdx int
colTypes []semtypes.T

da sqlbase.DatumAlloc
row tree.Datums

resolved *indexResolver
}

func (d *defaultBuiltinOperator) Init() {
d.input.Init()
}

func (d *defaultBuiltinOperator) Next(ctx context.Context) coldata.Batch {
batch := d.input.Next(ctx)
n := batch.Length()

if n == 0 {
return batch
}

outputType := d.funcExpr.ResolvedType()
outputPhysType := conv.FromColumnType(outputType)

if d.outputIdx == batch.Width() {
batch.AppendCol(outputPhysType)
}

converter := conv.GetDatumToPhysicalFn(outputType)

writer := func(batch coldata.Batch, datum tree.Datum, i int) {
if datum == tree.DNull {
batch.ColVec(d.outputIdx).Nulls().SetNull(uint16(i))
return
}
converted, err := converter(datum)
if err != nil {
panic(err)
}
switch outputPhysType {
case types.Bool:
batch.ColVec(d.outputIdx).Bool()[i] = converted.(bool)
case types.Bytes:
batch.ColVec(d.outputIdx).Bytes()[i] = converted.([]byte)
case types.Int8:
batch.ColVec(d.outputIdx).Int8()[i] = converted.(int8)
case types.Int16:
batch.ColVec(d.outputIdx).Int16()[i] = converted.(int16)
case types.Int32:
batch.ColVec(d.outputIdx).Int32()[i] = converted.(int32)
case types.Int64:
batch.ColVec(d.outputIdx).Int64()[i] = converted.(int64)
case types.Float32:
batch.ColVec(d.outputIdx).Float32()[i] = converted.(float32)
case types.Float64:
batch.ColVec(d.outputIdx).Float64()[i] = converted.(float64)
case types.Decimal:
batch.ColVec(d.outputIdx).Decimal()[i] = converted.(apd.Decimal)
default:
panic(fmt.Sprintf("unhandled type %s", outputPhysType))
}
}

if sel := batch.Selection(); sel != nil {
sel = sel[:n]
for _, i := range sel {
for j := 0; j < batch.Width(); j++ {
col := batch.ColVec(j)
if col.MaybeHasNulls() && col.Nulls().NullAt(i) {
d.row[j] = tree.DNull
} else {
d.row[j] = PhysicalTypeColElemToDatum(col, i, d.da, d.colTypes[j])
}
}

d.resolved.row = d.row
d.evalCtx.PushIVarContainer(d.resolved)
res, err := d.funcExpr.Eval(d.evalCtx)
if err != nil {
panic(err)
}
d.evalCtx.PopIVarContainer()
writer(batch, res, int(i))
}
} else {
for i := uint16(0); i < n; i++ {
for j := 0; j < batch.Width(); j++ {
col := batch.ColVec(j)
if col.MaybeHasNulls() && col.Nulls().NullAt(i) {
d.row[j] = tree.DNull
} else {
d.row[j] = PhysicalTypeColElemToDatum(col, i, d.da, d.colTypes[j])
}
}

d.evalCtx.PushIVarContainer(&indexResolver{row: d.row, colTypes: d.colTypes})
res, err := d.funcExpr.Eval(d.evalCtx)
if err != nil {
panic(err)
}
d.evalCtx.PopIVarContainer()
writer(batch, res, int(i))
}
}
return batch
}

// NewBuiltinFunctionOperator returns an operator that applies builtin functions.
func NewBuiltinFunctionOperator(
tctx *tree.EvalContext,
columnTypes []semtypes.T,
input Operator,
funcExpr *tree.FuncExpr,
outputIdx int,
) Operator {

// For now, return the default builtin operator. Future work can specialize
// out the operators to efficient implementations of specific builtins.
return &defaultBuiltinOperator{
input: input,
evalCtx: tctx,
funcExpr: funcExpr,
outputIdx: outputIdx,
colTypes: columnTypes,
row: make(tree.Datums, len(columnTypes)),
resolved: &indexResolver{colTypes: columnTypes},
}
}
101 changes: 101 additions & 0 deletions pkg/sql/exec/builtin_funcs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package exec

import (
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
)

// Mock typing context for the typechecker.
type mockTypeContext struct {
typs []types.T
}

func (p *mockTypeContext) IndexedVarEval(idx int, ctx *tree.EvalContext) (tree.Datum, error) {
return tree.DNull.Eval(ctx)
}

func (p *mockTypeContext) IndexedVarResolvedType(idx int) *types.T {
return &p.typs[idx]
}

func (p *mockTypeContext) IndexedVarNodeFormatter(idx int) tree.NodeFormatter {
n := tree.Name(fmt.Sprintf("$%d", idx))
return &n
}

func TestBasicBuiltinFunctions(t *testing.T) {
// Trick to get the init() for the builtins package to run.
_ = builtins.AllBuiltinNames

testCases := []struct {
desc string
expr string
inputTuples tuples
inputTypes []types.T
outputTypes []types.T
outputTuples tuples
}{
{
desc: "Substring test",
expr: "substring(@1, 1, 2)",
inputTuples: tuples{{"Hello"}, {"There"}},
inputTypes: []types.T{*types.String},
outputTuples: tuples{{"He"}, {"Th"}},
outputTypes: []types.T{*types.String, *types.String},
},
{
desc: "Absolute value test",
expr: "abs(@1)",
inputTuples: tuples{{1}, {-1}},
inputTypes: []types.T{*types.Int},
outputTuples: tuples{{1}, {1}},
outputTypes: []types.T{*types.Int, *types.Int},
},
{
desc: "String length test",
expr: "length(@1)",
inputTuples: tuples{{"Hello"}, {"The"}},
inputTypes: []types.T{*types.String},
outputTuples: tuples{{5}, {3}},
outputTypes: []types.T{*types.String, *types.Int},
},
}

tctx := tree.NewTestingEvalContext(cluster.MakeTestingClusterSettings())

for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
runTests(t, []tuples{tc.inputTuples}, tc.outputTuples, orderedVerifier, []int{1},
func(input []Operator) (Operator, error) {
expr, err := parser.ParseExpr(tc.expr)
if err != nil {
t.Fatal(err)
}

p := &mockTypeContext{typs: tc.inputTypes}
typedExpr, err := tree.TypeCheck(expr, &tree.SemaContext{IVarContainer: p}, types.Any)
if err != nil {
t.Fatal(err)
}

return NewBuiltinFunctionOperator(tctx, tc.outputTypes, input[0], typedExpr.(*tree.FuncExpr), 1), nil
})
})
}
}
Loading

0 comments on commit e3ab7ce

Please sign in to comment.