Skip to content

Commit

Permalink
implement evaluator for distributed sql
Browse files Browse the repository at this point in the history
Implementation of the `Evaluator` processor core type, a fully
programmable no-grouping aggregator that runs a 'program' on each
individual row. The 'program' is a set of expressions evaluated in
order, the results of evaluating each of these expressions on the input
row form the output.

Part of #7587.
  • Loading branch information
irfansharif committed Oct 12, 2016
1 parent d6a1011 commit 20d2465
Show file tree
Hide file tree
Showing 8 changed files with 655 additions and 61 deletions.
1 change: 1 addition & 0 deletions sql/distsql/api.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

135 changes: 135 additions & 0 deletions sql/distsql/evaluator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright 2016 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
//
// Author: Irfan Sharif ([email protected])

package distsql

import (
"sync"

"github.com/cockroachdb/cockroach/sql/parser"
"github.com/cockroachdb/cockroach/sql/sqlbase"
"github.com/cockroachdb/cockroach/util/log"
"github.com/cockroachdb/cockroach/util/tracing"
"golang.org/x/net/context"
)

type evaluator struct {
input RowSource
output RowReceiver
ctx context.Context
exprs []exprHelper
render []parser.TypedExpr

// Buffer to store intermediate results when evaluating expressions per row
// to avoid reallocation.
tuple parser.DTuple
rowAlloc sqlbase.EncDatumRowAlloc
}

func newEvaluator(
flowCtx *FlowCtx, spec *EvaluatorSpec, input RowSource, output RowReceiver,
) (*evaluator, error) {
ev := &evaluator{
input: input,
output: output,
ctx: log.WithLogTag(flowCtx.Context, "Evaluator", nil),
exprs: make([]exprHelper, len(spec.Exprs)),
render: make([]parser.TypedExpr, len(spec.Exprs)),
tuple: make(parser.DTuple, len(spec.Exprs)),
}

for i, expr := range spec.Exprs {
err := ev.exprs[i].init(expr, spec.Types, flowCtx.evalCtx)
if err != nil {
return nil, err
}
}

// Loop over the expressions in our expression set and extract out fully
// typed expressions, this will later be evaluated for each input row to
// construct our output row.
for i := range ev.exprs {
typedExpr, err := (&ev.exprs[i]).expr.TypeCheck(nil, parser.NoTypePreference)
if err != nil {
return nil, err
}
ev.render[i] = typedExpr
}

return ev, nil
}

// Run is part of the processor interface.
func (ev *evaluator) Run(wg *sync.WaitGroup) {
if wg != nil {
defer wg.Done()
}

ctx, span := tracing.ChildSpan(ev.ctx, "sorter")
defer tracing.FinishSpan(span)

if log.V(2) {
log.Infof(ctx, "starting evaluator process")
defer log.Infof(ctx, "exiting evaluator")
}

for {
row, err := ev.input.NextRow()
if err != nil || row == nil {
ev.output.Close(err)
return
}

outRow, err := ev.eval(row)
if err != nil {
ev.output.Close(err)
return
}

if log.V(3) {
log.Infof(ctx, "pushing %s\n", outRow)
}
// Push the row to the output RowReceiver; stop if they don't need more
// rows.
if !ev.output.PushRow(outRow) {
if log.V(2) {
log.Infof(ctx, "no more rows required")
}
ev.output.Close(nil)
return
}
}
}

func (ev *evaluator) eval(row sqlbase.EncDatumRow) (sqlbase.EncDatumRow, error) {
for i := range ev.exprs {
datum, err := (&ev.exprs[i]).eval(row)
if err != nil {
return nil, err
}
ev.tuple[i] = datum
}

outRow := ev.rowAlloc.AllocRow(len(ev.tuple))
for i, datum := range ev.tuple {
encDatum, err := sqlbase.DatumToEncDatum(datum)
if err != nil {
return nil, err
}
outRow[i] = encDatum
}
return outRow, nil
}
157 changes: 157 additions & 0 deletions sql/distsql/evaluator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Copyright 2016 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
//
// Author: Irfan Sharif ([email protected])

