diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index bed494080251..c843c1207b69 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -634,10 +634,6 @@ func makeNewHashJoinerArgs( core *execinfrapb.HashJoinerSpec, factory coldata.ColumnFactory, ) (colexecjoin.NewHashJoinerArgs, redact.RedactableString) { - leftTypes := make([]*types.T, len(args.Spec.Input[0].ColumnTypes)) - copy(leftTypes, args.Spec.Input[0].ColumnTypes) - rightTypes := make([]*types.T, len(args.Spec.Input[1].ColumnTypes)) - copy(rightTypes, args.Spec.Input[1].ColumnTypes) hashJoinerMemAccount, hashJoinerMemMonitorName := args.MonitorRegistry.CreateMemAccountForSpillStrategy( ctx, flowCtx, opName, args.Spec.ProcessorID, ) @@ -650,8 +646,8 @@ func makeNewHashJoinerArgs( core.Type, core.LeftEqColumns, core.RightEqColumns, - leftTypes, - rightTypes, + args.Spec.Input[0].ColumnTypes, + args.Spec.Input[1].ColumnTypes, core.RightEqColumnsAreKey, ) return colexecjoin.NewHashJoinerArgs{ @@ -728,31 +724,6 @@ func makeNewHashAggregatorArgs( hashAggregatorMemMonitorName } -// NOTE: throughout this file we do not append an output type of a projecting -// operator to the passed-in type schema - we, instead, always allocate a new -// type slice and copy over the old schema and set the output column of a -// projecting operator in the next slot. We attempt to enforce this by a linter -// rule, and such behavior prevents the type schema corruption scenario as -// described below. -// -// Without explicit new allocations, it is possible that planSelectionOperators -// (and other planning functions) reuse the same array for filterColumnTypes as -// result.ColumnTypes is using because there was enough capacity to do so. -// As an example, consider the following scenario in the context of -// planFilterExpr method: -// 1. r.ColumnTypes={types.Bool} with len=1 and cap=4 -// 2. planSelectionOperators adds another types.Int column, so -// filterColumnTypes={types.Bool, types.Int} with len=2 and cap=4 -// Crucially, it uses exact same underlying array as r.ColumnTypes -// uses. -// 3. we project out second column, so r.ColumnTypes={types.Bool} -// 4. later, we add another types.Float column, so -// r.ColumnTypes={types.Bool, types.Float}, but there is enough -// capacity in the array, so we simply overwrite the second slot -// with the new type which corrupts filterColumnTypes to become -// {types.Bool, types.Float}, and we can get into a runtime type -// mismatch situation. - // NewColOperator creates a new columnar operator according to the given spec. func NewColOperator( ctx context.Context, flowCtx *execinfra.FlowCtx, args *colexecargs.NewColOperatorArgs, @@ -780,8 +751,7 @@ func NewColOperator( if err = supportedNatively(core); err != nil { inputTypes := make([][]*types.T, len(spec.Input)) for inputIdx, input := range spec.Input { - inputTypes[inputIdx] = make([]*types.T, len(input.ColumnTypes)) - copy(inputTypes[inputIdx], input.ColumnTypes) + inputTypes[inputIdx] = input.ColumnTypes } // We will pass all of the PostProcessSpec to the wrapped processor so @@ -811,8 +781,7 @@ func NewColOperator( return r, err } result.Root = colexecop.NewNoop(inputs[0].Root) - result.ColumnTypes = make([]*types.T, len(spec.Input[0].ColumnTypes)) - copy(result.ColumnTypes, spec.Input[0].ColumnTypes) + result.ColumnTypes = spec.Input[0].ColumnTypes case core.Values != nil: if err := checkNumIn(inputs, 0); err != nil { @@ -962,13 +931,11 @@ func NewColOperator( streamerDiskMonitor := args.MonitorRegistry.CreateDiskMonitor( ctx, flowCtx, "streamer" /* opName */, spec.ProcessorID, ) - inputTypes := make([]*types.T, len(spec.Input[0].ColumnTypes)) - copy(inputTypes, spec.Input[0].ColumnTypes) indexJoinOp, err := colfetcher.NewColIndexJoin( ctx, getStreamingAllocator(ctx, args), colmem.NewAllocator(ctx, accounts[0], factory), accounts[1], accounts[2], flowCtx, spec.ProcessorID, - inputs[0].Root, core.JoinReader, post, inputTypes, + inputs[0].Root, core.JoinReader, post, spec.Input[0].ColumnTypes, streamerDiskMonitor, args.TypeResolver, ) if err != nil { @@ -981,8 +948,7 @@ func NewColOperator( return r, err } - result.ColumnTypes = make([]*types.T, len(spec.Input[0].ColumnTypes)) - copy(result.ColumnTypes, spec.Input[0].ColumnTypes) + result.ColumnTypes = spec.Input[0].ColumnTypes result.Root = inputs[0].Root if err := result.planAndMaybeWrapFilter( ctx, flowCtx, args, spec.ProcessorID, core.Filterer.Filter, factory, @@ -1006,18 +972,16 @@ func NewColOperator( if err != nil { return r, err } - inputTypes := make([]*types.T, len(spec.Input[0].ColumnTypes)) - copy(inputTypes, spec.Input[0].ColumnTypes) // Make a copy of the evalCtx since we're modifying it below. evalCtx := flowCtx.NewEvalCtx() newAggArgs := &colexecagg.NewAggregatorArgs{ Input: inputs[0].Root, - InputTypes: inputTypes, + InputTypes: spec.Input[0].ColumnTypes, Spec: aggSpec, EvalCtx: evalCtx, } newAggArgs.Constructors, newAggArgs.ConstArguments, newAggArgs.OutputTypes, err = colexecagg.ProcessAggregations( - ctx, evalCtx, args.ExprHelper.SemaCtx, aggSpec.Aggregations, inputTypes, + ctx, evalCtx, args.ExprHelper.SemaCtx, aggSpec.Aggregations, spec.Input[0].ColumnTypes, ) if err != nil { return r, err @@ -1061,7 +1025,7 @@ func NewColOperator( newHashAggArgs, sqArgs, hashAggregatorMemMonitorName := makeNewHashAggregatorArgs( ctx, flowCtx, args, opName, newAggArgs, factory, ) - sqArgs.Types = inputTypes + sqArgs.Types = spec.Input[0].ColumnTypes inMemoryHashAggregator := colexec.NewHashAggregator( ctx, newHashAggArgs, sqArgs, ) @@ -1124,8 +1088,7 @@ func NewColOperator( if err := checkNumIn(inputs, 1); err != nil { return r, err } - result.ColumnTypes = make([]*types.T, len(spec.Input[0].ColumnTypes)) - copy(result.ColumnTypes, spec.Input[0].ColumnTypes) + result.ColumnTypes = spec.Input[0].ColumnTypes if len(core.Distinct.OrderedColumns) == len(core.Distinct.DistinctColumns) { result.Root = colexecbase.NewOrderedDistinct( inputs[0].Root, core.Distinct.OrderedColumns, result.ColumnTypes, @@ -1189,7 +1152,8 @@ func NewColOperator( result.Root = colexecbase.NewOrdinalityOp( getStreamingAllocator(ctx, args), inputs[0].Root, outputIdx, ) - result.ColumnTypes = appendOneType(spec.Input[0].ColumnTypes, types.Int) + result.ColumnTypes = spec.Input[0].ColumnTypes + result.ColumnTypes = append(result.ColumnTypes, types.Int) case core.HashJoiner != nil: if err := checkNumIn(inputs, 2); err != nil { @@ -1203,10 +1167,6 @@ func NewColOperator( accounts := args.MonitorRegistry.CreateUnlimitedMemAccounts(ctx, flowCtx, opName, spec.ProcessorID, 2 /* numAccounts */) crossJoinerDiskAcc := args.MonitorRegistry.CreateDiskAccount(ctx, flowCtx, opName, spec.ProcessorID) unlimitedAllocator := colmem.NewAllocator(ctx, accounts[0], factory) - leftTypes := make([]*types.T, len(spec.Input[0].ColumnTypes)) - copy(leftTypes, spec.Input[0].ColumnTypes) - rightTypes := make([]*types.T, len(spec.Input[1].ColumnTypes)) - copy(rightTypes, spec.Input[1].ColumnTypes) result.Root = colexecjoin.NewCrossJoiner( unlimitedAllocator, execinfra.GetWorkMemLimit(flowCtx), @@ -1214,7 +1174,7 @@ func NewColOperator( args.FDSemaphore, core.HashJoiner.Type, inputs[0].Root, inputs[1].Root, - leftTypes, rightTypes, + spec.Input[0].ColumnTypes, spec.Input[1].ColumnTypes, crossJoinerDiskAcc, accounts[1], ) @@ -1266,8 +1226,6 @@ func NewColOperator( } } - // MakeOutputTypes makes a copy, so we can just use the types - // directly from the input specs. result.ColumnTypes = core.HashJoiner.Type.MakeOutputTypes(spec.Input[0].ColumnTypes, spec.Input[1].ColumnTypes) if !core.HashJoiner.OnExpr.Empty() && core.HashJoiner.Type == descpb.InnerJoin { @@ -1283,11 +1241,6 @@ func NewColOperator( return r, err } - leftTypes := make([]*types.T, len(spec.Input[0].ColumnTypes)) - copy(leftTypes, spec.Input[0].ColumnTypes) - rightTypes := make([]*types.T, len(spec.Input[1].ColumnTypes)) - copy(rightTypes, spec.Input[1].ColumnTypes) - joinType := core.MergeJoiner.Type var onExpr *execinfrapb.Expression if !core.MergeJoiner.OnExpr.Empty() { @@ -1310,14 +1263,15 @@ func NewColOperator( mj := colexecjoin.NewMergeJoinOp( unlimitedAllocator, execinfra.GetWorkMemLimit(flowCtx), args.DiskQueueCfg, args.FDSemaphore, - joinType, inputs[0].Root, inputs[1].Root, leftTypes, rightTypes, + joinType, inputs[0].Root, inputs[1].Root, + spec.Input[0].ColumnTypes, spec.Input[1].ColumnTypes, core.MergeJoiner.LeftOrdering.Columns, core.MergeJoiner.RightOrdering.Columns, diskAccount, accounts[1], flowCtx.EvalCtx, ) result.Root = mj result.ToClose = append(result.ToClose, mj.(colexecop.Closer)) - result.ColumnTypes = core.MergeJoiner.Type.MakeOutputTypes(leftTypes, rightTypes) + result.ColumnTypes = core.MergeJoiner.Type.MakeOutputTypes(spec.Input[0].ColumnTypes, spec.Input[1].ColumnTypes) if onExpr != nil { if err = result.planAndMaybeWrapFilter( @@ -1338,8 +1292,6 @@ func NewColOperator( hjArgs, hashJoinerMemMonitorName := makeNewHashJoinerArgs( ctx, flowCtx, args, opName, hjSpec, factory, ) - // MakeOutputTypes makes a copy, so we can just use the types - // directly from the input specs. hjOutputTypes := hjSpec.Type.MakeOutputTypes(spec.Input[0].ColumnTypes, spec.Input[1].ColumnTypes) joinOutputTypes := hjOutputTypes if len(hgjSpec.JoinOutputColumns) > 0 { @@ -1380,8 +1332,7 @@ func NewColOperator( ) // Spilling queue is needed for the left input to the hash // group-join. - sqArgs.Types = make([]*types.T, len(args.Spec.Input[0].ColumnTypes)) - copy(sqArgs.Types, args.Spec.Input[0].ColumnTypes) + sqArgs.Types = args.Spec.Input[0].ColumnTypes hgj := colexec.NewHashGroupJoiner( ctx, @@ -1471,8 +1422,7 @@ func NewColOperator( return r, err } input := inputs[0].Root - result.ColumnTypes = make([]*types.T, len(spec.Input[0].ColumnTypes)) - copy(result.ColumnTypes, spec.Input[0].ColumnTypes) + result.ColumnTypes = spec.Input[0].ColumnTypes ordering := core.Sorter.OutputOrdering matchLen := core.Sorter.OrderingMatchLen limit := core.Sorter.Limit @@ -1487,15 +1437,9 @@ func NewColOperator( } opNamePrefix := redact.RedactableString("window-") input := inputs[0].Root - result.ColumnTypes = make([]*types.T, len(spec.Input[0].ColumnTypes)) - copy(result.ColumnTypes, spec.Input[0].ColumnTypes) + result.ColumnTypes = spec.Input[0].ColumnTypes for _, wf := range core.Windower.WindowFns { - // We allocate the capacity for two extra types because of the temporary - // columns that can be appended below. Capacity is also allocated for - // each of the argument types in case casting is necessary. numInputCols := len(result.ColumnTypes) - typs := make([]*types.T, numInputCols, numInputCols+len(wf.ArgsIdxs)+2) - copy(typs, result.ColumnTypes) // Set any nil values in the window frame to their default values. wf.Frame = colexecwindow.NormalizeWindowFrame(wf.Frame) @@ -1505,7 +1449,7 @@ func NewColOperator( argTypes := make([]*types.T, len(wf.ArgsIdxs)) for i, idx := range wf.ArgsIdxs { argIdxs[i] = int(idx) - argTypes[i] = typs[idx] + argTypes[i] = result.ColumnTypes[idx] } // Perform any necessary casts for the argument columns. @@ -1514,7 +1458,7 @@ func NewColOperator( if typ == nil { continue } - castIdx := len(typs) + castIdx := len(result.ColumnTypes) input, err = colexecbase.GetCastOperator( getStreamingAllocator(ctx, args), input, argIdxs[i], castIdx, argTypes[i], typ, flowCtx.EvalCtx, @@ -1522,7 +1466,7 @@ func NewColOperator( if err != nil { colexecerror.InternalError(err) } - typs = append(typs, typ) + result.ColumnTypes = append(result.ColumnTypes, typ) argIdxs[i] = castIdx argTypes[i] = typ } @@ -1535,9 +1479,9 @@ func NewColOperator( // (probably by leveraging hash routers once we can // distribute). The decision about which kind of partitioner // to use should come from the optimizer. - partitionColIdx = len(typs) + partitionColIdx = len(result.ColumnTypes) input = colexecwindow.NewWindowSortingPartitioner( - getStreamingAllocator(ctx, args), input, typs, + getStreamingAllocator(ctx, args), input, result.ColumnTypes, core.Windower.PartitionBy, wf.Ordering.Columns, partitionColIdx, func(input colexecop.Operator, inputTypes []*types.T, orderingCols []execinfrapb.Ordering_Column) colexecop.Operator { return result.createDiskBackedSort( @@ -1548,28 +1492,28 @@ func NewColOperator( }, ) // Window partitioner will append a boolean column. - typs = append(typs, types.Bool) + result.ColumnTypes = append(result.ColumnTypes, types.Bool) } else { if len(wf.Ordering.Columns) > 0 { input = result.createDiskBackedSort( - ctx, flowCtx, args, input, typs, + ctx, flowCtx, args, input, result.ColumnTypes, wf.Ordering, 0 /* limit */, 0 /* matchLen */, 0, /* maxNumberPartitions */ spec.ProcessorID, opNamePrefix, factory, ) } } if colexecwindow.WindowFnNeedsPeersInfo(&wf) { - peersColIdx = len(typs) + peersColIdx = len(result.ColumnTypes) input = colexecwindow.NewWindowPeerGrouper( - getStreamingAllocator(ctx, args), input, typs, + getStreamingAllocator(ctx, args), input, result.ColumnTypes, wf.Ordering.Columns, partitionColIdx, peersColIdx, ) // Window peer grouper will append a boolean column. - typs = append(typs, types.Bool) + result.ColumnTypes = append(result.ColumnTypes, types.Bool) } // The output column is appended after any temporary columns. - outputColIdx := len(typs) + outputColIdx := len(result.ColumnTypes) windowArgs := &colexecwindow.WindowArgs{ EvalCtx: flowCtx.EvalCtx, @@ -1577,7 +1521,7 @@ func NewColOperator( QueueCfg: args.DiskQueueCfg, FdSemaphore: args.FDSemaphore, Input: input, - InputTypes: typs, + InputTypes: result.ColumnTypes, OutputColIdx: outputColIdx, PartitionColIdx: partitionColIdx, PeersColIdx: peersColIdx, @@ -1626,7 +1570,7 @@ func NewColOperator( ) result.Root, err = colexecwindow.NewLagOperator( windowArgs, argIdxs[0], argIdxs[1], argIdxs[2]) - returnType = typs[argIdxs[0]] + returnType = result.ColumnTypes[argIdxs[0]] case execinfrapb.WindowerSpec_LEAD: opName := opNamePrefix + "lead" result.finishBufferedWindowerArgs( @@ -1635,7 +1579,7 @@ func NewColOperator( ) result.Root, err = colexecwindow.NewLeadOperator( windowArgs, argIdxs[0], argIdxs[1], argIdxs[2]) - returnType = typs[argIdxs[0]] + returnType = result.ColumnTypes[argIdxs[0]] case execinfrapb.WindowerSpec_FIRST_VALUE: opName := opNamePrefix + "first_value" result.finishBufferedWindowerArgs( @@ -1644,7 +1588,7 @@ func NewColOperator( ) result.Root, err = colexecwindow.NewFirstValueOperator( windowArgs, wf.Frame, &wf.Ordering, argIdxs) - returnType = typs[argIdxs[0]] + returnType = result.ColumnTypes[argIdxs[0]] case execinfrapb.WindowerSpec_LAST_VALUE: opName := opNamePrefix + "last_value" result.finishBufferedWindowerArgs( @@ -1653,7 +1597,7 @@ func NewColOperator( ) result.Root, err = colexecwindow.NewLastValueOperator( windowArgs, wf.Frame, &wf.Ordering, argIdxs) - returnType = typs[argIdxs[0]] + returnType = result.ColumnTypes[argIdxs[0]] case execinfrapb.WindowerSpec_NTH_VALUE: opName := opNamePrefix + "nth_value" result.finishBufferedWindowerArgs( @@ -1662,7 +1606,7 @@ func NewColOperator( ) result.Root, err = colexecwindow.NewNthValueOperator( windowArgs, wf.Frame, &wf.Ordering, argIdxs) - returnType = typs[argIdxs[0]] + returnType = result.ColumnTypes[argIdxs[0]] default: return r, errors.AssertionFailedf("window function %s is not supported", wf.String()) } @@ -1735,9 +1679,17 @@ func NewColOperator( } projection[numInputCols] = uint32(outputColIdx) result.Root = colexecbase.NewSimpleProjectOp(result.Root, numOutputCols, projection) + // We need to allocate a fresh types slice because we'd + // "corrupt" the existing slice if we were to overwrite + // numInputCols'th position. + inputTypes := result.ColumnTypes[:numInputCols] + result.ColumnTypes = make([]*types.T, numInputCols+1) + copy(result.ColumnTypes, inputTypes) + result.ColumnTypes[numInputCols] = returnType + } else { + result.ColumnTypes = append(result.ColumnTypes, returnType) } - result.ColumnTypes = appendOneType(result.ColumnTypes, returnType) input = result.Root } @@ -1771,7 +1723,8 @@ func NewColOperator( err = result.wrapPostProcessSpec(ctx, flowCtx, args, post, spec.ProcessorID, factory, err) } else { // The result can be updated with the post process result. - result.updateWithPostProcessResult(ppr) + r.Root = ppr.Op + r.ColumnTypes = ppr.ColumnTypes } if err != nil { return r, err @@ -1822,14 +1775,10 @@ func NewColOperator( // We will need to project out the original mismatched columns, so // we're keeping track of the required projection. projection := make([]uint32, len(args.Spec.ResultTypes)) - typesWithCasts := make([]*types.T, len(args.Spec.ResultTypes), len(args.Spec.ResultTypes)+numMismatchedTypes) - // All original mismatched columns will be passed through by all of the - // vectorized cast operators. - copy(typesWithCasts, r.ColumnTypes) for i := range args.Spec.ResultTypes { expected, actual := args.Spec.ResultTypes[i], r.ColumnTypes[i] if !actual.Identical(expected) { - castedIdx := len(typesWithCasts) + castedIdx := len(r.ColumnTypes) r.Root, err = colexecbase.GetCastOperator( getStreamingAllocator(ctx, args), r.Root, i, castedIdx, actual, expected, flowCtx.EvalCtx, @@ -1838,12 +1787,12 @@ func NewColOperator( return r, errors.NewAssertionErrorWithWrappedErrf(err, "unexpectedly couldn't plan a cast although IsCastSupported returned true") } projection[i] = uint32(castedIdx) - typesWithCasts = append(typesWithCasts, expected) + r.ColumnTypes = append(r.ColumnTypes, expected) } else { projection[i] = uint32(i) } } - r.Root, r.ColumnTypes = addProjection(r.Root, typesWithCasts, projection) + r.Root, r.ColumnTypes = addProjection(r.Root, r.ColumnTypes, projection) } takeOverMetaInfo(&result.OpWithMetaInfo, inputs) @@ -2035,12 +1984,6 @@ type postProcessResult struct { ColumnTypes []*types.T } -func (r opResult) updateWithPostProcessResult(ppr postProcessResult) { - r.Root = ppr.Op - r.ColumnTypes = make([]*types.T, len(ppr.ColumnTypes)) - copy(r.ColumnTypes, ppr.ColumnTypes) -} - func (r opResult) finishBufferedWindowerArgs( ctx context.Context, flowCtx *execinfra.FlowCtx, @@ -2326,7 +2269,7 @@ func planCastOperator( ) (op colexecop.Operator, resultIdx int, typs []*types.T, err error) { outputIdx := len(columnTypes) op, err = colexecbase.GetCastOperator(colmem.NewAllocator(ctx, acc, factory), input, inputIdx, outputIdx, fromType, toType, evalCtx) - typs = appendOneType(columnTypes, toType) + typs = append(columnTypes, toType) return op, outputIdx, typs, err } @@ -2349,7 +2292,7 @@ func planProjectionOperators( projectDatum := func(datum tree.Datum) (colexecop.Operator, error) { resultIdx = len(columnTypes) datumType := datum.ResolvedType() - typs = appendOneType(columnTypes, datumType) + typs = append(columnTypes, datumType) if datumType.Family() == types.UnknownFamily { // We handle Unknown type by planning a special constant null // operator. @@ -2402,7 +2345,7 @@ func planProjectionOperators( ) buffer := colexec.NewBufferOp(schemaEnforcer) caseOps := make([]colexecop.Operator, len(t.Whens)) - typs = appendOneType(columnTypes, caseOutputType) + typs = append(columnTypes, caseOutputType) thenIdxs := make([]int, len(t.Whens)+1) for i, when := range t.Whens { // The case operator is assembled from n WHEN arms, n THEN arms, and @@ -2572,7 +2515,7 @@ func planProjectionOperators( if r, ok := op.(execreleasable.Releasable); ok { *releasables = append(*releasables, r) } - typs = appendOneType(typs, t.ResolvedType()) + typs = append(typs, t.ResolvedType()) return op, resultIdx, typs, err case *tree.IfExpr: // We handle IfExpr by planning the equivalent CASE expression, namely @@ -2607,7 +2550,7 @@ func planProjectionOperators( if err != nil { return op, resultIdx, typs, err } - typs = appendOneType(typs, t.ResolvedType()) + typs = append(typs, t.ResolvedType()) return op, outputIdx, typs, nil case *tree.NullIfExpr: // We handle NullIfExpr by planning the equivalent CASE expression, @@ -2657,7 +2600,7 @@ func planProjectionOperators( colmem.NewAllocator(ctx, acc, factory), typs, tupleContentsIdxs, outputType, input, resultIdx, ) *releasables = append(*releasables, op.(execreleasable.Releasable)) - typs = appendOneType(typs, outputType) + typs = append(typs, outputType) return op, resultIdx, typs, err default: return nil, resultIdx, nil, errors.Errorf("unhandled projection expression type: %s", reflect.TypeOf(t)) @@ -2852,7 +2795,7 @@ func planProjectionExpr( if r, ok := op.(execreleasable.Releasable); ok { *releasables = append(*releasables, r) } - typs = appendOneType(typs, outputType) + typs = append(typs, outputType) return op, resultIdx, typs, err } @@ -2870,7 +2813,7 @@ func planLogicalProjectionOp( ) (op colexecop.Operator, resultIdx int, typs []*types.T, err error) { // Add a new boolean column that will store the result of the projection. resultIdx = len(columnTypes) - typs = appendOneType(columnTypes, types.Bool) + typs = append(columnTypes, types.Bool) var ( typedLeft, typedRight tree.TypedExpr leftProjOpChain, rightProjOpChain colexecop.Operator @@ -2948,21 +2891,10 @@ func planIsNullProjectionOp( op = colexec.NewIsNullProjOp( colmem.NewAllocator(ctx, acc, factory), op, resultIdx, outputIdx, negate, isTupleNull, ) - typs = appendOneType(typs, outputType) + typs = append(typs, outputType) return op, outputIdx, typs, nil } -// appendOneType appends a *types.T to then end of a []*types.T. The size of the -// underlying array of the resulting slice is 1 greater than the input slice. -// This differs from the built-in append function, which can double the capacity -// of the slice if its length is less than 1024, or increase by 25% otherwise. -func appendOneType(typs []*types.T, t *types.T) []*types.T { - newTyps := make([]*types.T, len(typs)+1) - copy(newTyps, typs) - newTyps[len(newTyps)-1] = t - return newTyps -} - // useDefaultCmpOpForIn returns whether IN and NOT IN projection/selection // operators should be handled via the default operators. This is the case when // we have an empty tuple or the tuple contains other tuples (these cases diff --git a/pkg/sql/colexec/colbuilder/execplan_test.go b/pkg/sql/colexec/colbuilder/execplan_test.go index 40861e8cf5a9..d979268f203f 100644 --- a/pkg/sql/colexec/colbuilder/execplan_test.go +++ b/pkg/sql/colexec/colbuilder/execplan_test.go @@ -202,3 +202,36 @@ func BenchmarkRenderPlanning(b *testing.B) { } } } + +// BenchmarkNestedAndPlanning benchmarks how long it takes to run a query with +// many expressions logically AND'ed in a single output column. +func BenchmarkNestedAndPlanning(b *testing.B) { + defer leaktest.AfterTest(b)() + defer log.Scope(b).Close(b) + + ctx := context.Background() + s, db, _ := serverutils.StartServer(b, base.TestServerArgs{SQLMemoryPoolSize: 10 << 30}) + defer s.Stopper().Stop(ctx) + + sqlDB := sqlutils.MakeSQLRunner(db) + // Disable fallback to the row-by-row processing wrapping. + sqlDB.Exec(b, "SET CLUSTER SETTING sql.distsql.vectorize_render_wrapping.min_render_count = 9999999") + sqlDB.Exec(b, "CREATE TABLE bench (i INT)") + for _, numRenders := range []int{1 << 4, 1 << 8, 1 << 12} { + var sb strings.Builder + sb.WriteString("SELECT ") + for i := 0; i < numRenders; i++ { + if i > 0 { + sb.WriteString(" AND ") + } + sb.WriteString(fmt.Sprintf("i < %d", i+1)) + } + sb.WriteString(" FROM bench") + query := sb.String() + b.Run(fmt.Sprintf("renders=%d", numRenders), func(b *testing.B) { + for i := 0; i < b.N; i++ { + sqlDB.Exec(b, query) + } + }) + } +} diff --git a/pkg/testutils/lint/lint_test.go b/pkg/testutils/lint/lint_test.go index 2b2baed3b6dd..7964c6238661 100644 --- a/pkg/testutils/lint/lint_test.go +++ b/pkg/testutils/lint/lint_test.go @@ -2024,40 +2024,6 @@ func TestLint(t *testing.T) { } }) - t.Run("TestVectorizedTypeSchemaCopy", func(t *testing.T) { - t.Parallel() - cmd, stderr, filter, err := dirCmd( - pkgDir, - "git", - "grep", - "-nE", - // We prohibit appending to the type schema and require allocating - // a new slice. See the comment in execplan.go file. - `(yps|ypes) = append\(`, - "--", - "sql/colexec/execplan.go", - ) - if err != nil { - t.Fatal(err) - } - - if err := cmd.Start(); err != nil { - t.Fatal(err) - } - - if err := stream.ForEach(filter, func(s string) { - t.Errorf("\n%s <- forbidden; allocate a new []*types.T slice", s) - }); err != nil { - t.Error(err) - } - - if err := cmd.Wait(); err != nil { - if out := stderr.String(); len(out) > 0 { - t.Fatalf("err=%s, stderr=%s", err, out) - } - } - }) - t.Run("TestGCAssert", func(t *testing.T) { skip.UnderShort(t)