Skip to content

Commit

Permalink
physicalplan: always store LocalExpr
Browse files Browse the repository at this point in the history
Previously, we would set either `LocalExpr` (unserialized expression,
only we have the full plan on a single node) or `Expr` (serialized
expression, when we have distributed plan as well as in some tests).
However, we could be setting both and making best effort to reuse
unserialized `LocalExpr` on the gateway even if the plan is distributed.
And this commit adds such behavior.

Release note: None
  • Loading branch information
yuzefovich committed Jun 10, 2020
1 parent 5fab1ad commit fc09ef9
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 34 deletions.
2 changes: 1 addition & 1 deletion pkg/sql/colexec/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -1387,7 +1387,7 @@ func (r *postProcessResult) planFilterExpr(
ctx, evalCtx, expr, r.ColumnTypes, r.Op, acc, factory,
)
if err != nil {
return errors.Wrapf(err, "unable to columnarize filter expression %q", filter.Expr)
return errors.Wrapf(err, "unable to columnarize filter expression %q", filter)
}
r.InternalMemUsage += selectionInternalMem
if len(filterColumnTypes) > len(r.ColumnTypes) {
Expand Down
11 changes: 6 additions & 5 deletions pkg/sql/execinfrapb/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,9 @@ type Expression struct {
// (@1, @2, @3 ..) used for "input" variables.
Expr string

// LocalExpr is an unserialized field that's used to pass expressions to local
// flows without serializing/deserializing them.
// LocalExpr is an unserialized field that's used to pass expressions to
// the gateway node without serializing/deserializing them. It is always
// set in non-test setup.
LocalExpr tree.TypedExpr
}

Expand All @@ -172,14 +173,14 @@ func (e *Expression) Empty() bool {

// String implements the Stringer interface.
func (e Expression) String() string {
if e.Expr != "" {
return e.Expr
}
if e.LocalExpr != nil {
ctx := tree.NewFmtCtx(tree.FmtCheckEquivalence)
ctx.FormatNode(e.LocalExpr)
return ctx.CloseAndGetString()
}
if e.Expr != "" {
return e.Expr
}
return "none"
}

Expand Down
34 changes: 12 additions & 22 deletions pkg/sql/physicalplan/expression.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
package physicalplan

import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
Expand Down Expand Up @@ -80,39 +78,31 @@ func MakeExpression(
evalCtx: evalCtx,
}

outExpr := expr.(tree.Expr)
if ctx.EvaluateSubqueries() {
outExpr, _ = tree.WalkExpr(subqueryVisitor, expr)
outExpr, _ := tree.WalkExpr(subqueryVisitor, expr)
if subqueryVisitor.err != nil {
return execinfrapb.Expression{}, subqueryVisitor.err
}
expr = outExpr.(tree.TypedExpr)
}

if indexVarMap != nil {
// Remap our indexed vars.
expr = sqlbase.RemapIVarsInTypedExpr(expr, indexVarMap)
}
expression := execinfrapb.Expression{LocalExpr: expr}
if ctx.IsLocal() {
if indexVarMap != nil {
// Remap our indexed vars.
expr = sqlbase.RemapIVarsInTypedExpr(expr, indexVarMap)
}
return execinfrapb.Expression{LocalExpr: expr}, nil
return expression, nil
}

// We format the expression using the IndexedVar and Placeholder formatting interceptors.
// Since the plan is not fully local, serialize the expression.
fmtCtx := execinfrapb.ExprFmtCtxBase(evalCtx)
if indexVarMap != nil {
fmtCtx.SetIndexedVarFormat(func(ctx *tree.FmtCtx, idx int) {
remappedIdx := indexVarMap[idx]
if remappedIdx < 0 {
panic(fmt.Sprintf("unmapped index %d", idx))
}
ctx.Printf("@%d", remappedIdx+1)
})
}
fmtCtx.FormatNode(outExpr)
fmtCtx.FormatNode(expr)
if log.V(1) {
log.Infof(evalCtx.Ctx(), "Expr %s:\n%s", fmtCtx.String(), tree.ExprDebugString(outExpr))
log.Infof(evalCtx.Ctx(), "Expr %s:\n%s", fmtCtx.String(), tree.ExprDebugString(expr))
}
return execinfrapb.Expression{Expr: fmtCtx.CloseAndGetString()}, nil
expression.Expr = fmtCtx.CloseAndGetString()
return expression, nil
}

type evalAndReplaceSubqueryVisitor struct {
Expand Down
11 changes: 7 additions & 4 deletions pkg/sql/physicalplan/physical_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,15 +639,18 @@ func (p *PhysicalPlan) AddFilter(
return err
}
if !post.Filter.Empty() {
// Either Expr or LocalExpr will be set (not both).
if filter.Expr != "" {
filter.Expr = fmt.Sprintf("(%s) AND (%s)", post.Filter.Expr, filter.Expr)
} else if filter.LocalExpr != nil {
// LocalExpr is usually set, but it can be left nil in tests, so we
// need to perform the nil check.
if post.Filter.LocalExpr != nil && filter.LocalExpr != nil {
filter.LocalExpr = tree.NewTypedAndExpr(
post.Filter.LocalExpr,
filter.LocalExpr,
)
}
// Expr is set for all distributed plans (as well as in some tests).
if post.Filter.Expr != "" && filter.Expr != "" {
filter.Expr = fmt.Sprintf("(%s) AND (%s)", post.Filter.Expr, filter.Expr)
}
}
for _, pIdx := range p.ResultRouters {
p.Processors[pIdx].Spec.Post.Filter = filter
Expand Down
11 changes: 10 additions & 1 deletion pkg/sql/physicalplan/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,16 @@ func TestProjectionAndRendering(t *testing.T) {

tc.action(&p)

if post := p.GetLastStagePost(); !reflect.DeepEqual(post, tc.expPost) {
post := p.GetLastStagePost()
// The actual planning always sets unserialized LocalExpr field on the
// expressions, however, we don't do that for the expected results. In
// order to be able to use the deep comparison below we manually unset
// that unserialized field.
post.Filter.LocalExpr = nil
for i := range post.RenderExprs {
post.RenderExprs[i].LocalExpr = nil
}
if !reflect.DeepEqual(post, tc.expPost) {
t.Errorf("%d: incorrect post:\n%s\nexpected:\n%s", testIdx, &post, &tc.expPost)
}
var resTypes []string
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/rowexec/joinerbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (jb *joinerBase) init(

if jb.joinType.IsSetOpJoin() {
if !onExpr.Empty() {
return errors.Errorf("expected empty onExpr, got %v", onExpr.Expr)
return errors.Errorf("expected empty onExpr, got %v", onExpr)
}
}

Expand Down

0 comments on commit fc09ef9

Please sign in to comment.