forked from cockroachdb/cockroach
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
implement evaluator for distributed sql
Implementation of the `Evaluator` processor core type, a fully programmable no-grouping aggregator that runs a 'program' on each individual row. The 'program' is a set of expressions evaluated in order; the results of evaluating these expressions on the input row form the output row. Part of cockroachdb#7587.
- Loading branch information
1 parent
d6a1011
commit 2853881
Showing
8 changed files
with
651 additions
and
57 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
// Copyright 2016 The Cockroach Authors. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | ||
// implied. See the License for the specific language governing | ||
// permissions and limitations under the License. | ||
// | ||
// Author: Irfan Sharif ([email protected]) | ||
|
||
package distsql | ||
|
||
import ( | ||
"sync" | ||
|
||
"github.com/cockroachdb/cockroach/sql/parser" | ||
"github.com/cockroachdb/cockroach/sql/sqlbase" | ||
"github.com/cockroachdb/cockroach/util/log" | ||
"golang.org/x/net/context" | ||
) | ||
|
||
type evaluator struct { | ||
input RowSource | ||
output RowReceiver | ||
ctx context.Context | ||
evalCtx *parser.EvalContext | ||
exprs []exprHelper | ||
render []parser.TypedExpr | ||
} | ||
|
||
func newEvaluator( | ||
flowCtx *FlowCtx, spec *EvaluatorSpec, input RowSource, output RowReceiver, | ||
) (*evaluator, error) { | ||
ev := &evaluator{ | ||
input: input, | ||
output: output, | ||
ctx: log.WithLogTag(flowCtx.Context, "Evaluator", nil), | ||
evalCtx: flowCtx.evalCtx, | ||
exprs: make([]exprHelper, len(spec.Exprs)), | ||
render: make([]parser.TypedExpr, len(spec.Exprs)), | ||
} | ||
|
||
for i, expr := range spec.Exprs { | ||
err := ev.exprs[i].init(expr, spec.Types, ev.evalCtx) | ||
if err != nil { | ||
return nil, err | ||
} | ||
} | ||
|
||
// Loop over the expressions in our expression set and extract out fully typed expressions, this | ||
// will later be evaluated for each input row to construct our output row. | ||
for i := range ev.exprs { | ||
typedExpr, err := (&ev.exprs[i]).expr.TypeCheck(nil, parser.NoTypePreference) | ||
if err != nil { | ||
return nil, err | ||
} | ||
ev.render[i] = typedExpr | ||
} | ||
|
||
return ev, nil | ||
} | ||
|
||
// Run is part of the processor interface. | ||
func (ev *evaluator) Run(wg *sync.WaitGroup) { | ||
if wg != nil { | ||
defer wg.Done() | ||
} | ||
|
||
if log.V(2) { | ||
log.Infof(ev.ctx, "starting evaluator process") | ||
defer log.Infof(ev.ctx, "exiting evaluator") | ||
} | ||
|
||
for { | ||
row, err := ev.next() | ||
if err != nil || row == nil { | ||
ev.output.Close(err) | ||
return | ||
} | ||
if log.V(3) { | ||
log.Infof(ev.ctx, "pushing %s\n", row) | ||
} | ||
// Push the row to the output RowReceiver; stop if they don't need more rows. | ||
if !ev.output.PushRow(row) { | ||
if log.V(2) { | ||
log.Infof(ev.ctx, "no more rows required") | ||
} | ||
ev.output.Close(nil) | ||
return | ||
} | ||
} | ||
} | ||
|
||
func (ev *evaluator) next() (sqlbase.EncDatumRow, error) { | ||
row, err := ev.input.NextRow() | ||
if err != nil { | ||
return nil, err | ||
} | ||
if row == nil { | ||
return nil, nil | ||
} | ||
return ev.extract(row) | ||
} | ||
|
||
func (ev *evaluator) extract(row sqlbase.EncDatumRow) (sqlbase.EncDatumRow, error) { | ||
tuple := make(parser.DTuple, len(ev.exprs)) | ||
for i := range ev.exprs { | ||
datum, err := (&ev.exprs[i]).extract(row) | ||
if err != nil { | ||
return nil, err | ||
} | ||
tuple[i] = datum | ||
} | ||
return sqlbase.DTupleToEncDatumRow(tuple) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,157 @@ | ||
// Copyright 2016 The Cockroach Authors. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | ||
// implied. See the License for the specific language governing | ||
// permissions and limitations under the License. | ||
// | ||
// Author: Irfan Sharif ([email protected]) | ||
|
||
package distsql | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/cockroachdb/cockroach/sql/parser" | ||
"github.com/cockroachdb/cockroach/sql/sqlbase" | ||
"github.com/cockroachdb/cockroach/util/leaktest" | ||
|
||
"golang.org/x/net/context" | ||
) | ||
|
||
func TestEvaluator(t *testing.T) { | ||
defer leaktest.AfterTest(t)() | ||
|
||
v := [15]sqlbase.EncDatum{} | ||
for i := range v { | ||
v[i].SetDatum(sqlbase.ColumnType_INT, parser.NewDInt(parser.DInt(i))) | ||
} | ||
|
||
dTrue, _ := parser.ParseDBool("true") | ||
dFalse, _ := parser.ParseDBool("false") | ||
|
||
b := [2]sqlbase.EncDatum{} | ||
b[0].SetDatum(sqlbase.ColumnType_BOOL, dTrue) | ||
b[1].SetDatum(sqlbase.ColumnType_BOOL, dFalse) | ||
|
||
testCases := []struct { | ||
spec EvaluatorSpec | ||
input sqlbase.EncDatumRows | ||
expected sqlbase.EncDatumRows | ||
}{ | ||
{ | ||
spec: EvaluatorSpec{ | ||
Types: []sqlbase.ColumnType_Kind{sqlbase.ColumnType_INT, sqlbase.ColumnType_INT}, | ||
Exprs: []Expression{Expression{Expr: "$1"}, Expression{Expr: "((($0)))"}}, | ||
}, | ||
input: sqlbase.EncDatumRows{ | ||
{v[1], v[2]}, | ||
{v[3], v[4]}, | ||
{v[6], v[2]}, | ||
{v[7], v[2]}, | ||
{v[8], v[4]}, | ||
}, | ||
expected: sqlbase.EncDatumRows{ | ||
{v[2], v[1]}, | ||
{v[4], v[3]}, | ||
{v[2], v[6]}, | ||
{v[2], v[7]}, | ||
{v[4], v[8]}, | ||
}, | ||
}, { | ||
spec: EvaluatorSpec{ | ||
Types: []sqlbase.ColumnType_Kind{sqlbase.ColumnType_INT, sqlbase.ColumnType_INT}, | ||
Exprs: []Expression{ | ||
Expression{Expr: "$0 + $1"}, | ||
Expression{Expr: "$0 - $1"}, | ||
Expression{Expr: "$0 >= 8"}, | ||
}, | ||
}, | ||
input: sqlbase.EncDatumRows{ | ||
{v[10], v[0]}, | ||
{v[9], v[1]}, | ||
{v[8], v[2]}, | ||
{v[7], v[3]}, | ||
{v[6], v[4]}, | ||
}, | ||
expected: sqlbase.EncDatumRows{ | ||
{v[10], v[10], b[0]}, | ||
{v[10], v[8], b[0]}, | ||
{v[10], v[6], b[0]}, | ||
{v[10], v[4], b[1]}, | ||
{v[10], v[2], b[1]}, | ||
}, | ||
}, { | ||
spec: EvaluatorSpec{ | ||
Types: []sqlbase.ColumnType_Kind{sqlbase.ColumnType_BOOL, sqlbase.ColumnType_BOOL}, | ||
Exprs: []Expression{ | ||
Expression{Expr: "$0 AND $0"}, | ||
Expression{Expr: "$0 AND $1"}, | ||
Expression{Expr: "NOT $0"}, | ||
}, | ||
}, | ||
input: sqlbase.EncDatumRows{ | ||
{b[0], b[1]}, | ||
}, | ||
expected: sqlbase.EncDatumRows{ | ||
{b[0], b[1], b[1]}, | ||
}, | ||
}, | ||
{ | ||
spec: EvaluatorSpec{ | ||
Types: []sqlbase.ColumnType_Kind{sqlbase.ColumnType_INT, sqlbase.ColumnType_INT}, | ||
Exprs: []Expression{Expression{Expr: "1"}}, | ||
}, | ||
input: sqlbase.EncDatumRows{ | ||
{v[1], v[2]}, | ||
{v[3], v[4]}, | ||
{v[6], v[2]}, | ||
{v[7], v[2]}, | ||
{v[8], v[4]}, | ||
}, | ||
expected: sqlbase.EncDatumRows{ | ||
{v[1]}, | ||
{v[1]}, | ||
{v[1]}, | ||
{v[1]}, | ||
{v[1]}, | ||
}, | ||
}, | ||
} | ||
|
||
for _, c := range testCases { | ||
es := c.spec | ||
|
||
in := &RowBuffer{rows: c.input} | ||
out := &RowBuffer{} | ||
|
||
flowCtx := FlowCtx{ | ||
Context: context.Background(), | ||
evalCtx: &parser.EvalContext{}, | ||
} | ||
|
||
ev, err := newEvaluator(&flowCtx, &es, in, out) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
ev.Run(nil) | ||
if out.err != nil { | ||
t.Fatal(out.err) | ||
} | ||
if !out.closed { | ||
t.Fatalf("output RowReceiver not closed") | ||
} | ||
|
||
if result := out.rows.String(); result != c.expected.String() { | ||
t.Errorf("invalid results: %s, expected %s'", result, c.expected.String()) | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,7 @@ | |
// permissions and limitations under the License. | ||
// | ||
// Author: Radu Berinde ([email protected]) | ||
// Author: Irfan Sharif ([email protected]) | ||
|
||
package distsql | ||
|
||
|
@@ -142,3 +143,33 @@ func (eh *exprHelper) evalFilter(row sqlbase.EncDatumRow) (bool, error) { | |
eh.row = row | ||
return sqlbase.RunFilter(eh.expr, eh.evalCtx) | ||
} | ||
|
||
// Given a row, extract evaluates the wrapped expression and returns the resulting datum needed for | ||
// rendering. For example given a row (1, 2, 3, 4, 5): | ||
// '$1' would return '2' | ||
// '$1 + $4' would return '7' | ||
// '$0' would return '1' | ||
// '$1 + 10' would return '12' | ||
func (eh *exprHelper) extract(row sqlbase.EncDatumRow) (parser.Datum, error) { | ||
eh.row = row | ||
|
||
// TODO(irfansharif): extract here is very permissive, if expr is of type *parser.FuncExpr for | ||
// example expr.Eval doesn't make sense therefore is explicitly tested for. There may very well be | ||
// other expression types where the same holds true but are not yet checked for. | ||
// The set of verified parser expressions are: | ||
// ComparisonExpr, FuncExpr, AndExpr, BinaryExpr, NotExpr, OrExpr, ParenExpr, UnaryExpr. | ||
// | ||
// The list of unverified parser expressions are: | ||
// IsOfTypeExpr, AnnotateTypeExpr, CaseExpr, CastExpr, CoalesceExpr, ExistsExpr, IfExpr, | ||
// NullIfExpr. | ||
// | ||
// TODO(irfansharif): We need to determine when exactly the 'argument extraction' takes place, | ||
// i.e. if the original SQL expression contains 'SUM($1 + $2)', we would on some level need to | ||
// extract the sub expression '$1 + $2'. | ||
switch eh.expr.(type) { | ||
case *parser.FuncExpr: | ||
return nil, errors.Errorf("aggregate functions not allowed") | ||
default: | ||
return eh.expr.Eval(eh.evalCtx) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,7 @@ | |
// permissions and limitations under the License. | ||
// | ||
// Author: Radu Berinde ([email protected]) | ||
// Author: Irfan Sharif ([email protected]) | ||
|
||
package distsql | ||
|
||
|
@@ -207,6 +208,12 @@ func (f *Flow) makeProcessor(ps *ProcessorSpec, inputs []RowSource) (processor, | |
} | ||
return newSorter(&f.FlowCtx, ps.Core.Sorter, inputs[0], outputs[0]), nil | ||
} | ||
if ps.Core.Evaluator != nil { | ||
if err := checkNumInOut(inputs, outputs, 1, 1); err != nil { | ||
return nil, err | ||
} | ||
return newEvaluator(&f.FlowCtx, ps.Core.Evaluator, inputs[0], outputs[0]) | ||
} | ||
return nil, errors.Errorf("unsupported processor %s", ps) | ||
} | ||
|
||
|
Oops, something went wrong.