package distsql

import (
"testing"

"github.com/cockroachdb/cockroach/sql/parser"
"github.com/cockroachdb/cockroach/sql/sqlbase"
"github.com/cockroachdb/cockroach/util/leaktest"

"golang.org/x/net/context"
)

func TestEvaluator(t *testing.T) {
defer leaktest.AfterTest(t)()

v := [15]sqlbase.EncDatum{}
for i := range v {
v[i].SetDatum(sqlbase.ColumnType_INT, parser.NewDInt(parser.DInt(i)))
}

dTrue, _ := parser.ParseDBool("true")
dFalse, _ := parser.ParseDBool("false")

b := [2]sqlbase.EncDatum{}
b[0].SetDatum(sqlbase.ColumnType_BOOL, dTrue)
b[1].SetDatum(sqlbase.ColumnType_BOOL, dFalse)

testCases := []struct {
spec EvaluatorSpec
input sqlbase.EncDatumRows
expected sqlbase.EncDatumRows
}{
{
spec: EvaluatorSpec{
Types: []sqlbase.ColumnType_Kind{sqlbase.ColumnType_INT, sqlbase.ColumnType_INT},
Exprs: []Expression{Expression{Expr: "$1"}, Expression{Expr: "((($0)))"}},
},
input: sqlbase.EncDatumRows{
{v[1], v[2]},
{v[3], v[4]},
{v[6], v[2]},
{v[7], v[2]},
{v[8], v[4]},
},
expected: sqlbase.EncDatumRows{
{v[2], v[1]},
{v[4], v[3]},
{v[2], v[6]},
{v[2], v[7]},
{v[4], v[8]},
},
}, {
spec: EvaluatorSpec{
Types: []sqlbase.ColumnType_Kind{sqlbase.ColumnType_INT, sqlbase.ColumnType_INT},
Exprs: []Expression{
Expression{Expr: "$0 + $1"},
Expression{Expr: "$0 - $1"},
Expression{Expr: "$0 >= 8"},
},
},
input: sqlbase.EncDatumRows{
{v[10], v[0]},
{v[9], v[1]},
{v[8], v[2]},
{v[7], v[3]},
{v[6], v[4]},
},
expected: sqlbase.EncDatumRows{
{v[10], v[10], b[0]},
{v[10], v[8], b[0]},
{v[10], v[6], b[0]},
{v[10], v[4], b[1]},
{v[10], v[2], b[1]},
},
}, {
spec: EvaluatorSpec{
Types: []sqlbase.ColumnType_Kind{sqlbase.ColumnType_BOOL, sqlbase.ColumnType_BOOL},
Exprs: []Expression{
Expression{Expr: "$0 AND $0"},
Expression{Expr: "$0 AND $1"},
Expression{Expr: "NOT $0"},
},
},
input: sqlbase.EncDatumRows{
{b[0], b[1]},
},
expected: sqlbase.EncDatumRows{
{b[0], b[1], b[1]},
},
},
{
spec: EvaluatorSpec{
Types: []sqlbase.ColumnType_Kind{sqlbase.ColumnType_INT, sqlbase.ColumnType_INT},
Exprs: []Expression{Expression{Expr: "1"}},
},
input: sqlbase.EncDatumRows{
{v[1], v[2]},
{v[3], v[4]},
{v[6], v[2]},
{v[7], v[2]},
{v[8], v[4]},
},
expected: sqlbase.EncDatumRows{
{v[1]},
{v[1]},
{v[1]},
{v[1]},
{v[1]},
},
},
}

for _, c := range testCases {
es := c.spec

in := &RowBuffer{rows: c.input}
out := &RowBuffer{}

flowCtx := FlowCtx{
Context: context.Background(),
evalCtx: &parser.EvalContext{},
}

ev, err := newEvaluator(&flowCtx, &es, in, out)
if err != nil {
t.Fatal(err)
}

ev.Run(nil)
if out.err != nil {
t.Fatal(out.err)
}
if !out.closed {
t.Fatalf("output RowReceiver not closed")
}

if result := out.rows.String(); result != c.expected.String() {
t.Errorf("invalid results: %s, expected %s'", result, c.expected.String())
}
}
}
37 changes: 33 additions & 4 deletions sql/distsql/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// permissions and limitations under the License.
//
// Author: Radu Berinde ([email protected])
// Author: Irfan Sharif ([email protected])

