Skip to content

Commit

Permalink
colexec: improve expression parsing
Browse files Browse the repository at this point in the history
This commit introduces `colexec.ExprHelper`, which helps with expression
processing. Previously, we were allocating a new `execinfra.ExprHelper`
and calling `Init` on it in order to get the typed expression from the
possibly serialized representation of each expression. Now, this new
expression helper is reused between all expressions in the flow on
a single node.

There is one caveat, however: we need to make sure that we force
deserialization of the expressions during the `SupportsVectorized` check if
the flow is scheduled to be run on a remote node (different from the one
that is performing the check). This is necessary to make sure that the
remote nodes will be able to deserialize the expressions without
encountering errors (if we don't force the deserialization during the
check, we will use `LocalExpr` - if available - and might not catch
things that we don't support).

Release note: None
  • Loading branch information
yuzefovich committed Jun 10, 2020
1 parent 096265d commit 5fab1ad
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 59 deletions.
45 changes: 24 additions & 21 deletions pkg/sql/colexec/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ type NewColOperatorArgs struct {
ProcessorConstructor execinfra.ProcessorConstructor
DiskQueueCfg colcontainer.DiskQueueCfg
FDSemaphore semaphore.Semaphore
ExprHelper ExprHelper
TestingKnobs struct {
// UseStreamingMemAccountForBuffering specifies whether to use
// StreamingMemAccount when creating buffering operators and should only be
Expand Down Expand Up @@ -568,6 +569,9 @@ func NewColOperator(
streamingAllocator := colmem.NewAllocator(ctx, streamingMemAccount, factory)
useStreamingMemAccountForBuffering := args.TestingKnobs.UseStreamingMemAccountForBuffering
processorConstructor := args.ProcessorConstructor
if args.ExprHelper == nil {
args.ExprHelper = &defaultExprHelper{}
}

log.VEventf(ctx, 2, "planning col operator for spec %q", spec)

Expand Down Expand Up @@ -876,8 +880,9 @@ func NewColOperator(

if !core.HashJoiner.OnExpr.Empty() && core.HashJoiner.Type == sqlbase.InnerJoin {
if err =
result.planAndMaybeWrapOnExprAsFilter(ctx, flowCtx, core.HashJoiner.OnExpr,
streamingMemAccount, processorConstructor, factory); err != nil {
result.planAndMaybeWrapOnExprAsFilter(
ctx, flowCtx, core.HashJoiner.OnExpr, streamingMemAccount, processorConstructor, factory, args.ExprHelper,
); err != nil {
return result, err
}
}
Expand Down Expand Up @@ -938,8 +943,9 @@ func NewColOperator(
}

if onExpr != nil {
if err = result.planAndMaybeWrapOnExprAsFilter(ctx, flowCtx, *onExpr,
streamingMemAccount, processorConstructor, factory); err != nil {
if err = result.planAndMaybeWrapOnExprAsFilter(
ctx, flowCtx, *onExpr, streamingMemAccount, processorConstructor, factory, args.ExprHelper,
); err != nil {
return result, err
}
}
Expand Down Expand Up @@ -1093,7 +1099,7 @@ func NewColOperator(
Op: result.Op,
ColumnTypes: result.ColumnTypes,
}
err = ppr.planPostProcessSpec(ctx, flowCtx, post, streamingMemAccount, factory)
err = ppr.planPostProcessSpec(ctx, flowCtx, post, streamingMemAccount, factory, args.ExprHelper)
// TODO(yuzefovich): update unit tests to remove panic-catcher when fallback
// to rowexec is not allowed.
if err != nil && processorConstructor == nil {
Expand Down Expand Up @@ -1149,6 +1155,7 @@ func (r *NewColOperatorResult) planAndMaybeWrapOnExprAsFilter(
streamingMemAccount *mon.BoundAccount,
processorConstructor execinfra.ProcessorConstructor,
factory coldata.ColumnFactory,
helper ExprHelper,
) error {
// We will plan other Operators on top of r.Op, so we need to account for the
// internal memory explicitly.
Expand All @@ -1160,7 +1167,7 @@ func (r *NewColOperatorResult) planAndMaybeWrapOnExprAsFilter(
ColumnTypes: r.ColumnTypes,
}
if err := ppr.planFilterExpr(
ctx, flowCtx.NewEvalCtx(), onExpr, streamingMemAccount, factory,
ctx, flowCtx.NewEvalCtx(), onExpr, streamingMemAccount, factory, helper,
); err != nil {
// ON expression planning failed. Fall back to planning the filter
// using row execution.
Expand Down Expand Up @@ -1210,10 +1217,11 @@ func (r *postProcessResult) planPostProcessSpec(
post *execinfrapb.PostProcessSpec,
streamingMemAccount *mon.BoundAccount,
factory coldata.ColumnFactory,
helper ExprHelper,
) error {
if !post.Filter.Empty() {
if err := r.planFilterExpr(
ctx, flowCtx.NewEvalCtx(), post.Filter, streamingMemAccount, factory,
ctx, flowCtx.NewEvalCtx(), post.Filter, streamingMemAccount, factory, helper,
); err != nil {
return err
}
Expand All @@ -1224,18 +1232,15 @@ func (r *postProcessResult) planPostProcessSpec(
} else if post.RenderExprs != nil {
log.VEventf(ctx, 2, "planning render expressions %+v", post.RenderExprs)
var renderedCols []uint32
for _, expr := range post.RenderExprs {
var (
helper execinfra.ExprHelper
renderInternalMem int
)
err := helper.Init(expr, r.ColumnTypes, flowCtx.EvalCtx)
for _, renderExpr := range post.RenderExprs {
var renderInternalMem int
expr, err := helper.ProcessExpr(renderExpr, flowCtx.EvalCtx, r.ColumnTypes)
if err != nil {
return err
}
var outputIdx int
r.Op, outputIdx, r.ColumnTypes, renderInternalMem, err = planProjectionOperators(
ctx, flowCtx.NewEvalCtx(), helper.Expr, r.ColumnTypes, r.Op, streamingMemAccount, factory,
ctx, flowCtx.NewEvalCtx(), expr, r.ColumnTypes, r.Op, streamingMemAccount, factory,
)
if err != nil {
return errors.Wrapf(err, "unable to columnarize render expression %q", expr)
Expand Down Expand Up @@ -1364,24 +1369,22 @@ func (r *postProcessResult) planFilterExpr(
filter execinfrapb.Expression,
acc *mon.BoundAccount,
factory coldata.ColumnFactory,
helper ExprHelper,
) error {
var (
helper execinfra.ExprHelper
selectionInternalMem int
)
err := helper.Init(filter, r.ColumnTypes, evalCtx)
var selectionInternalMem int
expr, err := helper.ProcessExpr(filter, evalCtx, r.ColumnTypes)
if err != nil {
return err
}
if helper.Expr == tree.DNull {
if expr == tree.DNull {
// The filter expression is tree.DNull meaning that it is always false, so
// we put a zero operator.
r.Op = NewZeroOp(r.Op)
return nil
}
var filterColumnTypes []*types.T
r.Op, _, filterColumnTypes, selectionInternalMem, err = planSelectionOperators(
ctx, evalCtx, helper.Expr, r.ColumnTypes, r.Op, acc, factory,
ctx, evalCtx, expr, r.ColumnTypes, r.Op, acc, factory,
)
if err != nil {
return errors.Wrapf(err, "unable to columnarize filter expression %q", filter.Expr)
Expand Down
55 changes: 55 additions & 0 deletions pkg/sql/colexec/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,66 @@
package colexec

import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
)

// ExprHelper abstracts how the vectorized engine turns an
// execinfrapb.Expression into a typed expression that can be planned.
type ExprHelper interface {
	// ProcessExpr takes the expression together with the evaluation context
	// and the types of the input columns, and returns the corresponding
	// well-typed expression (or an error if it cannot be produced).
	ProcessExpr(
		expr execinfrapb.Expression, evalCtx *tree.EvalContext, typs []*types.T,
	) (tree.TypedExpr, error)
}

// NewExprHelper constructs an ExprHelper. When forceExprDeserialization is
// true, the returned helper ignores the LocalExpr field and always
// deserializes the expression; otherwise, the returned helper uses LocalExpr
// whenever it is set.
func NewExprHelper(forceExprDeserialization bool) ExprHelper {
	if !forceExprDeserialization {
		return &defaultExprHelper{}
	}
	return &forcedDeserializationExprHelper{}
}

// defaultExprHelper is an ExprHelper that short-circuits on LocalExpr: if the
// expression already carries a well-typed LocalExpr, it is returned as is,
// and only otherwise the serialized form is deserialized.
type defaultExprHelper struct {
	// helper serves as the container for the indexed variables during
	// deserialization; its Types field is overwritten on every ProcessExpr
	// call.
	helper execinfra.ExprHelper
}

var _ ExprHelper = (*defaultExprHelper)(nil)

// ProcessExpr implements the ExprHelper interface.
func (h *defaultExprHelper) ProcessExpr(
	expr execinfrapb.Expression, evalCtx *tree.EvalContext, typs []*types.T,
) (tree.TypedExpr, error) {
	if localExpr := expr.LocalExpr; localExpr != nil {
		// The typed expression is already available, so there is nothing to
		// deserialize.
		return localExpr, nil
	}
	h.helper.Types = typs
	ivarHelper := tree.MakeIndexedVarHelper(&h.helper, len(typs))
	return execinfra.DeserializeExpr(expr.Expr, evalCtx, &ivarHelper)
}

// forcedDeserializationExprHelper is an ExprHelper that never looks at the
// LocalExpr field: every expression is deserialized (namely, parsed,
// type-checked, and has its constants evaluated) from its serialized form.
type forcedDeserializationExprHelper struct {
	// helper serves as the container for the indexed variables during
	// deserialization; its Types field is overwritten on every ProcessExpr
	// call.
	helper execinfra.ExprHelper
}

var _ ExprHelper = (*forcedDeserializationExprHelper)(nil)

// ProcessExpr implements the ExprHelper interface.
func (h *forcedDeserializationExprHelper) ProcessExpr(
	expr execinfrapb.Expression, evalCtx *tree.EvalContext, typs []*types.T,
) (tree.TypedExpr, error) {
	h.helper.Types = typs
	ivarHelper := tree.MakeIndexedVarHelper(&h.helper, len(typs))
	return execinfra.DeserializeExpr(expr.Expr, evalCtx, &ivarHelper)
}

// Reference findIVarsInRange through the blank identifier so that it is not
// reported as unused.
var _ = findIVarsInRange

Expand Down
26 changes: 24 additions & 2 deletions pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ func (f *vectorizedFlow) Setup(
f.GetID(),
diskQueueCfg,
f.countingSemaphore,
false, /* forceExprDeserialization */
)
if f.testingKnobs.onSetupFlow != nil {
f.testingKnobs.onSetupFlow(creator)
Expand Down Expand Up @@ -431,6 +432,7 @@ type vectorizedFlowCreator struct {
syncFlowConsumer execinfra.RowReceiver
nodeDialer *nodedialer.Dialer
flowID execinfrapb.FlowID
exprHelper colexec.ExprHelper

// numOutboxes counts how many exec.Outboxes have been set up on this node.
// It must be accessed atomically.
Expand Down Expand Up @@ -466,6 +468,7 @@ func newVectorizedFlowCreator(
flowID execinfrapb.FlowID,
diskQueueCfg colcontainer.DiskQueueCfg,
fdSemaphore semaphore.Semaphore,
forceExprDeserialization bool,
) *vectorizedFlowCreator {
return &vectorizedFlowCreator{
flowCreatorHelper: helper,
Expand All @@ -479,6 +482,7 @@ func newVectorizedFlowCreator(
flowID: flowID,
diskQueueCfg: diskQueueCfg,
fdSemaphore: fdSemaphore,
exprHelper: colexec.NewExprHelper(forceExprDeserialization),
}
}

Expand Down Expand Up @@ -952,6 +956,7 @@ func (s *vectorizedFlowCreator) setupFlow(
ProcessorConstructor: rowexec.NewProcessor,
DiskQueueCfg: s.diskQueueCfg,
FDSemaphore: s.fdSemaphore,
ExprHelper: s.exprHelper,
}
result, err := colexec.NewColOperator(ctx, flowCtx, args)
// Even when err is non-nil, it is possible that the buffering memory
Expand Down Expand Up @@ -1170,17 +1175,34 @@ func (r *noopFlowCreatorHelper) getCancelFlowFn() context.CancelFunc {
// EXPLAIN output.
// Note that passed-in output can be nil, but if it is non-nil, only Types()
// method on it might be called (nothing will actually get Push()'ed into it).
// - scheduledOnRemoteNode indicates whether the flow that processorSpecs
// represent is scheduled to be run on a remote node (different from the one
// performing this check).
func SupportsVectorized(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
processorSpecs []execinfrapb.ProcessorSpec,
fuseOpt flowinfra.FuseOpt,
isPlanLocal bool,
output execinfra.RowReceiver,
scheduledOnRemoteNode bool,
) (leaves []execinfra.OpNode, err error) {
if output == nil {
output = &execinfra.RowChannel{}
}
creator := newVectorizedFlowCreator(newNoopFlowCreatorHelper(), vectorizedRemoteComponentCreator{}, false, nil, output, nil, execinfrapb.FlowID{}, colcontainer.DiskQueueCfg{}, flowCtx.Cfg.VecFDSemaphore)
fuseOpt := flowinfra.FuseNormally
if isPlanLocal {
fuseOpt = flowinfra.FuseAggressively
}
// We want to force the expression deserialization if this flow is actually
// scheduled to be on the remote node in order to make sure that during
// actual execution the remote node will be able to deserialize the
// expressions without an error.
forceExprDeserialization := scheduledOnRemoteNode
creator := newVectorizedFlowCreator(
newNoopFlowCreatorHelper(), vectorizedRemoteComponentCreator{}, false,
nil, output, nil, execinfrapb.FlowID{}, colcontainer.DiskQueueCfg{},
flowCtx.Cfg.VecFDSemaphore, forceExprDeserialization,
)
// We create an unlimited memory account because we're interested whether the
// flow is supported via the vectorized engine in general (without paying
// attention to the memory since it is node-dependent in the distributed
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colflow/vectorized_flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func TestDrainOnlyInputDAG(t *testing.T) {
defer evalCtx.Stop(ctx)
f := &flowinfra.FlowBase{FlowCtx: execinfra.FlowCtx{EvalCtx: &evalCtx, NodeID: base.TestingIDContainer}}
var wg sync.WaitGroup
vfc := newVectorizedFlowCreator(&vectorizedFlowCreatorHelper{f: f}, componentCreator, false, &wg, &execinfra.RowChannel{}, nil, execinfrapb.FlowID{}, colcontainer.DiskQueueCfg{}, nil)
vfc := newVectorizedFlowCreator(&vectorizedFlowCreatorHelper{f: f}, componentCreator, false, &wg, &execinfra.RowChannel{}, nil, execinfrapb.FlowID{}, colcontainer.DiskQueueCfg{}, nil, false /* forceExprDeserialization */)

_, err := vfc.setupFlow(ctx, &f.FlowCtx, procs, flowinfra.FuseNormally)
defer func() {
Expand Down
9 changes: 3 additions & 6 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,16 +158,13 @@ func (dsp *DistSQLPlanner) setupFlows(
// the execution time.
setupReq.EvalContext.Vectorize = int32(sessiondata.VectorizeOff)
} else {
fuseOpt := flowinfra.FuseNormally
if localState.IsLocal {
fuseOpt = flowinfra.FuseAggressively
}
// Now we check to see whether or not to even try vectorizing the flow.
// The goal here is to determine up front whether all of the flows can be
// vectorized. If any of them can't, turn off the setting.
// TODO(yuzefovich): this is a safe but quite inefficient way of setting
// up vectorized flows since the flows will effectively be planned twice.
for _, spec := range flows {
for scheduledOnNodeID, spec := range flows {
scheduledOnRemoteNode := scheduledOnNodeID != thisNodeID
if _, err := colflow.SupportsVectorized(
ctx, &execinfra.FlowCtx{
EvalCtx: &evalCtx.EvalContext,
Expand All @@ -178,7 +175,7 @@ func (dsp *DistSQLPlanner) setupFlows(
VecFDSemaphore: dsp.distSQLSrv.VecFDSemaphore,
},
NodeID: evalCtx.NodeID,
}, spec.Processors, fuseOpt, recv,
}, spec.Processors, localState.IsLocal, recv, scheduledOnRemoteNode,
); err != nil {
// Vectorization attempt failed with an error.
returnVectorizationSetupError := false
Expand Down
35 changes: 24 additions & 11 deletions pkg/sql/execinfra/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,28 @@ func (eh *ExprHelper) IndexedVarNodeFormatter(idx int) tree.NodeFormatter {
return &n
}

// DeserializeExpr deserializes expr, binds the indexed variables to the
// provided IndexedVarHelper, and evaluates any constants in the expression.
//
// An empty expr yields (nil, nil). An error is returned if processing the
// expression fails or if the resulting expression contains an aggregate; in
// both cases the returned expression is nil.
func DeserializeExpr(
	expr string, evalCtx *tree.EvalContext, vars *tree.IndexedVarHelper,
) (tree.TypedExpr, error) {
	if expr == "" {
		return nil, nil
	}

	semaContext := tree.MakeSemaContext()
	// Resolve types using the resolver carried by the evaluation context.
	semaContext.TypeResolver = evalCtx.TypeResolver
	deserializedExpr, err := processExpression(
		execinfrapb.Expression{Expr: expr}, evalCtx, &semaContext, vars,
	)
	if err != nil {
		// Do not hand back a possibly partially processed expression together
		// with an error.
		return nil, err
	}
	var t transform.ExprTransformContext
	if t.AggregateInExpr(deserializedExpr, evalCtx.SessionData.SearchPath) {
		return nil, errors.Errorf("expression '%s' has aggregate", deserializedExpr)
	}
	return deserializedExpr, nil
}

// Init initializes the ExprHelper.
func (eh *ExprHelper) Init(
expr execinfrapb.Expression, types []*types.T, evalCtx *tree.EvalContext,
Expand All @@ -157,17 +179,8 @@ func (eh *ExprHelper) Init(
return nil
}
var err error
semaContext := tree.MakeSemaContext()
semaContext.TypeResolver = evalCtx.TypeResolver
eh.Expr, err = processExpression(expr, evalCtx, &semaContext, &eh.Vars)
if err != nil {
return err
}
var t transform.ExprTransformContext
if t.AggregateInExpr(eh.Expr, evalCtx.SessionData.SearchPath) {
return errors.Errorf("expression '%s' has aggregate", eh.Expr)
}
return nil
eh.Expr, err = DeserializeExpr(expr.Expr, evalCtx, &eh.Vars)
return err
}

// EvalFilter is used for filter expressions; it evaluates the expression and
Expand Down
17 changes: 6 additions & 11 deletions pkg/sql/explain_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/colflow"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
Expand Down Expand Up @@ -212,21 +211,17 @@ func populateExplain(

ctxSessionData := flowCtx.EvalCtx.SessionData
vectorizedThresholdMet := physicalPlan.MaxEstimatedRowCount >= ctxSessionData.VectorizeRowCountThreshold
willVectorize = true
if ctxSessionData.VectorizeMode == sessiondata.VectorizeOff {
willVectorize = false
} else if !vectorizedThresholdMet && (ctxSessionData.VectorizeMode == sessiondata.Vectorize201Auto || ctxSessionData.VectorizeMode == sessiondata.VectorizeOn) {
willVectorize = false
} else {
thisNodeID := distSQLPlanner.nodeDesc.NodeID
for nodeID, flow := range flows {
fuseOpt := flowinfra.FuseNormally
if nodeID == thisNodeID && !willDistribute {
fuseOpt = flowinfra.FuseAggressively
}
_, err := colflow.SupportsVectorized(params.ctx, flowCtx, flow.Processors, fuseOpt, nil /* output */)
willVectorize = willVectorize && (err == nil)
if !willVectorize {
willVectorize = true
thisNodeID, _ := params.extendedEvalCtx.NodeID.OptionalNodeID()
for scheduledOnNodeID, flow := range flows {
scheduledOnRemoteNode := scheduledOnNodeID != thisNodeID
if _, err := colflow.SupportsVectorized(params.ctx, flowCtx, flow.Processors, !willDistribute, nil /* output */, scheduledOnRemoteNode); err != nil {
willVectorize = false
break
}
}
Expand Down
Loading

0 comments on commit 5fab1ad

Please sign in to comment.