From 5fab1ad8c72b24bbc75177625f623a5da47a057a Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Mon, 8 Jun 2020 16:51:20 -0700
Subject: [PATCH] colexec: improve expression parsing

This commit introduces `colexec.ExprHelper`, which helps with
expression processing. Previously, we allocated a new
`execinfra.ExprHelper` and called `Init` on it in order to get the
typed expression from the possibly serialized representation of each
expression. Now a single expression helper is reused for all
expressions in the flow on a single node.

There is one caveat, however: we need to force deserialization of the
expressions during the `SupportsVectorized` check whenever the flow is
scheduled to run on a remote node (i.e. a node other than the one
performing the check). This ensures that the remote node will be able
to deserialize the expressions without encountering errors; if we did
not force deserialization during the check, we would use `LocalExpr`
(when available) and might not catch expressions that we do not
support.

Release note: None
---
 pkg/sql/colexec/execplan.go             | 45 ++++++++++----------
 pkg/sql/colexec/expr.go                 | 55 +++++++++++++++++++++++++
 pkg/sql/colflow/vectorized_flow.go      | 26 +++++++++++-
 pkg/sql/colflow/vectorized_flow_test.go |  2 +-
 pkg/sql/distsql_running.go              |  9 ++--
 pkg/sql/execinfra/expr.go               | 35 +++++++++++-----
 pkg/sql/explain_plan.go                 | 17 +++-----
 pkg/sql/explain_vec.go                  | 10 ++---
 8 files changed, 140 insertions(+), 59 deletions(-)