package distsql

Expand All @@ -27,8 +28,8 @@ import (
"github.com/pkg/errors"
)

// valArgsConvert is a parser.Visitor that converts Placeholders ($0, $1, etc.) to
// IndexedVars.
// valArgsConvert is a parser.Visitor that converts Placeholders ($0, $1, etc.)
// to IndexedVars.
type valArgsConvert struct {
h *parser.IndexedVarHelper
err error
Expand Down Expand Up @@ -82,8 +83,8 @@ type exprHelper struct {
noCopy util.NoCopy

expr parser.TypedExpr
// vars is used to generate IndexedVars that are "backed" by
// the values in `row`.
// vars is used to generate IndexedVars that are "backed" by the values in
// `row`.
vars parser.IndexedVarHelper

evalCtx *parser.EvalContext
Expand Down Expand Up @@ -142,3 +143,31 @@ func (eh *exprHelper) evalFilter(row sqlbase.EncDatumRow) (bool, error) {
eh.row = row
return sqlbase.RunFilter(eh.expr, eh.evalCtx)
}

// Given a row, eval evaluates the wrapped expression and returns the
// resulting datum needed for rendering for eg. given a row (1, 2, 3, 4, 5):
// '$1' would return '2'
// '$1 + $4' would return '7'
// '$0' would return '1'
// '$1 + 10' would return '12'
func (eh *exprHelper) eval(row sqlbase.EncDatumRow) (parser.Datum, error) {
eh.row = row

// TODO(irfansharif): eval here is very permissive, if expr is of type
// *parser.FuncExpr for example expr.Eval doesn't make sense therefore is
// explicitly tested for. There may very well be other expression types
// where the same holds true but are not yet checked for. The set of
// verified parser expressions are:
// ComparisonExpr, FuncExpr, AndExpr, BinaryExpr, NotExpr, OrExpr,
// ParenExpr, UnaryExpr.
//
// The list of unverified parser expressions are:
// IsOfTypeExpr, AnnotateTypeExpr, CaseExpr, CastExpr, CoalesceExpr,
// ExistsExpr, IfExpr, NullIfExpr.
switch eh.expr.(type) {
case *parser.FuncExpr:
return nil, errors.Errorf("aggregate functions not allowed")
default:
return eh.expr.Eval(eh.evalCtx)
}
}
7 changes: 7 additions & 0 deletions sql/distsql/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// permissions and limitations under the License.
//
// Author: Radu Berinde ([email protected])
// Author: Irfan Sharif ([email protected])

package distsql

Expand Down Expand Up @@ -207,6 +208,12 @@ func (f *Flow) makeProcessor(ps *ProcessorSpec, inputs []RowSource) (processor,
}
return newSorter(&f.FlowCtx, ps.Core.Sorter, inputs[0], outputs[0]), nil
}
if ps.Core.Evaluator != nil {
if err := checkNumInOut(inputs, outputs, 1, 1); err != nil {
return nil, err
}
return newEvaluator(&f.FlowCtx, ps.Core.Evaluator, inputs[0], outputs[0])
}
return nil, errors.Errorf("unsupported processor %s", ps)
}

Expand Down
Loading

0 comments on commit 20d2465

Please sign in to comment.