diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go
index 7a9d30ba4233..b28c1504bed3 100644
--- a/pkg/sql/colexec/colbuilder/execplan.go
+++ b/pkg/sql/colexec/colbuilder/execplan.go
@@ -153,9 +153,9 @@ func needHashAggregator(aggSpec *execinfrapb.AggregatorSpec) (bool, error) {
 // vectorized engine (neither natively nor by wrapping the corresponding row
 // execution processor).
 func IsSupported(mode sessiondatapb.VectorizeExecMode, spec *execinfrapb.ProcessorSpec) error {
-	err := supportedNatively(spec)
+	err := supportedNatively(&spec.Core)
 	if err != nil {
-		if wrapErr := canWrap(mode, spec); wrapErr == nil {
+		if wrapErr := canWrap(mode, &spec.Core); wrapErr == nil {
 			// We don't support this spec natively, but we can wrap the row
 			// execution processor.
 			return nil
@@ -165,59 +165,59 @@ func IsSupported(mode sessiondatapb.VectorizeExecMode, spec *execinfrapb.Process
 }
 
 // supportedNatively checks whether we have a columnar operator equivalent to a
-// processor described by spec. Note that it doesn't perform any other checks
+// processor described by core. Note that it doesn't perform any other checks
 // (like validity of the number of inputs).
-func supportedNatively(spec *execinfrapb.ProcessorSpec) error {
+func supportedNatively(core *execinfrapb.ProcessorCoreUnion) error {
 	switch {
-	case spec.Core.Noop != nil:
+	case core.Noop != nil:
 		return nil
 
-	case spec.Core.Values != nil:
+	case core.Values != nil:
 		return nil
 
-	case spec.Core.TableReader != nil:
+	case core.TableReader != nil:
 		return nil
 
-	case spec.Core.JoinReader != nil:
-		if !spec.Core.JoinReader.IsIndexJoin() {
+	case core.JoinReader != nil:
+		if !core.JoinReader.IsIndexJoin() {
 			return errLookupJoinUnsupported
 		}
 		return nil
 
-	case spec.Core.Filterer != nil:
+	case core.Filterer != nil:
 		return nil
 
-	case spec.Core.Aggregator != nil:
-		for _, agg := range spec.Core.Aggregator.Aggregations {
+	case core.Aggregator != nil:
+		for _, agg := range core.Aggregator.Aggregations {
 			if agg.FilterColIdx != nil {
 				return errors.Newf("filtering aggregation not supported")
 			}
 		}
 		return nil
 
-	case spec.Core.Distinct != nil:
+	case core.Distinct != nil:
 		return nil
 
-	case spec.Core.Ordinality != nil:
+	case core.Ordinality != nil:
 		return nil
 
-	case spec.Core.HashJoiner != nil:
-		if !spec.Core.HashJoiner.OnExpr.Empty() && spec.Core.HashJoiner.Type != descpb.InnerJoin {
+	case core.HashJoiner != nil:
+		if !core.HashJoiner.OnExpr.Empty() && core.HashJoiner.Type != descpb.InnerJoin {
 			return errors.Newf("can't plan vectorized non-inner hash joins with ON expressions")
 		}
 		return nil
 
-	case spec.Core.MergeJoiner != nil:
-		if !spec.Core.MergeJoiner.OnExpr.Empty() && spec.Core.MergeJoiner.Type != descpb.InnerJoin {
+	case core.MergeJoiner != nil:
+		if !core.MergeJoiner.OnExpr.Empty() && core.MergeJoiner.Type != descpb.InnerJoin {
 			return errors.Errorf("can't plan non-inner merge join with ON expressions")
 		}
 		return nil
 
-	case spec.Core.Sorter != nil:
+	case core.Sorter != nil:
 		return nil
 
-	case spec.Core.Windower != nil:
-		for _, wf := range spec.Core.Windower.WindowFns {
+	case core.Windower != nil:
+		for _, wf := range core.Windower.WindowFns {
 			if wf.FilterColIdx != tree.NoColumnIdx {
 				return errors.Newf("window functions with FILTER clause are not supported")
 			}
@@ -229,7 +229,7 @@ func supportedNatively(spec *execinfrapb.ProcessorSpec) error {
 		}
 		return nil
 
-	case spec.Core.LocalPlanNode != nil:
+	case core.LocalPlanNode != nil:
 		// LocalPlanNode core is special (we don't have any plans on vectorizing
 		// it at the moment), so we want to return a custom error for it to
 		// distinguish from other unsupported cores.
@@ -256,55 +256,55 @@ var (
 	errLookupJoinUnsupported = errors.New("lookup join reader is unsupported in vectorized")
 )
 
-func canWrap(mode sessiondatapb.VectorizeExecMode, spec *execinfrapb.ProcessorSpec) error {
-	if mode == sessiondatapb.VectorizeExperimentalAlways && spec.Core.JoinReader == nil && spec.Core.LocalPlanNode == nil {
+func canWrap(mode sessiondatapb.VectorizeExecMode, core *execinfrapb.ProcessorCoreUnion) error {
+	if mode == sessiondatapb.VectorizeExperimentalAlways && core.JoinReader == nil && core.LocalPlanNode == nil {
 		return errExperimentalWrappingProhibited
 	}
 	switch {
-	case spec.Core.Noop != nil:
-	case spec.Core.TableReader != nil:
-	case spec.Core.JoinReader != nil:
-	case spec.Core.Sorter != nil:
-	case spec.Core.Aggregator != nil:
-	case spec.Core.Distinct != nil:
-	case spec.Core.MergeJoiner != nil:
-	case spec.Core.HashJoiner != nil:
-	case spec.Core.Values != nil:
-	case spec.Core.Backfiller != nil:
+	case core.Noop != nil:
+	case core.TableReader != nil:
+	case core.JoinReader != nil:
+	case core.Sorter != nil:
+	case core.Aggregator != nil:
+	case core.Distinct != nil:
+	case core.MergeJoiner != nil:
+	case core.HashJoiner != nil:
+	case core.Values != nil:
+	case core.Backfiller != nil:
 		return errBackfillerWrap
-	case spec.Core.ReadImport != nil:
+	case core.ReadImport != nil:
 		return errReadImportWrap
-	case spec.Core.Exporter != nil:
+	case core.Exporter != nil:
 		return errExporterWrap
-	case spec.Core.Sampler != nil:
+	case core.Sampler != nil:
 		return errSamplerWrap
-	case spec.Core.SampleAggregator != nil:
+	case core.SampleAggregator != nil:
 		return errSampleAggregatorWrap
-	case spec.Core.ZigzagJoiner != nil:
-	case spec.Core.ProjectSet != nil:
-	case spec.Core.Windower != nil:
-	case spec.Core.LocalPlanNode != nil:
-	case spec.Core.ChangeAggregator != nil:
+	case core.ZigzagJoiner != nil:
+	case core.ProjectSet != nil:
+	case core.Windower != nil:
+	case core.LocalPlanNode != nil:
+	case core.ChangeAggregator != nil:
 		// Currently, there is an issue with cleaning up the changefeed flows
 		// (#55408), so we fallback to the row-by-row engine.
 		return errChangeAggregatorWrap
-	case spec.Core.ChangeFrontier != nil:
+	case core.ChangeFrontier != nil:
 		// Currently, there is an issue with cleaning up the changefeed flows
 		// (#55408), so we fallback to the row-by-row engine.
 		return errChangeFrontierWrap
-	case spec.Core.Ordinality != nil:
-	case spec.Core.BulkRowWriter != nil:
-	case spec.Core.InvertedFilterer != nil:
-	case spec.Core.InvertedJoiner != nil:
-	case spec.Core.BackupData != nil:
+	case core.Ordinality != nil:
+	case core.BulkRowWriter != nil:
+	case core.InvertedFilterer != nil:
+	case core.InvertedJoiner != nil:
+	case core.BackupData != nil:
 		return errBackupDataWrap
-	case spec.Core.SplitAndScatter != nil:
-	case spec.Core.RestoreData != nil:
-	case spec.Core.Filterer != nil:
-	case spec.Core.StreamIngestionData != nil:
-	case spec.Core.StreamIngestionFrontier != nil:
+	case core.SplitAndScatter != nil:
+	case core.RestoreData != nil:
+	case core.Filterer != nil:
+	case core.StreamIngestionData != nil:
+	case core.StreamIngestionFrontier != nil:
 	default:
-		return errors.AssertionFailedf("unexpected processor core %q", spec.Core)
+		return errors.AssertionFailedf("unexpected processor core %q", core)
 	}
 	return nil
 }
@@ -508,7 +508,9 @@ func (r opResult) createAndWrapRowSource(
 	args *colexecargs.NewColOperatorArgs,
 	inputs []colexecargs.OpWithMetaInfo,
 	inputTypes [][]*types.T,
-	spec *execinfrapb.ProcessorSpec,
+	core *execinfrapb.ProcessorCoreUnion,
+	post *execinfrapb.PostProcessSpec,
+	processorID int32,
 	factory coldata.ColumnFactory,
 	causeToWrap error,
 ) error {
@@ -516,7 +518,7 @@ func (r opResult) createAndWrapRowSource(
 		return errors.New("processorConstructor is nil")
 	}
 	log.VEventf(ctx, 1, "planning a row-execution processor in the vectorized flow: %v", causeToWrap)
-	if err := canWrap(flowCtx.EvalCtx.SessionData().VectorizeMode, spec); err != nil {
+	if err := canWrap(flowCtx.EvalCtx.SessionData().VectorizeMode, core); err != nil {
 		log.VEventf(ctx, 1, "planning a wrapped processor failed: %v", err)
 		// Return the original error for why we don't support this spec
 		// natively since it is more interesting.
@@ -540,14 +542,14 @@ func (r opResult) createAndWrapRowSource(
 		inputs,
 		inputTypes,
 		args.StreamingMemAccount,
-		spec.ProcessorID,
+		processorID,
 		func(inputs []execinfra.RowSource) (execinfra.RowSource, error) {
 			// We provide a slice with a single nil as 'outputs' parameter
 			// because all processors expect a single output. Passing nil is ok
 			// here because when wrapping the processor, the materializer will
 			// be its output, and it will be set up in wrapRowSources.
 			proc, err := args.ProcessorConstructor(
-				ctx, flowCtx, spec.ProcessorID, &spec.Core, &spec.Post, inputs,
+				ctx, flowCtx, processorID, core, post, inputs,
 				[]execinfra.RowReceiver{nil} /* outputs */, args.LocalProcessors,
 			)
 			if err != nil {
@@ -559,7 +561,7 @@ func (r opResult) createAndWrapRowSource(
 			)
 			if rs, ok = proc.(execinfra.RowSource); !ok {
 				return nil, errors.AssertionFailedf(
-					"processor %s is not an execinfra.RowSource", spec.Core.String(),
+					"processor %s is not an execinfra.RowSource", core.String(),
 				)
 			}
 			r.ColumnTypes = rs.OutputTypes()
@@ -665,19 +667,29 @@ func NewColOperator(
 	core := &spec.Core
 	post := &spec.Post
 
-	if err = supportedNatively(spec); err != nil {
+	if err = supportedNatively(core); err != nil {
 		inputTypes := make([][]*types.T, len(spec.Input))
 		for inputIdx, input := range spec.Input {
 			inputTypes[inputIdx] = make([]*types.T, len(input.ColumnTypes))
 			copy(inputTypes[inputIdx], input.ColumnTypes)
 		}
 
-		err = result.createAndWrapRowSource(ctx, flowCtx, args, inputs, inputTypes, spec, factory, err)
-		// The wrapped processors need to be passed the post-process specs,
-		// since they inspect them to figure out information about needed
-		// columns. This means that we'll let those processors do any renders
-		// or filters, which isn't ideal. We could improve this.
-		post = &execinfrapb.PostProcessSpec{}
+		// We will pass all of the PostProcessSpec to the wrapped processor so
+		// that it would handle projections as well as limits and offsets
+		// itself.
+		//
+		// However, we'll keep the render exprs for the vectorized planning done
+		// in planPostProcessSpec() below. In such a setup, the wrapped
+		// processor will simply output all of its internal columns, so we don't
+		// need to do anything special to remap the render exprs since they
+		// still refer to columns using the correct ordinals.
+		wrappingPost := *post
+		wrappingPost.RenderExprs = nil
+		post = &execinfrapb.PostProcessSpec{RenderExprs: post.RenderExprs}
+		err = result.createAndWrapRowSource(
+			ctx, flowCtx, args, inputs, inputTypes, core,
+			&wrappingPost, spec.ProcessorID, factory, err,
+		)
 	} else {
 		switch {
 		case core.Noop != nil:
@@ -1460,7 +1472,7 @@ func NewColOperator(
 		}
 		err = ppr.planPostProcessSpec(ctx, flowCtx, args, post, factory, &r.Releasables)
 		if err != nil {
-			err = result.wrapPostProcessSpec(ctx, flowCtx, args, post, args.Spec.ResultTypes, factory, err)
+			err = result.wrapPostProcessSpec(ctx, flowCtx, args, post, spec.ProcessorID, factory, err)
 		} else {
 			// The result can be updated with the post process result.
 			result.updateWithPostProcessResult(ppr)
@@ -1507,7 +1519,7 @@ func NewColOperator(
 				post.RenderExprs[i].LocalExpr = tree.NewTypedOrdinalReference(i, args.Spec.ResultTypes[i])
 			}
 		}
-		if err = result.wrapPostProcessSpec(ctx, flowCtx, args, post, args.Spec.ResultTypes, factory, errWrappedCast); err != nil {
+		if err = result.wrapPostProcessSpec(ctx, flowCtx, args, post, spec.ProcessorID, factory, errWrappedCast); err != nil {
 			return r, err
 		}
 	} else if numMismatchedTypes > 0 {
@@ -1567,20 +1579,17 @@ func (r opResult) planAndMaybeWrapFilter(
 	if err != nil {
 		// Filter expression planning failed. Fall back to planning the filter
 		// using row execution.
-		filtererSpec := &execinfrapb.ProcessorSpec{
-			Core: execinfrapb.ProcessorCoreUnion{
-				Filterer: &execinfrapb.FiltererSpec{
-					Filter: filter,
-				},
+		filtererCore := &execinfrapb.ProcessorCoreUnion{
+			Filterer: &execinfrapb.FiltererSpec{
+				Filter: filter,
 			},
-			ProcessorID: processorID,
-			ResultTypes: args.Spec.ResultTypes,
 		}
 		inputToMaterializer := colexecargs.OpWithMetaInfo{Root: r.Root}
 		takeOverMetaInfo(&inputToMaterializer, args.Inputs)
 		return r.createAndWrapRowSource(
 			ctx, flowCtx, args, []colexecargs.OpWithMetaInfo{inputToMaterializer},
-			[][]*types.T{r.ColumnTypes}, filtererSpec, factory, err,
+			[][]*types.T{r.ColumnTypes}, filtererCore, &execinfrapb.PostProcessSpec{},
+			processorID, factory, err,
 		)
 	}
 	r.Root = op
@@ -1597,23 +1606,19 @@ func (r opResult) wrapPostProcessSpec(
 	flowCtx *execinfra.FlowCtx,
 	args *colexecargs.NewColOperatorArgs,
 	post *execinfrapb.PostProcessSpec,
-	resultTypes []*types.T,
+	processorID int32,
 	factory coldata.ColumnFactory,
 	causeToWrap error,
 ) error {
-	noopSpec := &execinfrapb.ProcessorSpec{
-		Core: execinfrapb.ProcessorCoreUnion{
-			Noop: &execinfrapb.NoopCoreSpec{},
-		},
-		Post: *post,
-		ResultTypes: resultTypes,
+	noopCore := &execinfrapb.ProcessorCoreUnion{
+		Noop: &execinfrapb.NoopCoreSpec{},
 	}
 	inputToMaterializer := colexecargs.OpWithMetaInfo{Root: r.Root}
 	takeOverMetaInfo(&inputToMaterializer, args.Inputs)
 	// createAndWrapRowSource updates r.ColumnTypes accordingly.
 	return r.createAndWrapRowSource(
 		ctx, flowCtx, args, []colexecargs.OpWithMetaInfo{inputToMaterializer},
-		[][]*types.T{r.ColumnTypes}, noopSpec, factory, causeToWrap,
+		[][]*types.T{r.ColumnTypes}, noopCore, post, processorID, factory, causeToWrap,
 	)
 }
 
diff --git a/pkg/sql/execinfrapb/flow_diagram.go b/pkg/sql/execinfrapb/flow_diagram.go
index b99f0697fab8..35e720e93955 100644
--- a/pkg/sql/execinfrapb/flow_diagram.go
+++ b/pkg/sql/execinfrapb/flow_diagram.go
@@ -460,23 +460,15 @@ func (post *PostProcessSpec) summary() []string {
 	var res []string
 	if post.Projection {
 		outputColumns := "None"
-		outputCols := post.OutputColumns
-		if post.OriginalOutputColumns != nil {
-			outputCols = post.OriginalOutputColumns
-		}
-		if len(outputCols) > 0 {
-			outputColumns = colListStr(outputCols)
+		if len(post.OutputColumns) > 0 {
+			outputColumns = colListStr(post.OutputColumns)
 		}
 		res = append(res, fmt.Sprintf("Out: %s", outputColumns))
 	}
-	renderExprs := post.RenderExprs
-	if post.OriginalRenderExprs != nil {
-		renderExprs = post.OriginalRenderExprs
-	}
-	if len(renderExprs) > 0 {
+	if len(post.RenderExprs) > 0 {
 		var buf bytes.Buffer
 		buf.WriteString("Render: ")
-		for i, expr := range renderExprs {
+		for i, expr := range post.RenderExprs {
 			if i > 0 {
 				buf.WriteString(", ")
 			}
diff --git a/pkg/sql/execinfrapb/processors_base.proto b/pkg/sql/execinfrapb/processors_base.proto
index fca201c46a46..7ee0bbedbe5f 100644
--- a/pkg/sql/execinfrapb/processors_base.proto
+++ b/pkg/sql/execinfrapb/processors_base.proto
@@ -38,22 +38,12 @@ message PostProcessSpec {
   // Can only be set if projection is true. Cannot be set at the same time with
   // render expressions.
   repeated uint32 output_columns = 3 [packed = true];
-  // OriginalOutputColumns will be set if OutputColumns are destructively
-  // modified during the vectorized flow setup. This field is only used for
-  // population of the DistSQL diagrams, and if set, it takes precedence over
-  // OutputColumns.
-  repeated uint32 original_output_columns = 7 [packed = true];
 
   // If set, the output is the result of rendering these expressions. The
   // expressions reference the internal columns of the processor.
   //
   // Cannot be set at the same time with output columns.
   repeated Expression render_exprs = 4 [(gogoproto.nullable) = false];
-  // OriginalRenderExprs will be set if RenderExprs are destructively
-  // modified during the vectorized flow setup. This field is only used for
-  // population of the DistSQL diagrams, and if set, it takes precedence over
-  // RenderExprs.
-  repeated Expression original_render_exprs = 8 [(gogoproto.nullable) = false];
 
   // If nonzero, the first <offset> rows will be suppressed.
   optional uint64 offset = 5 [(gogoproto.nullable) = false];
@@ -62,7 +52,7 @@ message PostProcessSpec {
   // suppressed by <offset>, if any, do not count towards this limit.
   optional uint64 limit = 6 [(gogoproto.nullable) = false];
 
-  reserved 1;
+  reserved 1, 7, 8;
 }
 
 message Columns {
diff --git a/pkg/sql/logictest/logic.go b/pkg/sql/logictest/logic.go
index 6591c468208e..03bbb8094e71 100644
--- a/pkg/sql/logictest/logic.go
+++ b/pkg/sql/logictest/logic.go
@@ -4617,6 +4617,8 @@ func runSQLLiteLogicTest(t *testing.T, configOverride string, globs ...string) {
 	// limit than other logic tests get.
 	serverArgs := TestServerArgs{
 		maxSQLMemoryLimit: 512 << 20, // 512 MiB
+		// TODO(yuzefovich): remove this once the flake in #84022 is fixed.
+		DisableWorkmemRandomization: true,
 	}
 	RunLogicTestWithDefaultConfig(t, serverArgs, configOverride, "", true /* runCCLConfigs */, prefixedGlobs...)
 }
diff --git a/pkg/sql/opt/exec/execbuilder/testdata/tpch_vec b/pkg/sql/opt/exec/execbuilder/testdata/tpch_vec
index d293c0dc8aff..b2d1699c8400 100644
--- a/pkg/sql/opt/exec/execbuilder/testdata/tpch_vec
+++ b/pkg/sql/opt/exec/execbuilder/testdata/tpch_vec
@@ -20579,12 +20579,14 @@ EXPLAIN (VEC) SELECT l_orderkey, sum(l_extendedprice * (1 - l_discount)) AS reve
 └ Node 1
   └ *colexec.topKSorter
     └ *colexec.hashAggregator
-      └ *rowexec.joinReader
-        └ *colexecjoin.hashJoiner
-          ├ *colexecsel.selLTInt64Int64ConstOp
-          │ └ *colfetcher.ColBatchScan
-          └ *colexecsel.selEQBytesBytesConstOp
-            └ *colfetcher.ColBatchScan
+      └ *colexecproj.projMultFloat64Float64Op
+        └ *colexecprojconst.projMinusFloat64ConstFloat64Op
+          └ *rowexec.joinReader
+            └ *colexecjoin.hashJoiner
+              ├ *colexecsel.selLTInt64Int64ConstOp
+              │ └ *colfetcher.ColBatchScan
+              └ *colexecsel.selEQBytesBytesConstOp
+                └ *colfetcher.ColBatchScan
 
 # Query 4
 query T
@@ -20759,11 +20761,13 @@ EXPLAIN (VEC) SELECT ps_partkey, sum(ps_supplycost * ps_availqty::float) AS valu
       └ *colexecbase.castOpNullAny
         └ *colexecbase.constNullOp
           └ *colexec.hashAggregator
-            └ *rowexec.joinReader
-              └ *rowexec.joinReader
+            └ *colexecproj.projMultFloat64Float64Op
+              └ *colexecbase.castIntFloatOp
                 └ *rowexec.joinReader
-                  └ *colexecsel.selEQBytesBytesConstOp
-                    └ *colfetcher.ColBatchScan
+                  └ *rowexec.joinReader
+                    └ *rowexec.joinReader
+                      └ *colexecsel.selEQBytesBytesConstOp
+                        └ *colfetcher.ColBatchScan
 
 # Query 12
 query T
@@ -20773,12 +20777,30 @@ EXPLAIN (VEC) SELECT l_shipmode, sum(CASE WHEN o_orderpriority = '1-URGENT' or o
 └ Node 1
   └ *colexec.sortOp
     └ *colexec.hashAggregator
-      └ *rowexec.joinReader
-        └ *colexecsel.selLTInt64Int64Op
-          └ *colexecsel.selLTInt64Int64Op
-            └ *colexec.selectInOpBytes
-              └ *colfetcher.ColIndexJoin
-                └ *colfetcher.ColBatchScan
+      └ *colexec.caseOp
+        ├ *colexec.bufferOp
+        │ └ *colexec.caseOp
+        │   ├ *colexec.bufferOp
+        │   │ └ *rowexec.joinReader
+        │   │   └ *colexecsel.selLTInt64Int64Op
+        │   │     └ *colexecsel.selLTInt64Int64Op
+        │   │       └ *colexec.selectInOpBytes
+        │   │         └ *colfetcher.ColIndexJoin
+        │   │           └ *colfetcher.ColBatchScan
+        │   ├ *colexecbase.constInt64Op
+        │   │ └ *colexec.orProjOp
+        │   │   ├ *colexec.bufferOp
+        │   │   ├ *colexecprojconst.projEQBytesBytesConstOp
+        │   │   └ *colexecprojconst.projEQBytesBytesConstOp
+        │   └ *colexecbase.constInt64Op
+        │     └ *colexec.bufferOp
+        ├ *colexecbase.constInt64Op
+        │ └ *colexec.andProjOp
+        │   ├ *colexec.bufferOp
+        │   ├ *colexecprojconst.projNEBytesBytesConstOp
+        │   └ *colexecprojconst.projNEBytesBytesConstOp
+        └ *colexecbase.constInt64Op
+          └ *colexec.bufferOp
 
 # Query 13
 query T
@@ -20993,12 +21015,15 @@ EXPLAIN (VEC) SELECT cntrycode, count(*) AS numcust, sum(c_acctbal) AS totacctba
 └ Node 1
   └ *colexec.sortOp
     └ *colexec.hashAggregator
-      └ *rowexec.joinReader
-        └ *colexecsel.selGTFloat64Float64Op
-          └ *colexecbase.castOpNullAny
-            └ *colexecbase.constNullOp
-              └ *colexec.selectInOpBytes
-                └ *colexec.substringInt64Int64Operator
-                  └ *colexecbase.constInt64Op
-                    └ *colexecbase.constInt64Op
-                      └ *colfetcher.ColBatchScan
+      └ *colexec.substringInt64Int64Operator
+        └ *colexecbase.constInt64Op
+          └ *colexecbase.constInt64Op
+            └ *rowexec.joinReader
+              └ *colexecsel.selGTFloat64Float64Op
+                └ *colexecbase.castOpNullAny
+                  └ *colexecbase.constNullOp
+                    └ *colexec.selectInOpBytes
+                      └ *colexec.substringInt64Int64Operator
+                        └ *colexecbase.constInt64Op
+                          └ *colexecbase.constInt64Op
+                            └ *colfetcher.ColBatchScan
diff --git a/pkg/sql/opt/exec/execbuilder/testdata/vectorize_local b/pkg/sql/opt/exec/execbuilder/testdata/vectorize_local
index 3f61148a96e2..3b4b3b9d72c6 100644
--- a/pkg/sql/opt/exec/execbuilder/testdata/vectorize_local
+++ b/pkg/sql/opt/exec/execbuilder/testdata/vectorize_local
@@ -447,9 +447,13 @@ EXPLAIN (VEC)
 ----
 │
 └ Node 1
-  └ *sql.planNodeToRowSource
-    └ *colfetcher.ColIndexJoin
-      └ *colfetcher.ColBatchScan
+  └ *colexec.caseOp
+    ├ *colexec.bufferOp
+    │ └ *sql.planNodeToRowSource
+    │   └ *colfetcher.ColIndexJoin
+    │     └ *colfetcher.ColBatchScan
+    ├ *colexec.bufferOp
+    └ *colexec.bufferOp
 
 # Regression test for releasing operators before closing them with EXPLAIN (VEC)
 # (#70438).