Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
83689: colbuilder: vectorizing rendering on top of wrapped processors r=yuzefovich a=yuzefovich

**sql: remove no longer used fields from PostProcessSpec**

This commit removes no longer used `OriginalOutputColumns` and
`OriginalRenderExprs` fields from the `PostProcessSpec` - they
are no longer needed because we now propagate the set of needed
columns in the `IndexFetchSpec`.

Release note: None

**colbuilder: check native support via processor core rather than the spec**

This commit is just a mechanical change.

Release note: None

**colbuilder: vectorizing rendering on top of wrapped processors**

Previously, whenever we needed to wrap a row-by-row processor into the
vectorized flow, we would pass in the whole `PostProcessSpec` to that
processor. When this was originally implemented several years ago, this
was needed so that processors could determine the set of "needed
columns" (which should be decoded from KV responses). However, recently
we refactored that, and now the set of needed columns is passed via the
`IndexFetchSpec`. This means that we no longer need to pass the whole
`PostProcessSpec` when wrapping, and this commit takes advantage of that
observation in order to vectorize the evaluation of render expressions.
In particular, vectorizing of render expressions allows us to plan more
efficient vectorized builtins. We still pass all other parts of the
`PostProcessSpec` (meaning that the wrapped processor is still
responsible for projections, limits, and offsets) since those operations
will limit the number of datums that need to be converted to the
columnar in-memory format as well as provide limit hints to the wrapped
processors.

Release note: None

84222: logictest: temporarily disable workmem randomization in SQLLite tests r=yuzefovich a=yuzefovich

The failure is not really a concerning one, so it's ok to skip until
we figure it out.