diff --git a/pkg/sql/colexec/execplan.go b/pkg/sql/colexec/execplan.go
index 7eb704b7a657..9f9633416fae 100644
--- a/pkg/sql/colexec/execplan.go
+++ b/pkg/sql/colexec/execplan.go
@@ -99,6 +99,7 @@ type NewColOperatorArgs struct {
 	ProcessorConstructor execinfra.ProcessorConstructor
 	DiskQueueCfg         colcontainer.DiskQueueCfg
 	FDSemaphore          semaphore.Semaphore
+	ExprHelper           ExprHelper
 	TestingKnobs         struct {
 		// UseStreamingMemAccountForBuffering specifies whether to use
 		// StreamingMemAccount when creating buffering operators and should only be
@@ -568,6 +569,9 @@ func NewColOperator(
 	streamingAllocator := colmem.NewAllocator(ctx, streamingMemAccount, factory)
 	useStreamingMemAccountForBuffering := args.TestingKnobs.UseStreamingMemAccountForBuffering
 	processorConstructor := args.ProcessorConstructor
+	if args.ExprHelper == nil {
+		args.ExprHelper = &defaultExprHelper{}
+	}
 
 	log.VEventf(ctx, 2, "planning col operator for spec %q", spec)
 
@@ -876,8 +880,9 @@ func NewColOperator(
 
 			if !core.HashJoiner.OnExpr.Empty() && core.HashJoiner.Type == sqlbase.InnerJoin {
 				if err =
-					result.planAndMaybeWrapOnExprAsFilter(ctx, flowCtx, core.HashJoiner.OnExpr,
-						streamingMemAccount, processorConstructor, factory); err != nil {
+					result.planAndMaybeWrapOnExprAsFilter(
+						ctx, flowCtx, core.HashJoiner.OnExpr, streamingMemAccount, processorConstructor, factory, args.ExprHelper,
+					); err != nil {
 					return result, err
 				}
 			}
@@ -938,8 +943,9 @@ func NewColOperator(
 			}
 
 			if onExpr != nil {
-				if err = result.planAndMaybeWrapOnExprAsFilter(ctx, flowCtx, *onExpr,
-					streamingMemAccount, processorConstructor, factory); err != nil {
+				if err = result.planAndMaybeWrapOnExprAsFilter(
+					ctx, flowCtx, *onExpr, streamingMemAccount, processorConstructor, factory, args.ExprHelper,
+				); err != nil {
 					return result, err
 				}
 			}
@@ -1093,7 +1099,7 @@ func NewColOperator(
 			Op:          result.Op,
 			ColumnTypes: result.ColumnTypes,
 		}
-		err = ppr.planPostProcessSpec(ctx, flowCtx, post, streamingMemAccount, factory)
+		err = ppr.planPostProcessSpec(ctx, flowCtx, post, streamingMemAccount, factory, args.ExprHelper)
 		// TODO(yuzefovich): update unit tests to remove panic-catcher when fallback
 		// to rowexec is not allowed.
 		if err != nil && processorConstructor == nil {
@@ -1149,6 +1155,7 @@ func (r *NewColOperatorResult) planAndMaybeWrapOnExprAsFilter(
 	streamingMemAccount *mon.BoundAccount,
 	processorConstructor execinfra.ProcessorConstructor,
 	factory coldata.ColumnFactory,
+	helper ExprHelper,
 ) error {
 	// We will plan other Operators on top of r.Op, so we need to account for the
 	// internal memory explicitly.
@@ -1160,7 +1167,7 @@ func (r *NewColOperatorResult) planAndMaybeWrapOnExprAsFilter(
 		ColumnTypes: r.ColumnTypes,
 	}
 	if err := ppr.planFilterExpr(
-		ctx, flowCtx.NewEvalCtx(), onExpr, streamingMemAccount, factory,
+		ctx, flowCtx.NewEvalCtx(), onExpr, streamingMemAccount, factory, helper,
 	); err != nil {
 		// ON expression planning failed. Fall back to planning the filter
 		// using row execution.
@@ -1210,10 +1217,11 @@ func (r *postProcessResult) planPostProcessSpec(
 	post *execinfrapb.PostProcessSpec,
 	streamingMemAccount *mon.BoundAccount,
 	factory coldata.ColumnFactory,
+	helper ExprHelper,
 ) error {
 	if !post.Filter.Empty() {
 		if err := r.planFilterExpr(
-			ctx, flowCtx.NewEvalCtx(), post.Filter, streamingMemAccount, factory,
+			ctx, flowCtx.NewEvalCtx(), post.Filter, streamingMemAccount, factory, helper,
 		); err != nil {
 			return err
 		}
@@ -1224,18 +1232,15 @@ func (r *postProcessResult) planPostProcessSpec(
 	} else if post.RenderExprs != nil {
 		log.VEventf(ctx, 2, "planning render expressions %+v", post.RenderExprs)
 		var renderedCols []uint32
-		for _, expr := range post.RenderExprs {
-			var (
-				helper            execinfra.ExprHelper
-				renderInternalMem int
-			)
-			err := helper.Init(expr, r.ColumnTypes, flowCtx.EvalCtx)
+		for _, renderExpr := range post.RenderExprs {
+			var renderInternalMem int
+			expr, err := helper.ProcessExpr(renderExpr, flowCtx.EvalCtx, r.ColumnTypes)
 			if err != nil {
 				return err
 			}
 			var outputIdx int
 			r.Op, outputIdx, r.ColumnTypes, renderInternalMem, err = planProjectionOperators(
-				ctx, flowCtx.NewEvalCtx(), helper.Expr, r.ColumnTypes, r.Op, streamingMemAccount, factory,
+				ctx, flowCtx.NewEvalCtx(), expr, r.ColumnTypes, r.Op, streamingMemAccount, factory,
 			)
 			if err != nil {
 				return errors.Wrapf(err, "unable to columnarize render expression %q", expr)
@@ -1364,16 +1369,14 @@ func (r *postProcessResult) planFilterExpr(
 	filter execinfrapb.Expression,
 	acc *mon.BoundAccount,
 	factory coldata.ColumnFactory,
+	helper ExprHelper,
 ) error {
-	var (
-		helper               execinfra.ExprHelper
-		selectionInternalMem int
-	)
-	err := helper.Init(filter, r.ColumnTypes, evalCtx)
+	var selectionInternalMem int
+	expr, err := helper.ProcessExpr(filter, evalCtx, r.ColumnTypes)
 	if err != nil {
 		return err
 	}
-	if helper.Expr == tree.DNull {
+	if expr == tree.DNull {
 		// The filter expression is tree.DNull meaning that it is always false, so
 		// we put a zero operator.
 		r.Op = NewZeroOp(r.Op)
@@ -1381,7 +1384,7 @@ func (r *postProcessResult) planFilterExpr(
 	}
 	var filterColumnTypes []*types.T
 	r.Op, _, filterColumnTypes, selectionInternalMem, err = planSelectionOperators(
-		ctx, evalCtx, helper.Expr, r.ColumnTypes, r.Op, acc, factory,
+		ctx, evalCtx, expr, r.ColumnTypes, r.Op, acc, factory,
 	)
 	if err != nil {
 		return errors.Wrapf(err, "unable to columnarize filter expression %q", filter.Expr)
diff --git a/pkg/sql/colexec/expr.go b/pkg/sql/colexec/expr.go
index df6fa93496e9..75280a28cad1 100644
--- a/pkg/sql/colexec/expr.go
+++ b/pkg/sql/colexec/expr.go
@@ -11,11 +11,66 @@ package colexec
 
 import (
+	"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
 	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
 	"github.com/cockroachdb/cockroach/pkg/sql/parser"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+	"github.com/cockroachdb/cockroach/pkg/sql/types"
 )
 
+// ExprHelper is a utility interface that helps with expression handling in
+// the vectorized engine.
+type ExprHelper interface {
+	// ProcessExpr processes the given expression and returns a well-typed
+	// expression.
+	ProcessExpr(execinfrapb.Expression, *tree.EvalContext, []*types.T) (tree.TypedExpr, error)
+}
+
+// NewExprHelper returns a new ExprHelper. forceExprDeserialization determines
+// whether LocalExpr field is ignored by the helper.
+func NewExprHelper(forceExprDeserialization bool) ExprHelper {
+	if forceExprDeserialization {
+		return &forcedDeserializationExprHelper{}
+	}
+	return &defaultExprHelper{}
+}
+
+// defaultExprHelper is an ExprHelper that takes advantage of already present
+// well-typed expression in LocalExpr when set.
+type defaultExprHelper struct {
+	helper execinfra.ExprHelper
+}
+
+var _ ExprHelper = &defaultExprHelper{}
+
+func (h *defaultExprHelper) ProcessExpr(
+	expr execinfrapb.Expression, evalCtx *tree.EvalContext, typs []*types.T,
+) (tree.TypedExpr, error) {
+	if expr.LocalExpr != nil {
+		return expr.LocalExpr, nil
+	}
+	h.helper.Types = typs
+	tempVars := tree.MakeIndexedVarHelper(&h.helper, len(typs))
+	return execinfra.DeserializeExpr(expr.Expr, evalCtx, &tempVars)
+}
+
+// forcedDeserializationExprHelper is an ExprHelper that always deserializes
+// (namely, parses, type-checks, and evaluates the constants) the provided
+// expression, completely ignoring LocalExpr field if set.
+type forcedDeserializationExprHelper struct {
+	helper execinfra.ExprHelper
+}
+
+var _ ExprHelper = &forcedDeserializationExprHelper{}
+
+func (h *forcedDeserializationExprHelper) ProcessExpr(
+	expr execinfrapb.Expression, evalCtx *tree.EvalContext, typs []*types.T,
+) (tree.TypedExpr, error) {
+	h.helper.Types = typs
+	tempVars := tree.MakeIndexedVarHelper(&h.helper, len(typs))
+	return execinfra.DeserializeExpr(expr.Expr, evalCtx, &tempVars)
+}
+
 // Remove unused warning.
 var _ = findIVarsInRange
diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go
index 17a1594a0d07..4bc70e3b34da 100644
--- a/pkg/sql/colflow/vectorized_flow.go
+++ b/pkg/sql/colflow/vectorized_flow.go
@@ -226,6 +226,7 @@ func (f *vectorizedFlow) Setup(
 		f.GetID(),
 		diskQueueCfg,
 		f.countingSemaphore,
+		false, /* forceExprDeserialization */
 	)
 	if f.testingKnobs.onSetupFlow != nil {
 		f.testingKnobs.onSetupFlow(creator)
@@ -431,6 +432,7 @@ type vectorizedFlowCreator struct {
 	syncFlowConsumer execinfra.RowReceiver
 	nodeDialer       *nodedialer.Dialer
 	flowID           execinfrapb.FlowID
+	exprHelper       colexec.ExprHelper
 
 	// numOutboxes counts how many exec.Outboxes have been set up on this node.
 	// It must be accessed atomically.
@@ -466,6 +468,7 @@ func newVectorizedFlowCreator(
 	flowID execinfrapb.FlowID,
 	diskQueueCfg colcontainer.DiskQueueCfg,
 	fdSemaphore semaphore.Semaphore,
+	forceExprDeserialization bool,
 ) *vectorizedFlowCreator {
 	return &vectorizedFlowCreator{
 		flowCreatorHelper: helper,
@@ -479,6 +482,7 @@ func newVectorizedFlowCreator(
 		flowID:            flowID,
 		diskQueueCfg:      diskQueueCfg,
 		fdSemaphore:       fdSemaphore,
+		exprHelper:        colexec.NewExprHelper(forceExprDeserialization),
 	}
 }
 
@@ -952,6 +956,7 @@ func (s *vectorizedFlowCreator) setupFlow(
 			ProcessorConstructor: rowexec.NewProcessor,
 			DiskQueueCfg:         s.diskQueueCfg,
 			FDSemaphore:          s.fdSemaphore,
+			ExprHelper:           s.exprHelper,
 		}
 		result, err := colexec.NewColOperator(ctx, flowCtx, args)
 		// Even when err is non-nil, it is possible that the buffering memory
@@ -1170,17 +1175,34 @@ func (r *noopFlowCreatorHelper) getCancelFlowFn() context.CancelFunc {
 // EXPLAIN output.
 // Note that passed-in output can be nil, but if it is non-nil, only Types()
 // method on it might be called (nothing will actually get Push()'ed into it).
+// - scheduledOnRemoteNode indicates whether the flow that processorSpecs
+// represent is scheduled to be run on a remote node (different from the one
+// performing this check).
 func SupportsVectorized(
 	ctx context.Context,
 	flowCtx *execinfra.FlowCtx,
 	processorSpecs []execinfrapb.ProcessorSpec,
-	fuseOpt flowinfra.FuseOpt,
+	isPlanLocal bool,
 	output execinfra.RowReceiver,
+	scheduledOnRemoteNode bool,
 ) (leaves []execinfra.OpNode, err error) {
 	if output == nil {
 		output = &execinfra.RowChannel{}
 	}
-	creator := newVectorizedFlowCreator(newNoopFlowCreatorHelper(), vectorizedRemoteComponentCreator{}, false, nil, output, nil, execinfrapb.FlowID{}, colcontainer.DiskQueueCfg{}, flowCtx.Cfg.VecFDSemaphore)
+	fuseOpt := flowinfra.FuseNormally
+	if isPlanLocal {
+		fuseOpt = flowinfra.FuseAggressively
+	}
+	// We want to force the expression deserialization if this flow is actually
+	// scheduled to be on the remote node in order to make sure that during
+	// actual execution the remote node will be able to deserialize the
+	// expressions without an error.
+	forceExprDeserialization := scheduledOnRemoteNode
+	creator := newVectorizedFlowCreator(
+		newNoopFlowCreatorHelper(), vectorizedRemoteComponentCreator{}, false,
+		nil, output, nil, execinfrapb.FlowID{}, colcontainer.DiskQueueCfg{},
+		flowCtx.Cfg.VecFDSemaphore, forceExprDeserialization,
+	)
 	// We create an unlimited memory account because we're interested whether the
 	// flow is supported via the vectorized engine in general (without paying
 	// attention to the memory since it is node-dependent in the distributed
diff --git a/pkg/sql/colflow/vectorized_flow_test.go b/pkg/sql/colflow/vectorized_flow_test.go
index cad20267b3e5..a0c7cecb513e 100644
--- a/pkg/sql/colflow/vectorized_flow_test.go
+++ b/pkg/sql/colflow/vectorized_flow_test.go
@@ -214,7 +214,7 @@ func TestDrainOnlyInputDAG(t *testing.T) {
 	defer evalCtx.Stop(ctx)
 	f := &flowinfra.FlowBase{FlowCtx: execinfra.FlowCtx{EvalCtx: &evalCtx, NodeID: base.TestingIDContainer}}
 	var wg sync.WaitGroup
-	vfc := newVectorizedFlowCreator(&vectorizedFlowCreatorHelper{f: f}, componentCreator, false, &wg, &execinfra.RowChannel{}, nil, execinfrapb.FlowID{}, colcontainer.DiskQueueCfg{}, nil)
+	vfc := newVectorizedFlowCreator(&vectorizedFlowCreatorHelper{f: f}, componentCreator, false, &wg, &execinfra.RowChannel{}, nil, execinfrapb.FlowID{}, colcontainer.DiskQueueCfg{}, nil, false /* forceExprDeserialization */)
 
 	_, err := vfc.setupFlow(ctx, &f.FlowCtx, procs, flowinfra.FuseNormally)
 	defer func() {
diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go
index 4f279a35577d..c09df9839383 100644
--- a/pkg/sql/distsql_running.go
+++ b/pkg/sql/distsql_running.go
@@ -158,16 +158,13 @@ func (dsp *DistSQLPlanner) setupFlows(
 		// the execution time.
 		setupReq.EvalContext.Vectorize = int32(sessiondata.VectorizeOff)
 	} else {
-		fuseOpt := flowinfra.FuseNormally
-		if localState.IsLocal {
-			fuseOpt = flowinfra.FuseAggressively
-		}
 		// Now we check to see whether or not to even try vectorizing the flow.
 		// The goal here is to determine up front whether all of the flows can be
 		// vectorized. If any of them can't, turn off the setting.
 		// TODO(yuzefovich): this is a safe but quite inefficient way of setting
 		// up vectorized flows since the flows will effectively be planned twice.
-		for _, spec := range flows {
+		for scheduledOnNodeID, spec := range flows {
+			scheduledOnRemoteNode := scheduledOnNodeID != thisNodeID
 			if _, err := colflow.SupportsVectorized(
 				ctx, &execinfra.FlowCtx{
 					EvalCtx: &evalCtx.EvalContext,
@@ -178,7 +175,7 @@ func (dsp *DistSQLPlanner) setupFlows(
 						VecFDSemaphore: dsp.distSQLSrv.VecFDSemaphore,
 					},
 					NodeID: evalCtx.NodeID,
-				}, spec.Processors, fuseOpt, recv,
+				}, spec.Processors, localState.IsLocal, recv, scheduledOnRemoteNode,
 			); err != nil {
 				// Vectorization attempt failed with an error.
 				returnVectorizationSetupError := false
diff --git a/pkg/sql/execinfra/expr.go b/pkg/sql/execinfra/expr.go
index 778220bd734d..ce5c73df8e72 100644
--- a/pkg/sql/execinfra/expr.go
+++ b/pkg/sql/execinfra/expr.go
@@ -139,6 +139,28 @@ func (eh *ExprHelper) IndexedVarNodeFormatter(idx int) tree.NodeFormatter {
 	return &n
 }
 
+// DeserializeExpr deserializes expr, binds the indexed variables to the
+// provided IndexedVarHelper, and evaluates any constants in the expression.
+func DeserializeExpr(
+	expr string, evalCtx *tree.EvalContext, vars *tree.IndexedVarHelper,
+) (tree.TypedExpr, error) {
+	if expr == "" {
+		return nil, nil
+	}
+
+	semaContext := tree.MakeSemaContext()
+	semaContext.TypeResolver = evalCtx.TypeResolver
+	deserializedExpr, err := processExpression(execinfrapb.Expression{Expr: expr}, evalCtx, &semaContext, vars)
+	if err != nil {
+		return deserializedExpr, err
+	}
+	var t transform.ExprTransformContext
+	if t.AggregateInExpr(deserializedExpr, evalCtx.SessionData.SearchPath) {
+		return nil, errors.Errorf("expression '%s' has aggregate", deserializedExpr)
+	}
+	return deserializedExpr, nil
+}
+
 // Init initializes the ExprHelper.
 func (eh *ExprHelper) Init(
 	expr execinfrapb.Expression, types []*types.T, evalCtx *tree.EvalContext,
@@ -157,17 +179,8 @@ func (eh *ExprHelper) Init(
 		return nil
 	}
 	var err error
-	semaContext := tree.MakeSemaContext()
-	semaContext.TypeResolver = evalCtx.TypeResolver
-	eh.Expr, err = processExpression(expr, evalCtx, &semaContext, &eh.Vars)
-	if err != nil {
-		return err
-	}
-	var t transform.ExprTransformContext
-	if t.AggregateInExpr(eh.Expr, evalCtx.SessionData.SearchPath) {
-		return errors.Errorf("expression '%s' has aggregate", eh.Expr)
-	}
-	return nil
+	eh.Expr, err = DeserializeExpr(expr.Expr, evalCtx, &eh.Vars)
+	return err
 }
 
 // EvalFilter is used for filter expressions; it evaluates the expression and
diff --git a/pkg/sql/explain_plan.go b/pkg/sql/explain_plan.go
index 9d8825b32525..c312bc55e3c1 100644
--- a/pkg/sql/explain_plan.go
+++ b/pkg/sql/explain_plan.go
@@ -19,7 +19,6 @@ import (
 
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/colflow"
-	"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
 	"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
@@ -212,21 +211,17 @@ func populateExplain(
 		ctxSessionData := flowCtx.EvalCtx.SessionData
 		vectorizedThresholdMet := physicalPlan.MaxEstimatedRowCount >= ctxSessionData.VectorizeRowCountThreshold
-		willVectorize = true
 		if ctxSessionData.VectorizeMode == sessiondata.VectorizeOff {
 			willVectorize = false
 		} else if !vectorizedThresholdMet && (ctxSessionData.VectorizeMode == sessiondata.Vectorize201Auto ||
 			ctxSessionData.VectorizeMode == sessiondata.VectorizeOn) {
 			willVectorize = false
 		} else {
-			thisNodeID := distSQLPlanner.nodeDesc.NodeID
-			for nodeID, flow := range flows {
-				fuseOpt := flowinfra.FuseNormally
-				if nodeID == thisNodeID && !willDistribute {
-					fuseOpt = flowinfra.FuseAggressively
-				}
-				_, err := colflow.SupportsVectorized(params.ctx, flowCtx, flow.Processors, fuseOpt, nil /* output */)
-				willVectorize = willVectorize && (err == nil)
-				if !willVectorize {
+			willVectorize = true
+			thisNodeID, _ := params.extendedEvalCtx.NodeID.OptionalNodeID()
+			for scheduledOnNodeID, flow := range flows {
+				scheduledOnRemoteNode := scheduledOnNodeID != thisNodeID
+				if _, err := colflow.SupportsVectorized(params.ctx, flowCtx, flow.Processors, !willDistribute, nil /* output */, scheduledOnRemoteNode); err != nil {
+					willVectorize = false
 					break
 				}
 			}
diff --git a/pkg/sql/explain_vec.go b/pkg/sql/explain_vec.go
index b08cd8372d8c..02c6c2610b99 100644
--- a/pkg/sql/explain_vec.go
+++ b/pkg/sql/explain_vec.go
@@ -22,7 +22,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/distsql"
 	"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
 	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/util/mon" @@ -102,14 +101,11 @@ func (n *explainVecNode) startExec(params runParams) error { tp := treeprinter.NewWithIndent(false /* leftPad */, true /* rightPad */, 0 /* edgeLength */) root := tp.Child("") verbose := n.options.Flags[tree.ExplainFlagVerbose] - thisNodeID := distSQLPlanner.nodeDesc.NodeID + thisNodeID, _ := params.extendedEvalCtx.NodeID.OptionalNodeID() for _, flow := range sortedFlows { node := root.Childf("Node %d", flow.nodeID) - fuseOpt := flowinfra.FuseNormally - if flow.nodeID == thisNodeID && !willDistribute { - fuseOpt = flowinfra.FuseAggressively - } - opChains, err := colflow.SupportsVectorized(params.ctx, flowCtx, flow.flow.Processors, fuseOpt, nil /* output */) + scheduledOnRemoteNode := flow.nodeID != thisNodeID + opChains, err := colflow.SupportsVectorized(params.ctx, flowCtx, flow.flow.Processors, !willDistribute, nil /* output */, scheduledOnRemoteNode) if err != nil { return err }