forked from cockroachdb/cockroach
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
implement evaluator for distributed sql
Implementation of the `Evaluator` processor core type, a fully programmable no-grouping aggregator that runs a 'program' on each individual row. The 'program' is a set of expressions evaluated in order; the results of evaluating each of these expressions on the input row form the output row. Part of cockroachdb#7587.
- Loading branch information
1 parent
fd48564
commit 6deb605
Showing
8 changed files
with
657 additions
and
61 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
// Copyright 2016 The Cockroach Authors. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | ||
// implied. See the License for the specific language governing | ||
// permissions and limitations under the License. | ||
// | ||
// Author: Irfan Sharif ([email protected]) | ||
|
||
package distsql | ||
|
||
import ( | ||
"sync" | ||
|
||
"github.com/cockroachdb/cockroach/pkg/sql/parser" | ||
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase" | ||
"github.com/cockroachdb/cockroach/pkg/util/log" | ||
"github.com/cockroachdb/cockroach/pkg/util/tracing" | ||
"golang.org/x/net/context" | ||
) | ||
|
||
type evaluator struct { | ||
input RowSource | ||
output RowReceiver | ||
ctx context.Context | ||
exprs []exprHelper | ||
render []parser.TypedExpr | ||
|
||
// Buffer to store intermediate results when evaluating expressions per row | ||
// to avoid reallocation. | ||
tuple parser.DTuple | ||
rowAlloc sqlbase.EncDatumRowAlloc | ||
} | ||
|
||
func newEvaluator( | ||
flowCtx *FlowCtx, spec *EvaluatorSpec, input RowSource, output RowReceiver, | ||
) (*evaluator, error) { | ||
ev := &evaluator{ | ||
input: input, | ||
output: output, | ||
ctx: log.WithLogTag(flowCtx.Context, "Evaluator", nil), | ||
exprs: make([]exprHelper, len(spec.Exprs)), | ||
render: make([]parser.TypedExpr, len(spec.Exprs)), | ||
tuple: make(parser.DTuple, len(spec.Exprs)), | ||
} | ||
|
||
for i, expr := range spec.Exprs { | ||
err := ev.exprs[i].init(expr, spec.Types, flowCtx.evalCtx) | ||
if err != nil { | ||
return nil, err | ||
} | ||
} | ||
|
||
// Loop over the expressions in our expression set and extract out fully | ||
// typed expressions, this will later be evaluated for each input row to | ||
// construct our output row. | ||
for i := range ev.exprs { | ||
typedExpr, err := (&ev.exprs[i]).expr.TypeCheck(nil, parser.NoTypePreference) | ||
if err != nil { | ||
return nil, err | ||
} | ||
ev.render[i] = typedExpr | ||
} | ||
|
||
return ev, nil | ||
} | ||
|
||
// Run is part of the processor interface. | ||
func (ev *evaluator) Run(wg *sync.WaitGroup) { | ||
if wg != nil { | ||
defer wg.Done() | ||
} | ||
|
||
ctx, span := tracing.ChildSpan(ev.ctx, "evaluator") | ||
defer tracing.FinishSpan(span) | ||
|
||
if log.V(2) { | ||
log.Infof(ctx, "starting evaluator process") | ||
defer log.Infof(ctx, "exiting evaluator") | ||
} | ||
|
||
for { | ||
row, err := ev.input.NextRow() | ||
if err != nil || row == nil { | ||
ev.output.Close(err) | ||
return | ||
} | ||
|
||
outRow, err := ev.eval(row) | ||
if err != nil { | ||
ev.output.Close(err) | ||
return | ||
} | ||
|
||
if log.V(3) { | ||
log.Infof(ctx, "pushing %s\n", outRow) | ||
} | ||
// Push the row to the output RowReceiver; stop if they don't need more | ||
// rows. | ||
if !ev.output.PushRow(outRow) { | ||
if log.V(2) { | ||
log.Infof(ctx, "no more rows required") | ||
} | ||
ev.output.Close(nil) | ||
return | ||
} | ||
} | ||
} | ||
|
||
func (ev *evaluator) eval(row sqlbase.EncDatumRow) (sqlbase.EncDatumRow, error) { | ||
for i := range ev.exprs { | ||
datum, err := (&ev.exprs[i]).eval(row) | ||
if err != nil { | ||
return nil, err | ||
} | ||
ev.tuple[i] = datum | ||
} | ||
|
||
outRow := ev.rowAlloc.AllocRow(len(ev.tuple)) | ||
for i, datum := range ev.tuple { | ||
encDatum, err := sqlbase.DatumToEncDatum(datum) | ||
if err != nil { | ||
return nil, err | ||
} | ||
outRow[i] = encDatum | ||
} | ||
return outRow, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,157 @@ | ||
// Copyright 2016 The Cockroach Authors. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | ||
// implied. See the License for the specific language governing | ||
// permissions and limitations under the License. | ||
// | ||
// Author: Irfan Sharif ([email protected]) | ||
|
||
package distsql | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/cockroachdb/cockroach/pkg/sql/parser" | ||
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase" | ||
"github.com/cockroachdb/cockroach/pkg/util/leaktest" | ||
|
||
"golang.org/x/net/context" | ||
) | ||
|
||
func TestEvaluator(t *testing.T) { | ||
defer leaktest.AfterTest(t)() | ||
|
||
v := [15]sqlbase.EncDatum{} | ||
for i := range v { | ||
v[i].SetDatum(sqlbase.ColumnType_INT, parser.NewDInt(parser.DInt(i))) | ||
} | ||
|
||
dTrue, _ := parser.ParseDBool("true") | ||
dFalse, _ := parser.ParseDBool("false") | ||
|
||
b := [2]sqlbase.EncDatum{} | ||
b[0].SetDatum(sqlbase.ColumnType_BOOL, dTrue) | ||
b[1].SetDatum(sqlbase.ColumnType_BOOL, dFalse) | ||
|
||
testCases := []struct { | ||
spec EvaluatorSpec | ||
input sqlbase.EncDatumRows | ||
expected sqlbase.EncDatumRows | ||
}{ | ||
{ | ||
spec: EvaluatorSpec{ | ||
Types: []sqlbase.ColumnType_Kind{sqlbase.ColumnType_INT, sqlbase.ColumnType_INT}, | ||
Exprs: []Expression{{Expr: "$1"}, {Expr: "((($0)))"}}, | ||
}, | ||
input: sqlbase.EncDatumRows{ | ||
{v[1], v[2]}, | ||
{v[3], v[4]}, | ||
{v[6], v[2]}, | ||
{v[7], v[2]}, | ||
{v[8], v[4]}, | ||
}, | ||
expected: sqlbase.EncDatumRows{ | ||
{v[2], v[1]}, | ||
{v[4], v[3]}, | ||
{v[2], v[6]}, | ||
{v[2], v[7]}, | ||
{v[4], v[8]}, | ||
}, | ||
}, { | ||
spec: EvaluatorSpec{ | ||
Types: []sqlbase.ColumnType_Kind{sqlbase.ColumnType_INT, sqlbase.ColumnType_INT}, | ||
Exprs: []Expression{ | ||
{Expr: "$0 + $1"}, | ||
{Expr: "$0 - $1"}, | ||
{Expr: "$0 >= 8"}, | ||
}, | ||
}, | ||
input: sqlbase.EncDatumRows{ | ||
{v[10], v[0]}, | ||
{v[9], v[1]}, | ||
{v[8], v[2]}, | ||
{v[7], v[3]}, | ||
{v[6], v[4]}, | ||
}, | ||
expected: sqlbase.EncDatumRows{ | ||
{v[10], v[10], b[0]}, | ||
{v[10], v[8], b[0]}, | ||
{v[10], v[6], b[0]}, | ||
{v[10], v[4], b[1]}, | ||
{v[10], v[2], b[1]}, | ||
}, | ||
}, { | ||
spec: EvaluatorSpec{ | ||
Types: []sqlbase.ColumnType_Kind{sqlbase.ColumnType_BOOL, sqlbase.ColumnType_BOOL}, | ||
Exprs: []Expression{ | ||
{Expr: "$0 AND $0"}, | ||
{Expr: "$0 AND $1"}, | ||
{Expr: "NOT $0"}, | ||
}, | ||
}, | ||
input: sqlbase.EncDatumRows{ | ||
{b[0], b[1]}, | ||
}, | ||
expected: sqlbase.EncDatumRows{ | ||
{b[0], b[1], b[1]}, | ||
}, | ||
}, | ||
{ | ||
spec: EvaluatorSpec{ | ||
Types: []sqlbase.ColumnType_Kind{sqlbase.ColumnType_INT, sqlbase.ColumnType_INT}, | ||
Exprs: []Expression{{Expr: "1"}}, | ||
}, | ||
input: sqlbase.EncDatumRows{ | ||
{v[1], v[2]}, | ||
{v[3], v[4]}, | ||
{v[6], v[2]}, | ||
{v[7], v[2]}, | ||
{v[8], v[4]}, | ||
}, | ||
expected: sqlbase.EncDatumRows{ | ||
{v[1]}, | ||
{v[1]}, | ||
{v[1]}, | ||
{v[1]}, | ||
{v[1]}, | ||
}, | ||
}, | ||
} | ||
|
||
for _, c := range testCases { | ||
es := c.spec | ||
|
||
in := &RowBuffer{rows: c.input} | ||
out := &RowBuffer{} | ||
|
||
flowCtx := FlowCtx{ | ||
Context: context.Background(), | ||
evalCtx: &parser.EvalContext{}, | ||
} | ||
|
||
ev, err := newEvaluator(&flowCtx, &es, in, out) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
ev.Run(nil) | ||
if out.err != nil { | ||
t.Fatal(out.err) | ||
} | ||
if !out.closed { | ||
t.Fatalf("output RowReceiver not closed") | ||
} | ||
|
||
if result := out.rows.String(); result != c.expected.String() { | ||
t.Errorf("invalid results: %s, expected %s'", result, c.expected.String()) | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,7 @@ | |
// permissions and limitations under the License. | ||
// | ||
// Author: Radu Berinde ([email protected]) | ||
// Author: Irfan Sharif ([email protected]) | ||
|
||
package distsql | ||
|
||
|
@@ -27,8 +28,8 @@ import ( | |
"github.com/pkg/errors" | ||
) | ||
|
||
// valArgsConvert is a parser.Visitor that converts Placeholders ($0, $1, etc.) to | ||
// IndexedVars. | ||
// valArgsConvert is a parser.Visitor that converts Placeholders ($0, $1, etc.) | ||
// to IndexedVars. | ||
type valArgsConvert struct { | ||
h *parser.IndexedVarHelper | ||
err error | ||
|
@@ -82,8 +83,8 @@ type exprHelper struct { | |
noCopy util.NoCopy | ||
|
||
expr parser.TypedExpr | ||
// vars is used to generate IndexedVars that are "backed" by | ||
// the values in `row`. | ||
// vars is used to generate IndexedVars that are "backed" by the values in | ||
// `row`. | ||
vars parser.IndexedVarHelper | ||
|
||
evalCtx *parser.EvalContext | ||
|
@@ -142,3 +143,31 @@ func (eh *exprHelper) evalFilter(row sqlbase.EncDatumRow) (bool, error) { | |
eh.row = row | ||
return sqlbase.RunFilter(eh.expr, eh.evalCtx) | ||
} | ||
|
||
// Given a row, eval evaluates the wrapped expression and returns the | ||
// resulting datum needed for rendering for eg. given a row (1, 2, 3, 4, 5): | ||
// '$1' would return '2' | ||
// '$1 + $4' would return '7' | ||
// '$0' would return '1' | ||
// '$1 + 10' would return '12' | ||
func (eh *exprHelper) eval(row sqlbase.EncDatumRow) (parser.Datum, error) { | ||
eh.row = row | ||
|
||
// TODO(irfansharif): eval here is very permissive, if expr is of type | ||
// *parser.FuncExpr for example expr.Eval doesn't make sense therefore is | ||
// explicitly tested for. There may very well be other expression types | ||
// where the same holds true but are not yet checked for. The set of | ||
// verified parser expressions are: | ||
// ComparisonExpr, FuncExpr, AndExpr, BinaryExpr, NotExpr, OrExpr, | ||
// ParenExpr, UnaryExpr. | ||
// | ||
// The list of unverified parser expressions are: | ||
// IsOfTypeExpr, AnnotateTypeExpr, CaseExpr, CastExpr, CoalesceExpr, | ||
// ExistsExpr, IfExpr, NullIfExpr. | ||
switch eh.expr.(type) { | ||
case *parser.FuncExpr: | ||
return nil, errors.Errorf("aggregate functions not allowed") | ||
default: | ||
return eh.expr.Eval(eh.evalCtx) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,7 @@ | |
// permissions and limitations under the License. | ||
// | ||
// Author: Radu Berinde ([email protected]) | ||
// Author: Irfan Sharif ([email protected]) | ||
|
||
package distsql | ||
|
||
|
@@ -207,6 +208,12 @@ func (f *Flow) makeProcessor(ps *ProcessorSpec, inputs []RowSource) (processor, | |
} | ||
return newSorter(&f.FlowCtx, ps.Core.Sorter, inputs[0], outputs[0]), nil | ||
} | ||
if ps.Core.Evaluator != nil { | ||
if err := checkNumInOut(inputs, outputs, 1, 1); err != nil { | ||
return nil, err | ||
} | ||
return newEvaluator(&f.FlowCtx, ps.Core.Evaluator, inputs[0], outputs[0]) | ||
} | ||
return nil, errors.Errorf("unsupported processor %s", ps) | ||
} | ||
|
||
|
Oops, something went wrong.