Informs: cockroachdb#84022.

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
craig[bot] and yuzefovich committed Jul 11, 2022
3 parents 309e100 + f5acd81 + 2bdcab7 commit a7a10ff
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 136 deletions.
175 changes: 90 additions & 85 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,9 @@ func needHashAggregator(aggSpec *execinfrapb.AggregatorSpec) (bool, error) {
// vectorized engine (neither natively nor by wrapping the corresponding row
// execution processor).
func IsSupported(mode sessiondatapb.VectorizeExecMode, spec *execinfrapb.ProcessorSpec) error {
err := supportedNatively(spec)
err := supportedNatively(&spec.Core)
if err != nil {
if wrapErr := canWrap(mode, spec); wrapErr == nil {
if wrapErr := canWrap(mode, &spec.Core); wrapErr == nil {
// We don't support this spec natively, but we can wrap the row
// execution processor.
return nil
Expand All @@ -165,59 +165,59 @@ func IsSupported(mode sessiondatapb.VectorizeExecMode, spec *execinfrapb.Process
}

// supportedNatively checks whether we have a columnar operator equivalent to a
// processor described by spec. Note that it doesn't perform any other checks
// processor described by core. Note that it doesn't perform any other checks
// (like validity of the number of inputs).
func supportedNatively(spec *execinfrapb.ProcessorSpec) error {
func supportedNatively(core *execinfrapb.ProcessorCoreUnion) error {
switch {
case spec.Core.Noop != nil:
case core.Noop != nil:
return nil

case spec.Core.Values != nil:
case core.Values != nil:
return nil

case spec.Core.TableReader != nil:
case core.TableReader != nil:
return nil

case spec.Core.JoinReader != nil:
if !spec.Core.JoinReader.IsIndexJoin() {
case core.JoinReader != nil:
if !core.JoinReader.IsIndexJoin() {
return errLookupJoinUnsupported
}
return nil

case spec.Core.Filterer != nil:
case core.Filterer != nil:
return nil

case spec.Core.Aggregator != nil:
for _, agg := range spec.Core.Aggregator.Aggregations {
case core.Aggregator != nil:
for _, agg := range core.Aggregator.Aggregations {
if agg.FilterColIdx != nil {
return errors.Newf("filtering aggregation not supported")
}
}
return nil

case spec.Core.Distinct != nil:
case core.Distinct != nil:
return nil

case spec.Core.Ordinality != nil:
case core.Ordinality != nil:
return nil

case spec.Core.HashJoiner != nil:
if !spec.Core.HashJoiner.OnExpr.Empty() && spec.Core.HashJoiner.Type != descpb.InnerJoin {
case core.HashJoiner != nil:
if !core.HashJoiner.OnExpr.Empty() && core.HashJoiner.Type != descpb.InnerJoin {
return errors.Newf("can't plan vectorized non-inner hash joins with ON expressions")
}
return nil

case spec.Core.MergeJoiner != nil:
if !spec.Core.MergeJoiner.OnExpr.Empty() && spec.Core.MergeJoiner.Type != descpb.InnerJoin {
case core.MergeJoiner != nil:
if !core.MergeJoiner.OnExpr.Empty() && core.MergeJoiner.Type != descpb.InnerJoin {
return errors.Errorf("can't plan non-inner merge join with ON expressions")
}
return nil

case spec.Core.Sorter != nil:
case core.Sorter != nil:
return nil

case spec.Core.Windower != nil:
for _, wf := range spec.Core.Windower.WindowFns {
case core.Windower != nil:
for _, wf := range core.Windower.WindowFns {
if wf.FilterColIdx != tree.NoColumnIdx {
return errors.Newf("window functions with FILTER clause are not supported")
}
Expand All @@ -229,7 +229,7 @@ func supportedNatively(spec *execinfrapb.ProcessorSpec) error {
}
return nil

case spec.Core.LocalPlanNode != nil:
case core.LocalPlanNode != nil:
// LocalPlanNode core is special (we don't have any plans on vectorizing
// it at the moment), so we want to return a custom error for it to
// distinguish from other unsupported cores.
Expand All @@ -256,55 +256,55 @@ var (
errLookupJoinUnsupported = errors.New("lookup join reader is unsupported in vectorized")
)

func canWrap(mode sessiondatapb.VectorizeExecMode, spec *execinfrapb.ProcessorSpec) error {
if mode == sessiondatapb.VectorizeExperimentalAlways && spec.Core.JoinReader == nil && spec.Core.LocalPlanNode == nil {
func canWrap(mode sessiondatapb.VectorizeExecMode, core *execinfrapb.ProcessorCoreUnion) error {
if mode == sessiondatapb.VectorizeExperimentalAlways && core.JoinReader == nil && core.LocalPlanNode == nil {
return errExperimentalWrappingProhibited
}
switch {
case spec.Core.Noop != nil:
case spec.Core.TableReader != nil:
case spec.Core.JoinReader != nil:
case spec.Core.Sorter != nil:
case spec.Core.Aggregator != nil:
case spec.Core.Distinct != nil:
case spec.Core.MergeJoiner != nil:
case spec.Core.HashJoiner != nil:
case spec.Core.Values != nil:
case spec.Core.Backfiller != nil:
case core.Noop != nil:
case core.TableReader != nil:
case core.JoinReader != nil:
case core.Sorter != nil:
case core.Aggregator != nil:
case core.Distinct != nil:
case core.MergeJoiner != nil:
case core.HashJoiner != nil:
case core.Values != nil:
case core.Backfiller != nil:
return errBackfillerWrap
case spec.Core.ReadImport != nil:
case core.ReadImport != nil:
return errReadImportWrap
case spec.Core.Exporter != nil:
case core.Exporter != nil:
return errExporterWrap
case spec.Core.Sampler != nil:
case core.Sampler != nil:
return errSamplerWrap
case spec.Core.SampleAggregator != nil:
case core.SampleAggregator != nil:
return errSampleAggregatorWrap
case spec.Core.ZigzagJoiner != nil:
case spec.Core.ProjectSet != nil:
case spec.Core.Windower != nil:
case spec.Core.LocalPlanNode != nil:
case spec.Core.ChangeAggregator != nil:
case core.ZigzagJoiner != nil:
case core.ProjectSet != nil:
case core.Windower != nil:
case core.LocalPlanNode != nil:
case core.ChangeAggregator != nil:
// Currently, there is an issue with cleaning up the changefeed flows
// (#55408), so we fallback to the row-by-row engine.
return errChangeAggregatorWrap
case spec.Core.ChangeFrontier != nil:
case core.ChangeFrontier != nil:
// Currently, there is an issue with cleaning up the changefeed flows
// (#55408), so we fallback to the row-by-row engine.
return errChangeFrontierWrap
case spec.Core.Ordinality != nil:
case spec.Core.BulkRowWriter != nil:
case spec.Core.InvertedFilterer != nil:
case spec.Core.InvertedJoiner != nil:
case spec.Core.BackupData != nil:
case core.Ordinality != nil:
case core.BulkRowWriter != nil:
case core.InvertedFilterer != nil:
case core.InvertedJoiner != nil:
case core.BackupData != nil:
return errBackupDataWrap
case spec.Core.SplitAndScatter != nil:
case spec.Core.RestoreData != nil:
case spec.Core.Filterer != nil:
case spec.Core.StreamIngestionData != nil:
case spec.Core.StreamIngestionFrontier != nil:
case core.SplitAndScatter != nil:
case core.RestoreData != nil:
case core.Filterer != nil:
case core.StreamIngestionData != nil:
case core.StreamIngestionFrontier != nil:
default:
return errors.AssertionFailedf("unexpected processor core %q", spec.Core)
return errors.AssertionFailedf("unexpected processor core %q", core)
}
return nil
}
Expand Down Expand Up @@ -508,15 +508,17 @@ func (r opResult) createAndWrapRowSource(
args *colexecargs.NewColOperatorArgs,
inputs []colexecargs.OpWithMetaInfo,
inputTypes [][]*types.T,
spec *execinfrapb.ProcessorSpec,
core *execinfrapb.ProcessorCoreUnion,
post *execinfrapb.PostProcessSpec,
processorID int32,
factory coldata.ColumnFactory,
causeToWrap error,
) error {
if args.ProcessorConstructor == nil {
return errors.New("processorConstructor is nil")
}
log.VEventf(ctx, 1, "planning a row-execution processor in the vectorized flow: %v", causeToWrap)
if err := canWrap(flowCtx.EvalCtx.SessionData().VectorizeMode, spec); err != nil {
if err := canWrap(flowCtx.EvalCtx.SessionData().VectorizeMode, core); err != nil {
log.VEventf(ctx, 1, "planning a wrapped processor failed: %v", err)
// Return the original error for why we don't support this spec
// natively since it is more interesting.
Expand All @@ -540,14 +542,14 @@ func (r opResult) createAndWrapRowSource(
inputs,
inputTypes,
args.StreamingMemAccount,
spec.ProcessorID,
processorID,
func(inputs []execinfra.RowSource) (execinfra.RowSource, error) {
// We provide a slice with a single nil as 'outputs' parameter
// because all processors expect a single output. Passing nil is ok
// here because when wrapping the processor, the materializer will
// be its output, and it will be set up in wrapRowSources.
proc, err := args.ProcessorConstructor(
ctx, flowCtx, spec.ProcessorID, &spec.Core, &spec.Post, inputs,
ctx, flowCtx, processorID, core, post, inputs,
[]execinfra.RowReceiver{nil} /* outputs */, args.LocalProcessors,
)
if err != nil {
Expand All @@ -559,7 +561,7 @@ func (r opResult) createAndWrapRowSource(
)
if rs, ok = proc.(execinfra.RowSource); !ok {
return nil, errors.AssertionFailedf(
"processor %s is not an execinfra.RowSource", spec.Core.String(),
"processor %s is not an execinfra.RowSource", core.String(),
)
}
r.ColumnTypes = rs.OutputTypes()
Expand Down Expand Up @@ -665,19 +667,29 @@ func NewColOperator(
core := &spec.Core
post := &spec.Post

if err = supportedNatively(spec); err != nil {
if err = supportedNatively(core); err != nil {
inputTypes := make([][]*types.T, len(spec.Input))
for inputIdx, input := range spec.Input {
inputTypes[inputIdx] = make([]*types.T, len(input.ColumnTypes))
copy(inputTypes[inputIdx], input.ColumnTypes)
}

err = result.createAndWrapRowSource(ctx, flowCtx, args, inputs, inputTypes, spec, factory, err)
// The wrapped processors need to be passed the post-process specs,
// since they inspect them to figure out information about needed
// columns. This means that we'll let those processors do any renders
// or filters, which isn't ideal. We could improve this.
post = &execinfrapb.PostProcessSpec{}
// We will pass all of the PostProcessSpec to the wrapped processor so
// that it would handle projections as well as limits and offsets
// itself.
//
// However, we'll keep the render exprs for the vectorized planning done
// in planPostProcessSpec() below. In such a setup, the wrapped
// processor will simply output all of its internal columns, so we don't
// need to do anything special to remap the render exprs since they
// still refer to columns using the correct ordinals.
wrappingPost := *post
wrappingPost.RenderExprs = nil
post = &execinfrapb.PostProcessSpec{RenderExprs: post.RenderExprs}
err = result.createAndWrapRowSource(
ctx, flowCtx, args, inputs, inputTypes, core,
&wrappingPost, spec.ProcessorID, factory, err,
)
} else {
switch {
case core.Noop != nil:
Expand Down Expand Up @@ -1460,7 +1472,7 @@ func NewColOperator(
}
err = ppr.planPostProcessSpec(ctx, flowCtx, args, post, factory, &r.Releasables)
if err != nil {
err = result.wrapPostProcessSpec(ctx, flowCtx, args, post, args.Spec.ResultTypes, factory, err)
err = result.wrapPostProcessSpec(ctx, flowCtx, args, post, spec.ProcessorID, factory, err)
} else {
// The result can be updated with the post process result.
result.updateWithPostProcessResult(ppr)
Expand Down Expand Up @@ -1507,7 +1519,7 @@ func NewColOperator(
post.RenderExprs[i].LocalExpr = tree.NewTypedOrdinalReference(i, args.Spec.ResultTypes[i])
}
}
if err = result.wrapPostProcessSpec(ctx, flowCtx, args, post, args.Spec.ResultTypes, factory, errWrappedCast); err != nil {
if err = result.wrapPostProcessSpec(ctx, flowCtx, args, post, spec.ProcessorID, factory, errWrappedCast); err != nil {
return r, err
}
} else if numMismatchedTypes > 0 {
Expand Down Expand Up @@ -1567,20 +1579,17 @@ func (r opResult) planAndMaybeWrapFilter(
if err != nil {
// Filter expression planning failed. Fall back to planning the filter
// using row execution.
filtererSpec := &execinfrapb.ProcessorSpec{
Core: execinfrapb.ProcessorCoreUnion{
Filterer: &execinfrapb.FiltererSpec{
Filter: filter,
},
filtererCore := &execinfrapb.ProcessorCoreUnion{
Filterer: &execinfrapb.FiltererSpec{
Filter: filter,
},
ProcessorID: processorID,
ResultTypes: args.Spec.ResultTypes,
}
inputToMaterializer := colexecargs.OpWithMetaInfo{Root: r.Root}
takeOverMetaInfo(&inputToMaterializer, args.Inputs)
return r.createAndWrapRowSource(
ctx, flowCtx, args, []colexecargs.OpWithMetaInfo{inputToMaterializer},
[][]*types.T{r.ColumnTypes}, filtererSpec, factory, err,
[][]*types.T{r.ColumnTypes}, filtererCore, &execinfrapb.PostProcessSpec{},
processorID, factory, err,
)
}
r.Root = op
Expand All @@ -1597,23 +1606,19 @@ func (r opResult) wrapPostProcessSpec(
flowCtx *execinfra.FlowCtx,
args *colexecargs.NewColOperatorArgs,
post *execinfrapb.PostProcessSpec,
resultTypes []*types.T,
processorID int32,
factory coldata.ColumnFactory,
causeToWrap error,
) error {
noopSpec := &execinfrapb.ProcessorSpec{
Core: execinfrapb.ProcessorCoreUnion{
Noop: &execinfrapb.NoopCoreSpec{},
},
Post: *post,
ResultTypes: resultTypes,
noopCore := &execinfrapb.ProcessorCoreUnion{
Noop: &execinfrapb.NoopCoreSpec{},
}
inputToMaterializer := colexecargs.OpWithMetaInfo{Root: r.Root}
takeOverMetaInfo(&inputToMaterializer, args.Inputs)
// createAndWrapRowSource updates r.ColumnTypes accordingly.
return r.createAndWrapRowSource(
ctx, flowCtx, args, []colexecargs.OpWithMetaInfo{inputToMaterializer},
[][]*types.T{r.ColumnTypes}, noopSpec, factory, causeToWrap,
[][]*types.T{r.ColumnTypes}, noopCore, post, processorID, factory, causeToWrap,
)
}

Expand Down
16 changes: 4 additions & 12 deletions pkg/sql/execinfrapb/flow_diagram.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,23 +460,15 @@ func (post *PostProcessSpec) summary() []string {
var res []string
if post.Projection {
outputColumns := "None"
outputCols := post.OutputColumns
if post.OriginalOutputColumns != nil {
outputCols = post.OriginalOutputColumns
}
if len(outputCols) > 0 {
outputColumns = colListStr(outputCols)
if len(post.OutputColumns) > 0 {
outputColumns = colListStr(post.OutputColumns)
}
res = append(res, fmt.Sprintf("Out: %s", outputColumns))
}
renderExprs := post.RenderExprs
if post.OriginalRenderExprs != nil {
renderExprs = post.OriginalRenderExprs
}
if len(renderExprs) > 0 {
if len(post.RenderExprs) > 0 {
var buf bytes.Buffer
buf.WriteString("Render: ")
for i, expr := range renderExprs {
for i, expr := range post.RenderExprs {
if i > 0 {
buf.WriteString(", ")
}
Expand Down
Loading

0 comments on commit a7a10ff

Please sign in to comment.