diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index 41e2b6e0d9f5..b28c1504bed3 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -508,7 +508,9 @@ func (r opResult) createAndWrapRowSource( args *colexecargs.NewColOperatorArgs, inputs []colexecargs.OpWithMetaInfo, inputTypes [][]*types.T, - spec *execinfrapb.ProcessorSpec, + core *execinfrapb.ProcessorCoreUnion, + post *execinfrapb.PostProcessSpec, + processorID int32, factory coldata.ColumnFactory, causeToWrap error, ) error { @@ -516,7 +518,7 @@ func (r opResult) createAndWrapRowSource( return errors.New("processorConstructor is nil") } log.VEventf(ctx, 1, "planning a row-execution processor in the vectorized flow: %v", causeToWrap) - if err := canWrap(flowCtx.EvalCtx.SessionData().VectorizeMode, &spec.Core); err != nil { + if err := canWrap(flowCtx.EvalCtx.SessionData().VectorizeMode, core); err != nil { log.VEventf(ctx, 1, "planning a wrapped processor failed: %v", err) // Return the original error for why we don't support this spec // natively since it is more interesting. @@ -540,14 +542,14 @@ func (r opResult) createAndWrapRowSource( inputs, inputTypes, args.StreamingMemAccount, - spec.ProcessorID, + processorID, func(inputs []execinfra.RowSource) (execinfra.RowSource, error) { // We provide a slice with a single nil as 'outputs' parameter // because all processors expect a single output. Passing nil is ok // here because when wrapping the processor, the materializer will // be its output, and it will be set up in wrapRowSources. 
proc, err := args.ProcessorConstructor( - ctx, flowCtx, spec.ProcessorID, &spec.Core, &spec.Post, inputs, + ctx, flowCtx, processorID, core, post, inputs, []execinfra.RowReceiver{nil} /* outputs */, args.LocalProcessors, ) if err != nil { @@ -559,7 +561,7 @@ func (r opResult) createAndWrapRowSource( ) if rs, ok = proc.(execinfra.RowSource); !ok { return nil, errors.AssertionFailedf( - "processor %s is not an execinfra.RowSource", spec.Core.String(), + "processor %s is not an execinfra.RowSource", core.String(), ) } r.ColumnTypes = rs.OutputTypes() @@ -672,12 +674,22 @@ func NewColOperator( copy(inputTypes[inputIdx], input.ColumnTypes) } - err = result.createAndWrapRowSource(ctx, flowCtx, args, inputs, inputTypes, spec, factory, err) - // The wrapped processors need to be passed the post-process specs, - // since they inspect them to figure out information about needed - // columns. This means that we'll let those processors do any renders - // or filters, which isn't ideal. We could improve this. - post = &execinfrapb.PostProcessSpec{} + // We pass everything in the PostProcessSpec except the render exprs to + // the wrapped processor so that it handles projections as well as + // limits and offsets itself. + // + // We keep the render exprs for the vectorized planning done + // in planPostProcessSpec() below. In such a setup, the wrapped + // processor will simply output all of its internal columns, so we don't + // need to do anything special to remap the render exprs since they + // still refer to columns using the correct ordinals. 
+ wrappingPost := *post + wrappingPost.RenderExprs = nil + post = &execinfrapb.PostProcessSpec{RenderExprs: post.RenderExprs} + err = result.createAndWrapRowSource( + ctx, flowCtx, args, inputs, inputTypes, core, + &wrappingPost, spec.ProcessorID, factory, err, + ) } else { switch { case core.Noop != nil: @@ -1460,7 +1472,7 @@ func NewColOperator( } err = ppr.planPostProcessSpec(ctx, flowCtx, args, post, factory, &r.Releasables) if err != nil { - err = result.wrapPostProcessSpec(ctx, flowCtx, args, post, args.Spec.ResultTypes, factory, err) + err = result.wrapPostProcessSpec(ctx, flowCtx, args, post, spec.ProcessorID, factory, err) } else { // The result can be updated with the post process result. result.updateWithPostProcessResult(ppr) @@ -1507,7 +1519,7 @@ func NewColOperator( post.RenderExprs[i].LocalExpr = tree.NewTypedOrdinalReference(i, args.Spec.ResultTypes[i]) } } - if err = result.wrapPostProcessSpec(ctx, flowCtx, args, post, args.Spec.ResultTypes, factory, errWrappedCast); err != nil { + if err = result.wrapPostProcessSpec(ctx, flowCtx, args, post, spec.ProcessorID, factory, errWrappedCast); err != nil { return r, err } } else if numMismatchedTypes > 0 { @@ -1567,20 +1579,17 @@ func (r opResult) planAndMaybeWrapFilter( if err != nil { // Filter expression planning failed. Fall back to planning the filter // using row execution. 
- filtererSpec := &execinfrapb.ProcessorSpec{ - Core: execinfrapb.ProcessorCoreUnion{ - Filterer: &execinfrapb.FiltererSpec{ - Filter: filter, - }, + filtererCore := &execinfrapb.ProcessorCoreUnion{ + Filterer: &execinfrapb.FiltererSpec{ + Filter: filter, }, - ProcessorID: processorID, - ResultTypes: args.Spec.ResultTypes, } inputToMaterializer := colexecargs.OpWithMetaInfo{Root: r.Root} takeOverMetaInfo(&inputToMaterializer, args.Inputs) return r.createAndWrapRowSource( ctx, flowCtx, args, []colexecargs.OpWithMetaInfo{inputToMaterializer}, - [][]*types.T{r.ColumnTypes}, filtererSpec, factory, err, + [][]*types.T{r.ColumnTypes}, filtererCore, &execinfrapb.PostProcessSpec{}, + processorID, factory, err, ) } r.Root = op @@ -1597,23 +1606,19 @@ func (r opResult) wrapPostProcessSpec( flowCtx *execinfra.FlowCtx, args *colexecargs.NewColOperatorArgs, post *execinfrapb.PostProcessSpec, - resultTypes []*types.T, + processorID int32, factory coldata.ColumnFactory, causeToWrap error, ) error { - noopSpec := &execinfrapb.ProcessorSpec{ - Core: execinfrapb.ProcessorCoreUnion{ - Noop: &execinfrapb.NoopCoreSpec{}, - }, - Post: *post, - ResultTypes: resultTypes, + noopCore := &execinfrapb.ProcessorCoreUnion{ + Noop: &execinfrapb.NoopCoreSpec{}, } inputToMaterializer := colexecargs.OpWithMetaInfo{Root: r.Root} takeOverMetaInfo(&inputToMaterializer, args.Inputs) // createAndWrapRowSource updates r.ColumnTypes accordingly. 
return r.createAndWrapRowSource( ctx, flowCtx, args, []colexecargs.OpWithMetaInfo{inputToMaterializer}, - [][]*types.T{r.ColumnTypes}, noopSpec, factory, causeToWrap, + [][]*types.T{r.ColumnTypes}, noopCore, post, processorID, factory, causeToWrap, ) } diff --git a/pkg/sql/opt/exec/execbuilder/testdata/tpch_vec b/pkg/sql/opt/exec/execbuilder/testdata/tpch_vec index d293c0dc8aff..b2d1699c8400 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/tpch_vec +++ b/pkg/sql/opt/exec/execbuilder/testdata/tpch_vec @@ -20579,12 +20579,14 @@ EXPLAIN (VEC) SELECT l_orderkey, sum(l_extendedprice * (1 - l_discount)) AS reve └ Node 1 └ *colexec.topKSorter └ *colexec.hashAggregator - └ *rowexec.joinReader - └ *colexecjoin.hashJoiner - ├ *colexecsel.selLTInt64Int64ConstOp - │ └ *colfetcher.ColBatchScan - └ *colexecsel.selEQBytesBytesConstOp - └ *colfetcher.ColBatchScan + └ *colexecproj.projMultFloat64Float64Op + └ *colexecprojconst.projMinusFloat64ConstFloat64Op + └ *rowexec.joinReader + └ *colexecjoin.hashJoiner + ├ *colexecsel.selLTInt64Int64ConstOp + │ └ *colfetcher.ColBatchScan + └ *colexecsel.selEQBytesBytesConstOp + └ *colfetcher.ColBatchScan # Query 4 query T @@ -20759,11 +20761,13 @@ EXPLAIN (VEC) SELECT ps_partkey, sum(ps_supplycost * ps_availqty::float) AS valu └ *colexecbase.castOpNullAny └ *colexecbase.constNullOp └ *colexec.hashAggregator - └ *rowexec.joinReader - └ *rowexec.joinReader + └ *colexecproj.projMultFloat64Float64Op + └ *colexecbase.castIntFloatOp └ *rowexec.joinReader - └ *colexecsel.selEQBytesBytesConstOp - └ *colfetcher.ColBatchScan + └ *rowexec.joinReader + └ *rowexec.joinReader + └ *colexecsel.selEQBytesBytesConstOp + └ *colfetcher.ColBatchScan # Query 12 query T @@ -20773,12 +20777,30 @@ EXPLAIN (VEC) SELECT l_shipmode, sum(CASE WHEN o_orderpriority = '1-URGENT' or o └ Node 1 └ *colexec.sortOp └ *colexec.hashAggregator - └ *rowexec.joinReader - └ *colexecsel.selLTInt64Int64Op - └ *colexecsel.selLTInt64Int64Op - └ *colexec.selectInOpBytes - └ 
*colfetcher.ColIndexJoin - └ *colfetcher.ColBatchScan + └ *colexec.caseOp + ├ *colexec.bufferOp + │ └ *colexec.caseOp + │ ├ *colexec.bufferOp + │ │ └ *rowexec.joinReader + │ │ └ *colexecsel.selLTInt64Int64Op + │ │ └ *colexecsel.selLTInt64Int64Op + │ │ └ *colexec.selectInOpBytes + │ │ └ *colfetcher.ColIndexJoin + │ │ └ *colfetcher.ColBatchScan + │ ├ *colexecbase.constInt64Op + │ │ └ *colexec.orProjOp + │ │ ├ *colexec.bufferOp + │ │ ├ *colexecprojconst.projEQBytesBytesConstOp + │ │ └ *colexecprojconst.projEQBytesBytesConstOp + │ └ *colexecbase.constInt64Op + │ └ *colexec.bufferOp + ├ *colexecbase.constInt64Op + │ └ *colexec.andProjOp + │ ├ *colexec.bufferOp + │ ├ *colexecprojconst.projNEBytesBytesConstOp + │ └ *colexecprojconst.projNEBytesBytesConstOp + └ *colexecbase.constInt64Op + └ *colexec.bufferOp # Query 13 query T @@ -20993,12 +21015,15 @@ EXPLAIN (VEC) SELECT cntrycode, count(*) AS numcust, sum(c_acctbal) AS totacctba └ Node 1 └ *colexec.sortOp └ *colexec.hashAggregator - └ *rowexec.joinReader - └ *colexecsel.selGTFloat64Float64Op - └ *colexecbase.castOpNullAny - └ *colexecbase.constNullOp - └ *colexec.selectInOpBytes - └ *colexec.substringInt64Int64Operator - └ *colexecbase.constInt64Op - └ *colexecbase.constInt64Op - └ *colfetcher.ColBatchScan + └ *colexec.substringInt64Int64Operator + └ *colexecbase.constInt64Op + └ *colexecbase.constInt64Op + └ *rowexec.joinReader + └ *colexecsel.selGTFloat64Float64Op + └ *colexecbase.castOpNullAny + └ *colexecbase.constNullOp + └ *colexec.selectInOpBytes + └ *colexec.substringInt64Int64Operator + └ *colexecbase.constInt64Op + └ *colexecbase.constInt64Op + └ *colfetcher.ColBatchScan diff --git a/pkg/sql/opt/exec/execbuilder/testdata/vectorize_local b/pkg/sql/opt/exec/execbuilder/testdata/vectorize_local index 3f61148a96e2..3b4b3b9d72c6 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/vectorize_local +++ b/pkg/sql/opt/exec/execbuilder/testdata/vectorize_local @@ -447,9 +447,13 @@ EXPLAIN (VEC) ---- │ └ Node 1 - └ 
*sql.planNodeToRowSource - └ *colfetcher.ColIndexJoin - └ *colfetcher.ColBatchScan + └ *colexec.caseOp + ├ *colexec.bufferOp + │ └ *sql.planNodeToRowSource + │ └ *colfetcher.ColIndexJoin + │ └ *colfetcher.ColBatchScan + ├ *colexec.bufferOp + └ *colexec.bufferOp # Regression test for releasing operators before closing them with EXPLAIN (VEC) # (#70438).