Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

distsql: implement evaluator (processor core type) #9916

Merged
merged 1 commit into from
Oct 12, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/sql/distsql/api.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

135 changes: 135 additions & 0 deletions pkg/sql/distsql/evaluator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright 2016 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
//
// Author: Irfan Sharif ([email protected])

package distsql

import (
"sync"

"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"golang.org/x/net/context"
)

type evaluator struct {
input RowSource
output RowReceiver
ctx context.Context
exprs []exprHelper
render []parser.TypedExpr

// Buffer to store intermediate results when evaluating expressions per row
// to avoid reallocation.
tuple parser.DTuple
rowAlloc sqlbase.EncDatumRowAlloc
}

func newEvaluator(
flowCtx *FlowCtx, spec *EvaluatorSpec, input RowSource, output RowReceiver,
) (*evaluator, error) {
ev := &evaluator{
input: input,
output: output,
ctx: log.WithLogTag(flowCtx.Context, "Evaluator", nil),
exprs: make([]exprHelper, len(spec.Exprs)),
render: make([]parser.TypedExpr, len(spec.Exprs)),
tuple: make(parser.DTuple, len(spec.Exprs)),
}

for i, expr := range spec.Exprs {
err := ev.exprs[i].init(expr, spec.Types, flowCtx.evalCtx)
if err != nil {
return nil, err
}
}

// Loop over the expressions in our expression set and extract out fully
// typed expressions, this will later be evaluated for each input row to
// construct our output row.
for i := range ev.exprs {
typedExpr, err := (&ev.exprs[i]).expr.TypeCheck(nil, parser.NoTypePreference)
if err != nil {
return nil, err
}
ev.render[i] = typedExpr
}

return ev, nil
}

// Run is part of the processor interface.
func (ev *evaluator) Run(wg *sync.WaitGroup) {
if wg != nil {
defer wg.Done()
}

ctx, span := tracing.ChildSpan(ev.ctx, "evaluator")
defer tracing.FinishSpan(span)

if log.V(2) {
log.Infof(ctx, "starting evaluator process")
defer log.Infof(ctx, "exiting evaluator")
}

for {
row, err := ev.input.NextRow()
if err != nil || row == nil {
ev.output.Close(err)
return
}

outRow, err := ev.eval(row)
if err != nil {
ev.output.Close(err)
return
}

if log.V(3) {
log.Infof(ctx, "pushing %s\n", outRow)
}
// Push the row to the output RowReceiver; stop if they don't need more
// rows.
if !ev.output.PushRow(outRow) {
if log.V(2) {
log.Infof(ctx, "no more rows required")
}
ev.output.Close(nil)
return
}
}
}

func (ev *evaluator) eval(row sqlbase.EncDatumRow) (sqlbase.EncDatumRow, error) {
for i := range ev.exprs {
datum, err := (&ev.exprs[i]).eval(row)
if err != nil {
return nil, err
}
ev.tuple[i] = datum
}

outRow := ev.rowAlloc.AllocRow(len(ev.tuple))
for i, datum := range ev.tuple {
encDatum, err := sqlbase.DatumToEncDatum(datum)
if err != nil {
return nil, err
}
outRow[i] = encDatum
}
return outRow, nil
}
157 changes: 157 additions & 0 deletions pkg/sql/distsql/evaluator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Copyright 2016 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
//
// Author: Irfan Sharif ([email protected])

package distsql

import (
"testing"

"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"

"golang.org/x/net/context"
)

