Skip to content

Commit

Permalink
colbuilder: vectorizing rendering on top of wrapped processors
Browse files Browse the repository at this point in the history
Previously, whenever we needed to wrap a row-by-row processor into the
vectorized flow, we would pass in the whole `PostProcessSpec` to that
processor. When wrapping was originally implemented several years ago,
this was needed so that processors could determine the set of "needed
columns" (those that should be decoded from KV responses). However, we
recently refactored that logic, and the set of needed columns is now passed
via the `IndexFetchSpec`. This means that we no longer need to pass the whole
`PostProcessSpec` when wrapping, and this commit takes advantage of that
observation in order to vectorize the evaluation of render expressions.
In particular, vectorizing render expressions allows us to plan more
efficient vectorized builtins. We still pass all other parts of the
`PostProcessSpec` (meaning that the wrapped processor is still
responsible for projections, limits, and offsets) since those operations
will limit the number of datums that need to be converted to the
columnar in-memory format as well as provide limit hints to the wrapped
processors.

Release note: None
  • Loading branch information
yuzefovich committed Jul 1, 2022
1 parent 023a057 commit f5acd81
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 57 deletions.
63 changes: 34 additions & 29 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,15 +508,17 @@ func (r opResult) createAndWrapRowSource(
args *colexecargs.NewColOperatorArgs,
inputs []colexecargs.OpWithMetaInfo,
inputTypes [][]*types.T,
spec *execinfrapb.ProcessorSpec,
core *execinfrapb.ProcessorCoreUnion,
post *execinfrapb.PostProcessSpec,
processorID int32,
factory coldata.ColumnFactory,
causeToWrap error,
) error {
if args.ProcessorConstructor == nil {
return errors.New("processorConstructor is nil")
}
log.VEventf(ctx, 1, "planning a row-execution processor in the vectorized flow: %v", causeToWrap)
if err := canWrap(flowCtx.EvalCtx.SessionData().VectorizeMode, &spec.Core); err != nil {
if err := canWrap(flowCtx.EvalCtx.SessionData().VectorizeMode, core); err != nil {
log.VEventf(ctx, 1, "planning a wrapped processor failed: %v", err)
// Return the original error for why we don't support this spec
// natively since it is more interesting.
Expand All @@ -540,14 +542,14 @@ func (r opResult) createAndWrapRowSource(
inputs,
inputTypes,
args.StreamingMemAccount,
spec.ProcessorID,
processorID,
func(inputs []execinfra.RowSource) (execinfra.RowSource, error) {
// We provide a slice with a single nil as 'outputs' parameter
// because all processors expect a single output. Passing nil is ok
// here because when wrapping the processor, the materializer will
// be its output, and it will be set up in wrapRowSources.
proc, err := args.ProcessorConstructor(
ctx, flowCtx, spec.ProcessorID, &spec.Core, &spec.Post, inputs,
ctx, flowCtx, processorID, core, post, inputs,
[]execinfra.RowReceiver{nil} /* outputs */, args.LocalProcessors,
)
if err != nil {
Expand All @@ -559,7 +561,7 @@ func (r opResult) createAndWrapRowSource(
)
if rs, ok = proc.(execinfra.RowSource); !ok {
return nil, errors.AssertionFailedf(
"processor %s is not an execinfra.RowSource", spec.Core.String(),
"processor %s is not an execinfra.RowSource", core.String(),
)
}
r.ColumnTypes = rs.OutputTypes()
Expand Down Expand Up @@ -672,12 +674,22 @@ func NewColOperator(
copy(inputTypes[inputIdx], input.ColumnTypes)
}

err = result.createAndWrapRowSource(ctx, flowCtx, args, inputs, inputTypes, spec, factory, err)
// The wrapped processors need to be passed the post-process specs,
// since they inspect them to figure out information about needed
// columns. This means that we'll let those processors do any renders
// or filters, which isn't ideal. We could improve this.
post = &execinfrapb.PostProcessSpec{}
// We will pass all of the PostProcessSpec except for the render exprs to
// the wrapped processor so that it handles projections as well as limits
// and offsets itself.
//
// However, we'll keep the render exprs for the vectorized planning done
// in planPostProcessSpec() below. In such a setup, the wrapped
// processor will simply output all of its internal columns, so we don't
// need to do anything special to remap the render exprs since they
// still refer to columns using the correct ordinals.
wrappingPost := *post
wrappingPost.RenderExprs = nil
post = &execinfrapb.PostProcessSpec{RenderExprs: post.RenderExprs}
err = result.createAndWrapRowSource(
ctx, flowCtx, args, inputs, inputTypes, core,
&wrappingPost, spec.ProcessorID, factory, err,
)
} else {
switch {
case core.Noop != nil:
Expand Down Expand Up @@ -1460,7 +1472,7 @@ func NewColOperator(
}
err = ppr.planPostProcessSpec(ctx, flowCtx, args, post, factory, &r.Releasables)
if err != nil {
err = result.wrapPostProcessSpec(ctx, flowCtx, args, post, args.Spec.ResultTypes, factory, err)
err = result.wrapPostProcessSpec(ctx, flowCtx, args, post, spec.ProcessorID, factory, err)
} else {
// The result can be updated with the post process result.
result.updateWithPostProcessResult(ppr)
Expand Down Expand Up @@ -1507,7 +1519,7 @@ func NewColOperator(
post.RenderExprs[i].LocalExpr = tree.NewTypedOrdinalReference(i, args.Spec.ResultTypes[i])
}
}
if err = result.wrapPostProcessSpec(ctx, flowCtx, args, post, args.Spec.ResultTypes, factory, errWrappedCast); err != nil {
if err = result.wrapPostProcessSpec(ctx, flowCtx, args, post, spec.ProcessorID, factory, errWrappedCast); err != nil {
return r, err
}
} else if numMismatchedTypes > 0 {
Expand Down Expand Up @@ -1567,20 +1579,17 @@ func (r opResult) planAndMaybeWrapFilter(
if err != nil {
// Filter expression planning failed. Fall back to planning the filter
// using row execution.
filtererSpec := &execinfrapb.ProcessorSpec{
Core: execinfrapb.ProcessorCoreUnion{
Filterer: &execinfrapb.FiltererSpec{
Filter: filter,
},
filtererCore := &execinfrapb.ProcessorCoreUnion{
Filterer: &execinfrapb.FiltererSpec{
Filter: filter,
},
ProcessorID: processorID,
ResultTypes: args.Spec.ResultTypes,
}
inputToMaterializer := colexecargs.OpWithMetaInfo{Root: r.Root}
takeOverMetaInfo(&inputToMaterializer, args.Inputs)
return r.createAndWrapRowSource(
ctx, flowCtx, args, []colexecargs.OpWithMetaInfo{inputToMaterializer},
[][]*types.T{r.ColumnTypes}, filtererSpec, factory, err,
[][]*types.T{r.ColumnTypes}, filtererCore, &execinfrapb.PostProcessSpec{},
processorID, factory, err,
)
}
r.Root = op
Expand All @@ -1597,23 +1606,19 @@ func (r opResult) wrapPostProcessSpec(
flowCtx *execinfra.FlowCtx,
args *colexecargs.NewColOperatorArgs,
post *execinfrapb.PostProcessSpec,
resultTypes []*types.T,
processorID int32,
factory coldata.ColumnFactory,
causeToWrap error,
) error {
noopSpec := &execinfrapb.ProcessorSpec{
Core: execinfrapb.ProcessorCoreUnion{
Noop: &execinfrapb.NoopCoreSpec{},
},
Post: *post,
ResultTypes: resultTypes,
noopCore := &execinfrapb.ProcessorCoreUnion{
Noop: &execinfrapb.NoopCoreSpec{},
}
inputToMaterializer := colexecargs.OpWithMetaInfo{Root: r.Root}
takeOverMetaInfo(&inputToMaterializer, args.Inputs)
// createAndWrapRowSource updates r.ColumnTypes accordingly.
return r.createAndWrapRowSource(
ctx, flowCtx, args, []colexecargs.OpWithMetaInfo{inputToMaterializer},
[][]*types.T{r.ColumnTypes}, noopSpec, factory, causeToWrap,
[][]*types.T{r.ColumnTypes}, noopCore, post, processorID, factory, causeToWrap,
)
}

Expand Down
75 changes: 50 additions & 25 deletions pkg/sql/opt/exec/execbuilder/testdata/tpch_vec
Original file line number Diff line number Diff line change
Expand Up @@ -20579,12 +20579,14 @@ EXPLAIN (VEC) SELECT l_orderkey, sum(l_extendedprice * (1 - l_discount)) AS reve
└ Node 1
└ *colexec.topKSorter
└ *colexec.hashAggregator
└ *rowexec.joinReader
└ *colexecjoin.hashJoiner
├ *colexecsel.selLTInt64Int64ConstOp
│ └ *colfetcher.ColBatchScan
└ *colexecsel.selEQBytesBytesConstOp
└ *colfetcher.ColBatchScan
└ *colexecproj.projMultFloat64Float64Op
└ *colexecprojconst.projMinusFloat64ConstFloat64Op
└ *rowexec.joinReader
└ *colexecjoin.hashJoiner
├ *colexecsel.selLTInt64Int64ConstOp
│ └ *colfetcher.ColBatchScan
└ *colexecsel.selEQBytesBytesConstOp
└ *colfetcher.ColBatchScan

# Query 4
query T
Expand Down Expand Up @@ -20759,11 +20761,13 @@ EXPLAIN (VEC) SELECT ps_partkey, sum(ps_supplycost * ps_availqty::float) AS valu
└ *colexecbase.castOpNullAny
└ *colexecbase.constNullOp
└ *colexec.hashAggregator
└ *rowexec.joinReader
└ *rowexec.joinReader
└ *colexecproj.projMultFloat64Float64Op
└ *colexecbase.castIntFloatOp
└ *rowexec.joinReader
└ *colexecsel.selEQBytesBytesConstOp
└ *colfetcher.ColBatchScan
└ *rowexec.joinReader
└ *rowexec.joinReader
└ *colexecsel.selEQBytesBytesConstOp
└ *colfetcher.ColBatchScan

# Query 12
query T
Expand All @@ -20773,12 +20777,30 @@ EXPLAIN (VEC) SELECT l_shipmode, sum(CASE WHEN o_orderpriority = '1-URGENT' or o
└ Node 1
└ *colexec.sortOp
└ *colexec.hashAggregator
└ *rowexec.joinReader
└ *colexecsel.selLTInt64Int64Op
└ *colexecsel.selLTInt64Int64Op
└ *colexec.selectInOpBytes
└ *colfetcher.ColIndexJoin
└ *colfetcher.ColBatchScan
└ *colexec.caseOp
├ *colexec.bufferOp
│ └ *colexec.caseOp
│ ├ *colexec.bufferOp
│ │ └ *rowexec.joinReader
│ │ └ *colexecsel.selLTInt64Int64Op
│ │ └ *colexecsel.selLTInt64Int64Op
│ │ └ *colexec.selectInOpBytes
│ │ └ *colfetcher.ColIndexJoin
│ │ └ *colfetcher.ColBatchScan
│ ├ *colexecbase.constInt64Op
│ │ └ *colexec.orProjOp
│ │ ├ *colexec.bufferOp
│ │ ├ *colexecprojconst.projEQBytesBytesConstOp
│ │ └ *colexecprojconst.projEQBytesBytesConstOp
│ └ *colexecbase.constInt64Op
│ └ *colexec.bufferOp
├ *colexecbase.constInt64Op
│ └ *colexec.andProjOp
│ ├ *colexec.bufferOp
│ ├ *colexecprojconst.projNEBytesBytesConstOp
│ └ *colexecprojconst.projNEBytesBytesConstOp
└ *colexecbase.constInt64Op
└ *colexec.bufferOp

# Query 13
query T
Expand Down Expand Up @@ -20993,12 +21015,15 @@ EXPLAIN (VEC) SELECT cntrycode, count(*) AS numcust, sum(c_acctbal) AS totacctba
└ Node 1
└ *colexec.sortOp
└ *colexec.hashAggregator
└ *rowexec.joinReader
└ *colexecsel.selGTFloat64Float64Op
└ *colexecbase.castOpNullAny
└ *colexecbase.constNullOp
└ *colexec.selectInOpBytes
└ *colexec.substringInt64Int64Operator
└ *colexecbase.constInt64Op
└ *colexecbase.constInt64Op
└ *colfetcher.ColBatchScan
└ *colexec.substringInt64Int64Operator
└ *colexecbase.constInt64Op
└ *colexecbase.constInt64Op
└ *rowexec.joinReader
└ *colexecsel.selGTFloat64Float64Op
└ *colexecbase.castOpNullAny
└ *colexecbase.constNullOp
└ *colexec.selectInOpBytes
└ *colexec.substringInt64Int64Operator
└ *colexecbase.constInt64Op
└ *colexecbase.constInt64Op
└ *colfetcher.ColBatchScan
10 changes: 7 additions & 3 deletions pkg/sql/opt/exec/execbuilder/testdata/vectorize_local
Original file line number Diff line number Diff line change
Expand Up @@ -447,9 +447,13 @@ EXPLAIN (VEC)
----
└ Node 1
└ *sql.planNodeToRowSource
└ *colfetcher.ColIndexJoin
└ *colfetcher.ColBatchScan
└ *colexec.caseOp
├ *colexec.bufferOp
│ └ *sql.planNodeToRowSource
│ └ *colfetcher.ColIndexJoin
│ └ *colfetcher.ColBatchScan
├ *colexec.bufferOp
└ *colexec.bufferOp

# Regression test for releasing operators before closing them with EXPLAIN (VEC)
# (#70438).
Expand Down

0 comments on commit f5acd81

Please sign in to comment.