func TestEvaluator(t *testing.T) {
defer leaktest.AfterTest(t)()

v := [15]sqlbase.EncDatum{}
for i := range v {
v[i].SetDatum(sqlbase.ColumnType_INT, parser.NewDInt(parser.DInt(i)))
}

dTrue, _ := parser.ParseDBool("true")
dFalse, _ := parser.ParseDBool("false")

b := [2]sqlbase.EncDatum{}
b[0].SetDatum(sqlbase.ColumnType_BOOL, dTrue)
b[1].SetDatum(sqlbase.ColumnType_BOOL, dFalse)

testCases := []struct {
spec EvaluatorSpec
input sqlbase.EncDatumRows
expected sqlbase.EncDatumRows
}{
{
spec: EvaluatorSpec{
Types: []sqlbase.ColumnType_Kind{sqlbase.ColumnType_INT, sqlbase.ColumnType_INT},
Exprs: []Expression{{Expr: "$1"}, {Expr: "((($0)))"}},
},
input: sqlbase.EncDatumRows{
{v[1], v[2]},
{v[3], v[4]},
{v[6], v[2]},
{v[7], v[2]},
{v[8], v[4]},
},
expected: sqlbase.EncDatumRows{
{v[2], v[1]},
{v[4], v[3]},
{v[2], v[6]},
{v[2], v[7]},
{v[4], v[8]},
},
}, {
spec: EvaluatorSpec{
Types: []sqlbase.ColumnType_Kind{sqlbase.ColumnType_INT, sqlbase.ColumnType_INT},
Exprs: []Expression{
{Expr: "$0 + $1"},
{Expr: "$0 - $1"},
{Expr: "$0 >= 8"},
},
},
input: sqlbase.EncDatumRows{
{v[10], v[0]},
{v[9], v[1]},
{v[8], v[2]},
{v[7], v[3]},
{v[6], v[4]},
},
expected: sqlbase.EncDatumRows{
{v[10], v[10], b[0]},
{v[10], v[8], b[0]},
{v[10], v[6], b[0]},
{v[10], v[4], b[1]},
{v[10], v[2], b[1]},
},
}, {
spec: EvaluatorSpec{
Types: []sqlbase.ColumnType_Kind{sqlbase.ColumnType_BOOL, sqlbase.ColumnType_BOOL},
Exprs: []Expression{
{Expr: "$0 AND $0"},
{Expr: "$0 AND $1"},
{Expr: "NOT $0"},
},
},
input: sqlbase.EncDatumRows{
{b[0], b[1]},
},
expected: sqlbase.EncDatumRows{
{b[0], b[1], b[1]},
},
},
{
spec: EvaluatorSpec{
Types: []sqlbase.ColumnType_Kind{sqlbase.ColumnType_INT, sqlbase.ColumnType_INT},
Exprs: []Expression{{Expr: "1"}},
},
input: sqlbase.EncDatumRows{
{v[1], v[2]},
{v[3], v[4]},
{v[6], v[2]},
{v[7], v[2]},
{v[8], v[4]},
},
expected: sqlbase.EncDatumRows{
{v[1]},
{v[1]},
{v[1]},
{v[1]},
{v[1]},
},
},
}

for _, c := range testCases {
es := c.spec

in := &RowBuffer{rows: c.input}
out := &RowBuffer{}

flowCtx := FlowCtx{
Context: context.Background(),
evalCtx: &parser.EvalContext{},
}

ev, err := newEvaluator(&flowCtx, &es, in, out)
if err != nil {
t.Fatal(err)
}

ev.Run(nil)
if out.err != nil {
t.Fatal(out.err)
}
if !out.closed {
t.Fatalf("output RowReceiver not closed")
}

if result := out.rows.String(); result != c.expected.String() {
t.Errorf("invalid results: %s, expected %s'", result, c.expected.String())
}
}
}
37 changes: 33 additions & 4 deletions pkg/sql/distsql/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// permissions and limitations under the License.
//
// Author: Radu Berinde ([email protected])
// Author: Irfan Sharif ([email protected])

package distsql

Expand All @@ -27,8 +28,8 @@ import (
"github.com/pkg/errors"
)

// valArgsConvert is a parser.Visitor that converts Placeholders ($0, $1, etc.) to
// IndexedVars.
// valArgsConvert is a parser.Visitor that converts Placeholders ($0, $1, etc.)
// to IndexedVars.
type valArgsConvert struct {
h *parser.IndexedVarHelper
err error
Expand Down Expand Up @@ -82,8 +83,8 @@ type exprHelper struct {
noCopy util.NoCopy

expr parser.TypedExpr
// vars is used to generate IndexedVars that are "backed" by
// the values in `row`.
// vars is used to generate IndexedVars that are "backed" by the values in
// `row`.
vars parser.IndexedVarHelper

evalCtx *parser.EvalContext
Expand Down Expand Up @@ -142,3 +143,31 @@ func (eh *exprHelper) evalFilter(row sqlbase.EncDatumRow) (bool, error) {
eh.row = row
return sqlbase.RunFilter(eh.expr, eh.evalCtx)
}

// Given a row, eval evaluates the wrapped expression and returns the
// resulting datum needed for rendering for eg. given a row (1, 2, 3, 4, 5):
// '$1' would return '2'
// '$1 + $4' would return '7'
// '$0' would return '1'
// '$1 + 10' would return '12'
func (eh *exprHelper) eval(row sqlbase.EncDatumRow) (parser.Datum, error) {
eh.row = row

// TODO(irfansharif): eval here is very permissive, if expr is of type
// *parser.FuncExpr for example expr.Eval doesn't make sense therefore is
// explicitly tested for. There may very well be other expression types
// where the same holds true but are not yet checked for. The set of
// verified parser expressions are:
// ComparisonExpr, FuncExpr, AndExpr, BinaryExpr, NotExpr, OrExpr,
// ParenExpr, UnaryExpr.
//
// The list of unverified parser expressions are:
// IsOfTypeExpr, AnnotateTypeExpr, CaseExpr, CastExpr, CoalesceExpr,
// ExistsExpr, IfExpr, NullIfExpr.
switch eh.expr.(type) {
case *parser.FuncExpr:
return nil, errors.Errorf("aggregate functions not allowed")
default:
return eh.expr.Eval(eh.evalCtx)
}
}
7 changes: 7 additions & 0 deletions pkg/sql/distsql/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// permissions and limitations under the License.
//
// Author: Radu Berinde ([email protected])
// Author: Irfan Sharif ([email protected])

package distsql

Expand Down Expand Up @@ -207,6 +208,12 @@ func (f *Flow) makeProcessor(ps *ProcessorSpec, inputs []RowSource) (processor,
}
return newSorter(&f.FlowCtx, ps.Core.Sorter, inputs[0], outputs[0]), nil
}
if ps.Core.Evaluator != nil {
if err := checkNumInOut(inputs, outputs, 1, 1); err != nil {
return nil, err
}
return newEvaluator(&f.FlowCtx, ps.Core.Evaluator, inputs[0], outputs[0])
}
return nil, errors.Errorf("unsupported processor %s", ps)
}

Expand Down
Loading