diff --git a/pkg/sql/colconv/vec_to_datum.eg.go b/pkg/sql/colconv/vec_to_datum.eg.go index dbe66ad49e65..e652a145f132 100644 --- a/pkg/sql/colconv/vec_to_datum.eg.go +++ b/pkg/sql/colconv/vec_to_datum.eg.go @@ -11,11 +11,13 @@ package colconv import ( "math/big" + "sync" "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coldataext" "github.com/cockroachdb/cockroach/pkg/col/typeconv" "github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" @@ -46,14 +48,33 @@ type VecToDatumConverter struct { da rowenc.DatumAlloc } +var _ execinfra.Releasable = &VecToDatumConverter{} + +var vecToDatumConverterPool = sync.Pool{ + New: func() interface{} { + return &VecToDatumConverter{} + }, +} + // NewVecToDatumConverter creates a new VecToDatumConverter. // - batchWidth determines the width of the batches that it will be converting. // - vecIdxsToConvert determines which vectors need to be converted. func NewVecToDatumConverter(batchWidth int, vecIdxsToConvert []int) *VecToDatumConverter { - return &VecToDatumConverter{ - convertedVecs: make([]tree.Datums, batchWidth), - vecIdxsToConvert: vecIdxsToConvert, + c := vecToDatumConverterPool.Get().(*VecToDatumConverter) + if cap(c.convertedVecs) < batchWidth { + c.convertedVecs = make([]tree.Datums, batchWidth) + } else { + c.convertedVecs = c.convertedVecs[:batchWidth] } + c.vecIdxsToConvert = vecIdxsToConvert + return c +} + +// Release is part of the execinfra.Releasable interface. +func (c *VecToDatumConverter) Release() { + c.convertedVecs = c.convertedVecs[:0] + c.vecIdxsToConvert = nil + vecToDatumConverterPool.Put(c) } // ConvertBatchAndDeselect converts the selected vectors from the batch while @@ -75,17 +96,17 @@ func (c *VecToDatumConverter) ConvertBatchAndDeselect(batch coldata.Batch) { return } // Ensure that convertedVecs are of sufficient length. - if cap(c.convertedVecs[c.vecIdxsToConvert[0]]) < batchLength { - for _, vecIdx := range c.vecIdxsToConvert { + for _, vecIdx := range c.vecIdxsToConvert { + if cap(c.convertedVecs[vecIdx]) < batchLength { c.convertedVecs[vecIdx] = make([]tree.Datum, batchLength) + } else { + c.convertedVecs[vecIdx] = c.convertedVecs[vecIdx][:batchLength] } + } + if c.da.AllocSize < batchLength { // Adjust the datum alloc according to the length of the batch since // this batch is the longest we've seen so far. c.da.AllocSize = batchLength - } else { - for _, vecIdx := range c.vecIdxsToConvert { - c.convertedVecs[vecIdx] = c.convertedVecs[vecIdx][:batchLength] - } } sel := batch.Selection() vecs := batch.ColVecs() @@ -129,17 +150,17 @@ func (c *VecToDatumConverter) ConvertVecs(vecs []coldata.Vec, inputLen int, sel // rely on the fact that selection vectors are increasing sequences. requiredLength = sel[inputLen-1] + 1 } - if cap(c.convertedVecs[c.vecIdxsToConvert[0]]) < requiredLength { - for _, vecIdx := range c.vecIdxsToConvert { + for _, vecIdx := range c.vecIdxsToConvert { + if cap(c.convertedVecs[vecIdx]) < requiredLength { c.convertedVecs[vecIdx] = make([]tree.Datum, requiredLength) + } else { + c.convertedVecs[vecIdx] = c.convertedVecs[vecIdx][:requiredLength] } + } + if c.da.AllocSize < requiredLength { // Adjust the datum alloc according to the length of the batch since // this batch is the longest we've seen so far. c.da.AllocSize = requiredLength - } else { - for _, vecIdx := range c.vecIdxsToConvert { - c.convertedVecs[vecIdx] = c.convertedVecs[vecIdx][:requiredLength] - } } for _, vecIdx := range c.vecIdxsToConvert { ColVecToDatum( diff --git a/pkg/sql/colconv/vec_to_datum_tmpl.go b/pkg/sql/colconv/vec_to_datum_tmpl.go index fc743816abc6..e73a87d7f2dc 100644 --- a/pkg/sql/colconv/vec_to_datum_tmpl.go +++ b/pkg/sql/colconv/vec_to_datum_tmpl.go @@ -20,7 +20,10 @@ package colconv import ( + "sync" + "github.com/cockroachdb/cockroach/pkg/col/coldata" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" @@ -49,14 +52,33 @@ type VecToDatumConverter struct { da rowenc.DatumAlloc } +var _ execinfra.Releasable = &VecToDatumConverter{} + +var vecToDatumConverterPool = sync.Pool{ + New: func() interface{} { + return &VecToDatumConverter{} + }, +} + // NewVecToDatumConverter creates a new VecToDatumConverter. // - batchWidth determines the width of the batches that it will be converting. // - vecIdxsToConvert determines which vectors need to be converted. func NewVecToDatumConverter(batchWidth int, vecIdxsToConvert []int) *VecToDatumConverter { - return &VecToDatumConverter{ - convertedVecs: make([]tree.Datums, batchWidth), - vecIdxsToConvert: vecIdxsToConvert, + c := vecToDatumConverterPool.Get().(*VecToDatumConverter) + if cap(c.convertedVecs) < batchWidth { + c.convertedVecs = make([]tree.Datums, batchWidth) + } else { + c.convertedVecs = c.convertedVecs[:batchWidth] } + c.vecIdxsToConvert = vecIdxsToConvert + return c +} + +// Release is part of the execinfra.Releasable interface. +func (c *VecToDatumConverter) Release() { + c.convertedVecs = c.convertedVecs[:0] + c.vecIdxsToConvert = nil + vecToDatumConverterPool.Put(c) } // ConvertBatchAndDeselect converts the selected vectors from the batch while @@ -78,17 +100,17 @@ func (c *VecToDatumConverter) ConvertBatchAndDeselect(batch coldata.Batch) { return } // Ensure that convertedVecs are of sufficient length. - if cap(c.convertedVecs[c.vecIdxsToConvert[0]]) < batchLength { - for _, vecIdx := range c.vecIdxsToConvert { + for _, vecIdx := range c.vecIdxsToConvert { + if cap(c.convertedVecs[vecIdx]) < batchLength { c.convertedVecs[vecIdx] = make([]tree.Datum, batchLength) + } else { + c.convertedVecs[vecIdx] = c.convertedVecs[vecIdx][:batchLength] } + } + if c.da.AllocSize < batchLength { // Adjust the datum alloc according to the length of the batch since // this batch is the longest we've seen so far. c.da.AllocSize = batchLength - } else { - for _, vecIdx := range c.vecIdxsToConvert { - c.convertedVecs[vecIdx] = c.convertedVecs[vecIdx][:batchLength] - } } sel := batch.Selection() vecs := batch.ColVecs() @@ -132,17 +154,17 @@ func (c *VecToDatumConverter) ConvertVecs(vecs []coldata.Vec, inputLen int, sel // rely on the fact that selection vectors are increasing sequences. requiredLength = sel[inputLen-1] + 1 } - if cap(c.convertedVecs[c.vecIdxsToConvert[0]]) < requiredLength { - for _, vecIdx := range c.vecIdxsToConvert { + for _, vecIdx := range c.vecIdxsToConvert { + if cap(c.convertedVecs[vecIdx]) < requiredLength { c.convertedVecs[vecIdx] = make([]tree.Datum, requiredLength) + } else { + c.convertedVecs[vecIdx] = c.convertedVecs[vecIdx][:requiredLength] } + } + if c.da.AllocSize < requiredLength { // Adjust the datum alloc according to the length of the batch since // this batch is the longest we've seen so far. c.da.AllocSize = requiredLength - } else { - for _, vecIdx := range c.vecIdxsToConvert { - c.convertedVecs[vecIdx] = c.convertedVecs[vecIdx][:requiredLength] - } } for _, vecIdx := range c.vecIdxsToConvert { ColVecToDatum( diff --git a/pkg/sql/colexec/case.go b/pkg/sql/colexec/case.go index 6c7589c7b83b..1e2f55777ef2 100644 --- a/pkg/sql/colexec/case.go +++ b/pkg/sql/colexec/case.go @@ -46,7 +46,7 @@ type caseOp struct { prevSel []int } -var _ InternalMemoryOperator = &caseOp{} +var _ colexecbase.Operator = &caseOp{} func (c *caseOp) ChildCount(verbose bool) int { return 1 + len(c.caseOps) + 1 @@ -65,11 +65,6 @@ func (c *caseOp) Child(nth int, verbose bool) execinfra.OpNode { return nil } -func (c *caseOp) InternalMemoryUsage() int { - // We internally use two selection vectors, origSel and prevSel. - return 2 * colmem.SizeOfBatchSizeSelVector -} - // NewCaseOp returns an operator that runs a case statement. // buffer is a bufferOp that will return the input batch repeatedly. // caseOps is a list of operator chains, one per branch in the case statement. @@ -89,6 +84,8 @@ func NewCaseOp( outputIdx int, typ *types.T, ) colexecbase.Operator { + // We internally use two selection vectors, origSel and prevSel. + allocator.AdjustMemoryUsage(int64(2 * colmem.SizeOfBatchSizeSelVector)) return &caseOp{ allocator: allocator, buffer: buffer.(*bufferOp), diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index ce8eb765be28..a44649a4a1d0 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -631,7 +631,11 @@ func NewColOperator( }() spec := args.Spec inputs := args.Inputs - factory := coldataext.NewExtendedColumnFactory(flowCtx.NewEvalCtx()) + evalCtx := flowCtx.NewEvalCtx() + factory := args.Factory + if factory == nil { + factory = coldataext.NewExtendedColumnFactory(evalCtx) + } streamingMemAccount := args.StreamingMemAccount streamingAllocator := colmem.NewAllocator(ctx, streamingMemAccount, factory) useStreamingMemAccountForBuffering := args.TestingKnobs.UseStreamingMemAccountForBuffering @@ -645,6 +649,7 @@ func NewColOperator( core := &spec.Core post := &spec.Post + var procOutputHelper *execinfra.ProcOutputHelper // resultPreSpecPlanningStateShallowCopy is a shallow copy of the result // before any specs are planned. Used if there is a need to backtrack. @@ -697,14 +702,16 @@ func NewColOperator( if err := checkNumIn(inputs, 0); err != nil { return r, err } - scanOp, err := colfetcher.NewColBatchScan(ctx, streamingAllocator, flowCtx, core.TableReader, post) + scanOp, helper, err := colfetcher.NewColBatchScan(ctx, streamingAllocator, flowCtx, evalCtx, core.TableReader, post) if err != nil { return r, err } result.Op = scanOp result.IOReader = scanOp result.MetadataSources = append(result.MetadataSources, scanOp) + result.Releasables = append(result.Releasables, helper) result.Releasables = append(result.Releasables, scanOp) + procOutputHelper = helper // colBatchScan is wrapped with a cancel checker below, so we need to // log its creation separately. if log.V(1) { @@ -752,10 +759,9 @@ func NewColOperator( } inputTypes := make([]*types.T, len(spec.Input[0].ColumnTypes)) copy(inputTypes, spec.Input[0].ColumnTypes) - evalCtx := flowCtx.NewEvalCtx() var constructors []execinfrapb.AggregateConstructor var constArguments []tree.Datums - semaCtx := flowCtx.TypeResolverFactory.NewSemaContext(flowCtx.EvalCtx.Txn) + semaCtx := flowCtx.TypeResolverFactory.NewSemaContext(evalCtx.Txn) constructors, constArguments, result.ColumnTypes, err = colexecagg.ProcessAggregations( evalCtx, semaCtx, aggSpec.Aggregations, inputTypes, ) @@ -933,7 +939,7 @@ func NewColOperator( if !core.HashJoiner.OnExpr.Empty() && core.HashJoiner.Type == descpb.InnerJoin { if err = result.planAndMaybeWrapOnExprAsFilter( - ctx, flowCtx, args, core.HashJoiner.OnExpr, factory, + ctx, flowCtx, evalCtx, args, core.HashJoiner.OnExpr, factory, ); err != nil { return r, err } @@ -993,7 +999,7 @@ func NewColOperator( if onExpr != nil { if err = result.planAndMaybeWrapOnExprAsFilter( - ctx, flowCtx, args, *onExpr, factory, + ctx, flowCtx, evalCtx, args, *onExpr, factory, ); err != nil { return r, err } @@ -1135,10 +1141,6 @@ func NewColOperator( return r, err } - // After constructing the base operator, calculate its internal memory usage. - if sMem, ok := result.Op.(colexec.InternalMemoryOperator); ok { - result.InternalMemUsage += sMem.InternalMemoryUsage() - } if log.V(1) { log.Infof(ctx, "made op %T\n", result.Op) } @@ -1150,7 +1152,7 @@ func NewColOperator( Op: result.Op, ColumnTypes: result.ColumnTypes, } - err = ppr.planPostProcessSpec(ctx, flowCtx, args, post, factory) + err = ppr.planPostProcessSpec(ctx, flowCtx, evalCtx, args, post, factory, procOutputHelper) if err != nil { if log.V(2) { log.Infof( @@ -1233,21 +1235,17 @@ func NewColOperator( func (r opResult) planAndMaybeWrapOnExprAsFilter( ctx context.Context, flowCtx *execinfra.FlowCtx, + evalCtx *tree.EvalContext, args *colexec.NewColOperatorArgs, onExpr execinfrapb.Expression, factory coldata.ColumnFactory, ) error { - // We will plan other Operators on top of r.Op, so we need to account for the - // internal memory explicitly. - if internalMemOp, ok := r.Op.(colexec.InternalMemoryOperator); ok { - r.InternalMemUsage += internalMemOp.InternalMemoryUsage() - } ppr := postProcessResult{ Op: r.Op, ColumnTypes: r.ColumnTypes, } if err := ppr.planFilterExpr( - ctx, flowCtx, flowCtx.NewEvalCtx(), onExpr, args.StreamingMemAccount, factory, args.ExprHelper, + ctx, flowCtx, evalCtx, onExpr, args.StreamingMemAccount, factory, args.ExprHelper, nil, /* procOutputHelper */ ); err != nil { // ON expression planning failed. Fall back to planning the filter // using row execution. @@ -1294,17 +1292,21 @@ func (r opResult) wrapPostProcessSpec( } // planPostProcessSpec plans the post processing stage specified in post on top -// of r.Op. +// of r.Op. It takes in an optional procOutputHelper which has already been +// initialized with post meaning that it already contains well-typed expressions +// which allows us to avoid redundant deserialization. func (r *postProcessResult) planPostProcessSpec( ctx context.Context, flowCtx *execinfra.FlowCtx, + evalCtx *tree.EvalContext, args *colexec.NewColOperatorArgs, post *execinfrapb.PostProcessSpec, factory coldata.ColumnFactory, + procOutputHelper *execinfra.ProcOutputHelper, ) error { if !post.Filter.Empty() { if err := r.planFilterExpr( - ctx, flowCtx, flowCtx.NewEvalCtx(), post.Filter, args.StreamingMemAccount, factory, args.ExprHelper, + ctx, flowCtx, evalCtx, post.Filter, args.StreamingMemAccount, factory, args.ExprHelper, procOutputHelper, ); err != nil { return err } @@ -1316,17 +1318,22 @@ func (r *postProcessResult) planPostProcessSpec( if log.V(2) { log.Infof(ctx, "planning render expressions %+v", post.RenderExprs) } - semaCtx := flowCtx.TypeResolverFactory.NewSemaContext(flowCtx.EvalCtx.Txn) + semaCtx := flowCtx.TypeResolverFactory.NewSemaContext(evalCtx.Txn) var renderedCols []uint32 - for _, renderExpr := range post.RenderExprs { - var renderInternalMem int - expr, err := args.ExprHelper.ProcessExpr(renderExpr, semaCtx, flowCtx.EvalCtx, r.ColumnTypes) - if err != nil { - return err + for renderIdx, renderExpr := range post.RenderExprs { + var expr tree.TypedExpr + var err error + if procOutputHelper != nil { + expr = procOutputHelper.RenderExprs[renderIdx].Expr + } else { + expr, err = args.ExprHelper.ProcessExpr(renderExpr, semaCtx, evalCtx, r.ColumnTypes) + if err != nil { + return err + } } var outputIdx int - r.Op, outputIdx, r.ColumnTypes, renderInternalMem, err = planProjectionOperators( - ctx, flowCtx.NewEvalCtx(), expr, r.ColumnTypes, r.Op, args.StreamingMemAccount, factory, + r.Op, outputIdx, r.ColumnTypes, err = planProjectionOperators( + ctx, evalCtx, expr, r.ColumnTypes, r.Op, args.StreamingMemAccount, factory, ) if err != nil { return errors.Wrapf(err, "unable to columnarize render expression %q", expr) @@ -1334,7 +1341,6 @@ func (r *postProcessResult) planPostProcessSpec( if outputIdx < 0 { return errors.AssertionFailedf("missing outputIdx") } - r.InternalMemUsage += renderInternalMem renderedCols = append(renderedCols, uint32(outputIdx)) } r.Op = colexec.NewSimpleProjectOp(r.Op, len(r.ColumnTypes), renderedCols) @@ -1437,18 +1443,20 @@ func (r opResult) createDiskAccount( } type postProcessResult struct { - Op colexecbase.Operator - ColumnTypes []*types.T - InternalMemUsage int + Op colexecbase.Operator + ColumnTypes []*types.T } func (r opResult) updateWithPostProcessResult(ppr postProcessResult) { r.Op = ppr.Op r.ColumnTypes = make([]*types.T, len(ppr.ColumnTypes)) copy(r.ColumnTypes, ppr.ColumnTypes) - r.InternalMemUsage += ppr.InternalMemUsage } +// planFilterExpr creates all operators to implement filter expression. It takes +// in an optional procOutputHelper which has already been initialized with post +// meaning that it already contains well-typed expressions which allows us to +// avoid redundant deserialization. func (r *postProcessResult) planFilterExpr( ctx context.Context, flowCtx *execinfra.FlowCtx, @@ -1457,12 +1465,20 @@ func (r *postProcessResult) planFilterExpr( acc *mon.BoundAccount, factory coldata.ColumnFactory, helper *colexec.ExprHelper, + procOutputHelper *execinfra.ProcOutputHelper, ) error { - var selectionInternalMem int - semaCtx := flowCtx.TypeResolverFactory.NewSemaContext(evalCtx.Txn) - expr, err := helper.ProcessExpr(filter, semaCtx, evalCtx, r.ColumnTypes) - if err != nil { - return err + var ( + expr tree.TypedExpr + err error + ) + if procOutputHelper != nil { + expr = procOutputHelper.Filter.Expr + } else { + semaCtx := flowCtx.TypeResolverFactory.NewSemaContext(evalCtx.Txn) + expr, err = helper.ProcessExpr(filter, semaCtx, evalCtx, r.ColumnTypes) + if err != nil { + return err + } } if expr == tree.DNull { // The filter expression is tree.DNull meaning that it is always false, so @@ -1471,13 +1487,12 @@ func (r *postProcessResult) planFilterExpr( return nil } var filterColumnTypes []*types.T - r.Op, _, filterColumnTypes, selectionInternalMem, err = planSelectionOperators( + r.Op, _, filterColumnTypes, err = planSelectionOperators( ctx, evalCtx, expr, r.ColumnTypes, r.Op, acc, factory, ) if err != nil { return errors.Wrapf(err, "unable to columnarize filter expression %q", filter) } - r.InternalMemUsage += selectionInternalMem if len(filterColumnTypes) > len(r.ColumnTypes) { // Additional columns were appended to store projections while evaluating // the filter. Project them away. @@ -1510,28 +1525,27 @@ func planSelectionOperators( input colexecbase.Operator, acc *mon.BoundAccount, factory coldata.ColumnFactory, -) (op colexecbase.Operator, resultIdx int, typs []*types.T, internalMemUsed int, err error) { +) (op colexecbase.Operator, resultIdx int, typs []*types.T, err error) { switch t := expr.(type) { case *tree.IndexedVar: op, err = colexec.BoolOrUnknownToSelOp(input, columnTypes, t.Idx) - return op, -1, columnTypes, internalMemUsed, err + return op, -1, columnTypes, err case *tree.AndExpr: // AND expressions are handled by an implicit AND'ing of selection vectors. // First we select out the tuples that are true on the left side, and then, // only among the matched tuples, we select out the tuples that are true on // the right side. var leftOp, rightOp colexecbase.Operator - var internalMemUsedLeft, internalMemUsedRight int - leftOp, _, typs, internalMemUsedLeft, err = planSelectionOperators( + leftOp, _, typs, err = planSelectionOperators( ctx, evalCtx, t.TypedLeft(), columnTypes, input, acc, factory, ) if err != nil { - return nil, resultIdx, typs, internalMemUsed, err + return nil, resultIdx, typs, err } - rightOp, resultIdx, typs, internalMemUsedRight, err = planSelectionOperators( + rightOp, resultIdx, typs, err = planSelectionOperators( ctx, evalCtx, t.TypedRight(), typs, leftOp, acc, factory, ) - return rightOp, resultIdx, typs, internalMemUsedLeft + internalMemUsedRight, err + return rightOp, resultIdx, typs, err case *tree.OrExpr: // OR expressions are handled by converting them to an equivalent CASE // statement. Since CASE statements don't have a selection form, plan a @@ -1550,54 +1564,54 @@ func planSelectionOperators( tree.DBoolFalse, types.Bool) if err != nil { - return nil, resultIdx, typs, internalMemUsed, err + return nil, resultIdx, typs, err } - op, resultIdx, typs, internalMemUsed, err = planProjectionOperators( + op, resultIdx, typs, err = planProjectionOperators( ctx, evalCtx, caseExpr, columnTypes, input, acc, factory, ) if err != nil { - return nil, resultIdx, typs, internalMemUsed, err + return nil, resultIdx, typs, err } op, err = colexec.BoolOrUnknownToSelOp(op, typs, resultIdx) - return op, resultIdx, typs, internalMemUsed, err + return op, resultIdx, typs, err case *tree.CaseExpr: - op, resultIdx, typs, internalMemUsed, err = planProjectionOperators( + op, resultIdx, typs, err = planProjectionOperators( ctx, evalCtx, expr, columnTypes, input, acc, factory, ) if err != nil { - return op, resultIdx, typs, internalMemUsed, err + return op, resultIdx, typs, err } op, err = colexec.BoolOrUnknownToSelOp(op, typs, resultIdx) - return op, resultIdx, typs, internalMemUsed, err + return op, resultIdx, typs, err case *tree.IsNullExpr: - op, resultIdx, typs, internalMemUsed, err = planProjectionOperators( + op, resultIdx, typs, err = planProjectionOperators( ctx, evalCtx, t.TypedInnerExpr(), columnTypes, input, acc, factory, ) if err != nil { - return op, resultIdx, typs, internalMemUsed, err + return op, resultIdx, typs, err } op = colexec.NewIsNullSelOp( op, resultIdx, false /* negate */, typs[resultIdx].Family() == types.TupleFamily, ) - return op, resultIdx, typs, internalMemUsed, err + return op, resultIdx, typs, err case *tree.IsNotNullExpr: - op, resultIdx, typs, internalMemUsed, err = planProjectionOperators( + op, resultIdx, typs, err = planProjectionOperators( ctx, evalCtx, t.TypedInnerExpr(), columnTypes, input, acc, factory, ) if err != nil { - return op, resultIdx, typs, internalMemUsed, err + return op, resultIdx, typs, err } op = colexec.NewIsNullSelOp( op, resultIdx, true /* negate */, typs[resultIdx].Family() == types.TupleFamily, ) - return op, resultIdx, typs, internalMemUsed, err + return op, resultIdx, typs, err case *tree.ComparisonExpr: cmpOp := t.Operator - leftOp, leftIdx, ct, internalMemUsedLeft, err := planProjectionOperators( + leftOp, leftIdx, ct, err := planProjectionOperators( ctx, evalCtx, t.TypedLeft(), columnTypes, input, acc, factory, ) if err != nil { - return nil, resultIdx, ct, internalMemUsed, err + return nil, resultIdx, ct, err } lTyp := ct[leftIdx] if constArg, ok := t.Right.(tree.Datum); ok { @@ -1639,20 +1653,20 @@ func planSelectionOperators( cmpOp, leftOp, ct, leftIdx, constArg, evalCtx, t, ) } - return op, resultIdx, ct, internalMemUsedLeft, err + return op, resultIdx, ct, err } - rightOp, rightIdx, ct, internalMemUsedRight, err := planProjectionOperators( + rightOp, rightIdx, ct, err := planProjectionOperators( ctx, evalCtx, t.TypedRight(), ct, leftOp, acc, factory, ) if err != nil { - return nil, resultIdx, ct, internalMemUsed, err + return nil, resultIdx, ct, err } op, err = colexec.GetSelectionOperator( cmpOp, rightOp, ct, leftIdx, rightIdx, evalCtx, t, ) - return op, resultIdx, ct, internalMemUsedLeft + internalMemUsedRight, err + return op, resultIdx, ct, err default: - return nil, resultIdx, nil, internalMemUsed, errors.Errorf("unhandled selection expression type: %s", reflect.TypeOf(t)) + return nil, resultIdx, nil, errors.Errorf("unhandled selection expression type: %s", reflect.TypeOf(t)) } } @@ -1687,7 +1701,7 @@ func planProjectionOperators( input colexecbase.Operator, acc *mon.BoundAccount, factory coldata.ColumnFactory, -) (op colexecbase.Operator, resultIdx int, typs []*types.T, internalMemUsed int, err error) { +) (op colexecbase.Operator, resultIdx int, typs []*types.T, err error) { // projectDatum is a helper function that adds a new constant projection // operator for the given datum. typs are updated accordingly. projectDatum := func(datum tree.Datum) (colexecbase.Operator, error) { @@ -1705,7 +1719,7 @@ func planProjectionOperators( resultIdx = -1 switch t := expr.(type) { case *tree.IndexedVar: - return input, t.Idx, columnTypes, internalMemUsed, nil + return input, t.Idx, columnTypes, nil case *tree.ComparisonExpr: return planProjectionExpr( ctx, evalCtx, t.Operator, t.ResolvedType(), t.TypedLeft(), t.TypedRight(), @@ -1713,7 +1727,7 @@ func planProjectionOperators( ) case *tree.BinaryExpr: if err = checkSupportedBinaryExpr(t.TypedLeft(), t.TypedRight(), t.ResolvedType()); err != nil { - return op, resultIdx, typs, internalMemUsed, err + return op, resultIdx, typs, err } return planProjectionExpr( ctx, evalCtx, t.Operator, t.ResolvedType(), t.TypedLeft(), t.TypedRight(), @@ -1725,19 +1739,16 @@ func planProjectionOperators( return planIsNullProjectionOp(ctx, evalCtx, t.ResolvedType(), t.TypedInnerExpr(), columnTypes, input, acc, true /* negate */, factory) case *tree.CastExpr: expr := t.Expr.(tree.TypedExpr) - op, resultIdx, typs, internalMemUsed, err = planProjectionOperators( + op, resultIdx, typs, err = planProjectionOperators( ctx, evalCtx, expr, columnTypes, input, acc, factory, ) if err != nil { - return nil, 0, nil, internalMemUsed, err + return nil, 0, nil, err } op, resultIdx, typs, err = planCastOperator(ctx, acc, typs, op, resultIdx, expr.ResolvedType(), t.ResolvedType(), factory) - return op, resultIdx, typs, internalMemUsed, err + return op, resultIdx, typs, err case *tree.FuncExpr: - var ( - inputCols []int - projectionInternalMem int - ) + var inputCols []int typs = make([]*types.T, len(columnTypes)) copy(typs, columnTypes) op = input @@ -1746,24 +1757,23 @@ func planProjectionOperators( // TODO(rohany): This could be done better, especially in the case of // constant arguments, because the vectorized engine right now // creates a new column full of the constant value. - op, resultIdx, typs, projectionInternalMem, err = planProjectionOperators( + op, resultIdx, typs, err = planProjectionOperators( ctx, evalCtx, e.(tree.TypedExpr), typs, op, acc, factory, ) if err != nil { - return nil, resultIdx, nil, internalMemUsed, err + return nil, resultIdx, nil, err } inputCols = append(inputCols, resultIdx) - internalMemUsed += projectionInternalMem } resultIdx = len(typs) op, err = colexec.NewBuiltinFunctionOperator( colmem.NewAllocator(ctx, acc, factory), evalCtx, t, typs, inputCols, resultIdx, op, ) typs = appendOneType(typs, t.ResolvedType()) - return op, resultIdx, typs, internalMemUsed, err + return op, resultIdx, typs, err case tree.Datum: op, err = projectDatum(t) - return op, resultIdx, typs, internalMemUsed, err + return op, resultIdx, typs, err case *tree.Tuple: isConstTuple := true for _, expr := range t.Exprs { @@ -1777,23 +1787,21 @@ func planProjectionOperators( // project the resulting datum. tuple, err := t.Eval(evalCtx) if err != nil { - return nil, resultIdx, typs, internalMemUsed, err + return nil, resultIdx, typs, err } op, err = projectDatum(tuple) - return op, resultIdx, typs, internalMemUsed, err + return op, resultIdx, typs, err } outputType := t.ResolvedType() typs = make([]*types.T, len(columnTypes)) copy(typs, columnTypes) tupleContentsIdxs := make([]int, len(t.Exprs)) for i, expr := range t.Exprs { - var memUsed int - input, tupleContentsIdxs[i], typs, memUsed, err = planProjectionOperators( + input, tupleContentsIdxs[i], typs, err = planProjectionOperators( ctx, evalCtx, expr.(tree.TypedExpr), typs, input, acc, factory, ) - internalMemUsed += memUsed if err != nil { - return nil, resultIdx, typs, internalMemUsed, err + return nil, resultIdx, typs, err } } resultIdx = len(typs) @@ -1801,10 +1809,10 @@ func planProjectionOperators( colmem.NewAllocator(ctx, acc, factory), typs, tupleContentsIdxs, outputType, input, resultIdx, ) typs = appendOneType(typs, outputType) - return op, resultIdx, typs, internalMemUsed, err + return op, resultIdx, typs, err case *tree.CaseExpr: if t.Expr != nil { - return nil, resultIdx, typs, internalMemUsed, errors.New("CASE WHEN expressions unsupported") + return nil, resultIdx, typs, errors.New("CASE WHEN expressions unsupported") } allocator := colmem.NewAllocator(ctx, acc, factory) @@ -1814,7 +1822,7 @@ func planProjectionOperators( // works (which populates its output in arbitrary order) and the flat // bytes implementation of Bytes type (which prohibits sets in arbitrary // order), so we reject such scenario to fall back to row-by-row engine. - return nil, resultIdx, typs, internalMemUsed, errors.Newf( + return nil, resultIdx, typs, errors.Newf( "unsupported type %s in CASE operator", caseOutputType) } caseOutputIdx := len(columnTypes) @@ -1842,26 +1850,24 @@ func planProjectionOperators( // results of the WHEN into a single output vector, assembling the final // result of the case projection. whenTyped := when.Cond.(tree.TypedExpr) - var whenInternalMemUsed, thenInternalMemUsed int - caseOps[i], resultIdx, typs, whenInternalMemUsed, err = planProjectionOperators( + caseOps[i], resultIdx, typs, err = planProjectionOperators( ctx, evalCtx, whenTyped, typs, buffer, acc, factory, ) if err != nil { - return nil, resultIdx, typs, internalMemUsed, err + return nil, resultIdx, typs, err } caseOps[i], err = colexec.BoolOrUnknownToSelOp(caseOps[i], typs, resultIdx) if err != nil { - return nil, resultIdx, typs, internalMemUsed, err + return nil, resultIdx, typs, err } // Run the "then" clause on those tuples that were selected. - caseOps[i], thenIdxs[i], typs, thenInternalMemUsed, err = planProjectionOperators( + caseOps[i], thenIdxs[i], typs, err = planProjectionOperators( ctx, evalCtx, when.Val.(tree.TypedExpr), typs, caseOps[i], acc, factory, ) if err != nil { - return nil, resultIdx, typs, internalMemUsed, err + return nil, resultIdx, typs, err } - internalMemUsed += whenInternalMemUsed + thenInternalMemUsed if !typs[thenIdxs[i]].Identical(typs[caseOutputIdx]) { // It is possible that the projection of this THEN arm has different // column type (for example, we expect INT2, but INT8 is given). In @@ -1871,24 +1877,22 @@ func planProjectionOperators( ctx, acc, typs, caseOps[i], thenIdxs[i], fromType, toType, factory, ) if err != nil { - return nil, resultIdx, typs, internalMemUsed, err + return nil, resultIdx, typs, err } } } - var elseInternalMemUsed int var elseOp colexecbase.Operator elseExpr := t.Else if elseExpr == nil { // If there's no ELSE arm, we write NULLs. elseExpr = tree.DNull } - elseOp, thenIdxs[len(t.Whens)], typs, elseInternalMemUsed, err = planProjectionOperators( + elseOp, thenIdxs[len(t.Whens)], typs, err = planProjectionOperators( ctx, evalCtx, elseExpr.(tree.TypedExpr), typs, buffer, acc, factory, ) if err != nil { - return nil, resultIdx, typs, internalMemUsed, err + return nil, resultIdx, typs, err } - internalMemUsed += elseInternalMemUsed if !typs[thenIdxs[len(t.Whens)]].Identical(typs[caseOutputIdx]) { // It is possible that the projection of the ELSE arm has different // column type (for example, we expect INT2, but INT8 is given). In @@ -1899,18 +1903,17 @@ func planProjectionOperators( ctx, acc, typs, elseOp, elseIdx, fromType, toType, factory, ) if err != nil { - return nil, resultIdx, typs, internalMemUsed, err + return nil, resultIdx, typs, err } } schemaEnforcer.SetTypes(typs) op := colexec.NewCaseOp(allocator, buffer, caseOps, elseOp, thenIdxs, caseOutputIdx, caseOutputType) - internalMemUsed += op.(colexec.InternalMemoryOperator).InternalMemoryUsage() - return op, caseOutputIdx, typs, internalMemUsed, err + return op, caseOutputIdx, typs, err case *tree.AndExpr, *tree.OrExpr: return planLogicalProjectionOp(ctx, evalCtx, expr, columnTypes, input, acc, factory) default: - return nil, resultIdx, nil, internalMemUsed, errors.Errorf("unhandled projection expression type: %s", reflect.TypeOf(t)) + return nil, resultIdx, nil, errors.Errorf("unhandled projection expression type: %s", reflect.TypeOf(t)) } } @@ -1954,9 +1957,9 @@ func planProjectionExpr( factory coldata.ColumnFactory, binFn tree.TwoArgFn, cmpExpr *tree.ComparisonExpr, -) (op colexecbase.Operator, resultIdx int, typs []*types.T, internalMemUsed int, err error) { +) (op colexecbase.Operator, resultIdx int, typs []*types.T, err error) { if err := checkSupportedProjectionExpr(left, right); err != nil { - return nil, resultIdx, typs, internalMemUsed, err + return nil, resultIdx, typs, err } allocator := colmem.NewAllocator(ctx, acc, factory) resultIdx = -1 @@ -1968,11 +1971,11 @@ func planProjectionExpr( // argument is on the right side. This doesn't happen for non-commutative // operators such as - and /, though, so we still need this case. var rightIdx int - input, rightIdx, typs, internalMemUsed, err = planProjectionOperators( + input, rightIdx, typs, err = planProjectionOperators( ctx, evalCtx, right, columnTypes, input, acc, factory, ) if err != nil { - return nil, resultIdx, typs, internalMemUsed, err + return nil, resultIdx, typs, err } resultIdx = len(typs) // The projection result will be outputted to a new column which is appended @@ -1982,17 +1985,13 @@ func planProjectionExpr( rightIdx, lConstArg, resultIdx, evalCtx, binFn, cmpExpr, ) } else { - var ( - leftIdx int - internalMemUsedLeft int - ) - input, leftIdx, typs, internalMemUsedLeft, err = planProjectionOperators( + var leftIdx int + input, leftIdx, typs, err = planProjectionOperators( ctx, evalCtx, left, columnTypes, input, acc, factory, ) if err != nil { - return nil, resultIdx, typs, internalMemUsed, err + return nil, resultIdx, typs, err } - internalMemUsed += internalMemUsedLeft // Note that this check exists only for testing purposes. On the // regular workloads, we expect that tree.Tuples have already been // pre-evaluated. We also don't need fully-fledged planning as we have @@ -2001,7 +2000,7 @@ func planProjectionExpr( if tuple, ok := right.(*tree.Tuple); ok { tupleDatum, err := tuple.Eval(evalCtx) if err != nil { - return nil, resultIdx, typs, internalMemUsed, err + return nil, resultIdx, typs, err } right = tupleDatum } @@ -2054,17 +2053,13 @@ func planProjectionExpr( } } else { // Case 3: neither are constant. - var ( - rightIdx int - internalMemUsedRight int - ) - input, rightIdx, typs, internalMemUsedRight, err = planProjectionOperators( + var rightIdx int + input, rightIdx, typs, err = planProjectionOperators( ctx, evalCtx, right, typs, input, acc, factory, ) if err != nil { - return nil, resultIdx, nil, internalMemUsed, err + return nil, resultIdx, nil, err } - internalMemUsed += internalMemUsedRight resultIdx = len(typs) op, err = colexec.GetProjectionOperator( allocator, typs, outputType, projOp, input, leftIdx, rightIdx, @@ -2073,13 +2068,10 @@ func planProjectionExpr( } } if err != nil { - return op, resultIdx, typs, internalMemUsed, err - } - if sMem, ok := op.(colexec.InternalMemoryOperator); ok { - internalMemUsed += sMem.InternalMemoryUsage() + return op, resultIdx, typs, err } typs = appendOneType(typs, outputType) - return op, resultIdx, typs, internalMemUsed, err + return op, resultIdx, typs, err } // planLogicalProjectionOp plans all the needed operators for a projection of @@ -2092,15 +2084,14 @@ func planLogicalProjectionOp( input colexecbase.Operator, acc *mon.BoundAccount, factory coldata.ColumnFactory, -) (op colexecbase.Operator, resultIdx int, typs []*types.T, internalMemUsed int, err error) { +) (op colexecbase.Operator, resultIdx int, typs []*types.T, err error) { // Add a new boolean column that will store the result of the projection. resultIdx = len(columnTypes) typs = appendOneType(columnTypes, types.Bool) var ( - typedLeft, typedRight tree.TypedExpr - leftProjOpChain, rightProjOpChain colexecbase.Operator - leftIdx, rightIdx int - internalMemUsedLeft, internalMemUsedRight int + typedLeft, typedRight tree.TypedExpr + leftProjOpChain, rightProjOpChain colexecbase.Operator + leftIdx, rightIdx int ) leftFeedOp := colexec.NewFeedOperator() rightFeedOp := colexec.NewFeedOperator() @@ -2114,17 +2105,17 @@ func planLogicalProjectionOp( default: colexecerror.InternalError(errors.AssertionFailedf("unexpected logical expression type %s", t.String())) } - leftProjOpChain, leftIdx, typs, internalMemUsedLeft, err = planProjectionOperators( + leftProjOpChain, leftIdx, typs, err = planProjectionOperators( ctx, evalCtx, typedLeft, typs, leftFeedOp, acc, factory, ) if err != nil { - return nil, resultIdx, typs, internalMemUsed, err + return nil, resultIdx, typs, err } - rightProjOpChain, rightIdx, typs, internalMemUsedRight, err = planProjectionOperators( + rightProjOpChain, rightIdx, typs, err = planProjectionOperators( ctx, evalCtx, typedRight, typs, rightFeedOp, acc, factory, ) if err != nil { - return nil, resultIdx, typs, internalMemUsed, err + return nil, resultIdx, typs, err } allocator := colmem.NewAllocator(ctx, acc, factory) input = colexec.NewBatchSchemaSubsetEnforcer(allocator, input, typs, resultIdx, len(typs)) @@ -2146,7 +2137,7 @@ func planLogicalProjectionOp( leftIdx, rightIdx, resultIdx, ) } - return op, resultIdx, typs, internalMemUsedLeft + internalMemUsedRight, err + return op, resultIdx, typs, err } // planIsNullProjectionOp plans the operator for IS NULL and IS NOT NULL @@ -2161,12 +2152,12 @@ func planIsNullProjectionOp( acc *mon.BoundAccount, negate bool, factory coldata.ColumnFactory, -) (op colexecbase.Operator, resultIdx int, typs []*types.T, internalMemUsed int, err error) { - op, resultIdx, typs, internalMemUsed, err = planProjectionOperators( +) (op colexecbase.Operator, resultIdx int, typs []*types.T, err error) { + op, resultIdx, typs, err = planProjectionOperators( ctx, evalCtx, expr, columnTypes, input, acc, factory, ) if err != nil { - return op, resultIdx, typs, internalMemUsed, err + return op, resultIdx, typs, err } outputIdx := len(typs) isTupleNull := typs[resultIdx].Family() == types.TupleFamily @@ -2174,7 +2165,7 @@ func planIsNullProjectionOp( colmem.NewAllocator(ctx, acc, factory), op, resultIdx, outputIdx, negate, isTupleNull, ) typs = appendOneType(typs, outputType) - return op, outputIdx, typs, internalMemUsed, nil + return op, outputIdx, typs, nil } // appendOneType appends a *types.T to then end of a []*types.T. The size of the diff --git a/pkg/sql/colexec/materializer.go b/pkg/sql/colexec/materializer.go index 9d2d070bd255..04b386085341 100644 --- a/pkg/sql/colexec/materializer.go +++ b/pkg/sql/colexec/materializer.go @@ -141,6 +141,13 @@ var materializerPool = sync.Pool{ }, } +// materializerEmptyPostProcessSpec is the spec used to initialize the +// materializer. Currently, we assume that the input to the materializer fully +// handles any post-processing, so we always use the empty spec. +// Note that this variable is never modified once initialized (neither is the +// post-processing spec object itself), so it is thread-safe. +var materializerEmptyPostProcessSpec = &execinfrapb.PostProcessSpec{} + // NewMaterializer creates a new Materializer processor which processes the // columnar data coming from input to return it as rows. // Arguments: @@ -180,13 +187,16 @@ func NewMaterializer( closers: toClose, } - if err := m.ProcessorBase.Init( + if err := m.ProcessorBase.InitWithEvalCtx( m, // input must have handled any post-processing itself, so we pass in // an empty post-processing spec. - &execinfrapb.PostProcessSpec{}, + materializerEmptyPostProcessSpec, typs, flowCtx, + // Materializer doesn't modify the eval context, so it is safe to reuse + // the one from the flow context. + flowCtx.EvalCtx, processorID, output, nil, /* memMonitor */ @@ -309,6 +319,7 @@ func (m *Materializer) ConsumerClosed() { func (m *Materializer) Release() { m.drainHelper.Release() m.ProcessorBase.Reset() + m.converter.Release() *m = Materializer{ // We're keeping the reference to the same ProcessorBase since it // allows us to reuse some of the slices as well as ProcOutputHelper diff --git a/pkg/sql/colexec/mergejoiner.go b/pkg/sql/colexec/mergejoiner.go index bf135d492581..adf96210f6a5 100644 --- a/pkg/sql/colexec/mergejoiner.go +++ b/pkg/sql/colexec/mergejoiner.go @@ -530,11 +530,6 @@ func (o *mergeJoinBase) reset(ctx context.Context) { o.resetBuilderCrossProductState() } -func (o *mergeJoinBase) InternalMemoryUsage() int { - const sizeOfGroup = int(unsafe.Sizeof(group{})) - return 8 * coldata.BatchSize() * sizeOfGroup // o.groups -} - func (o *mergeJoinBase) Init() { if o.joinType.ShouldIncludeLeftColsInOutput() { o.outputTypes = append(o.outputTypes, o.left.sourceTypes...) @@ -562,6 +557,8 @@ func (o *mergeJoinBase) Init() { o.builderState.lGroups = make([]group, 1) o.builderState.rGroups = make([]group, 1) + const sizeOfGroup = int(unsafe.Sizeof(group{})) + o.unlimitedAllocator.AdjustMemoryUsage(int64(8 * coldata.BatchSize() * sizeOfGroup)) o.groups = makeGroupsBuffer(coldata.BatchSize()) o.resetBuilderCrossProductState() } diff --git a/pkg/sql/colexec/mergejoiner_exceptall.eg.go b/pkg/sql/colexec/mergejoiner_exceptall.eg.go index 6c370ce8fe09..cd01c9d6570a 100644 --- a/pkg/sql/colexec/mergejoiner_exceptall.eg.go +++ b/pkg/sql/colexec/mergejoiner_exceptall.eg.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coldataext" "github.com/cockroachdb/cockroach/pkg/col/typeconv" + "github.com/cockroachdb/cockroach/pkg/sql/colexecbase" "github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -41,7 +42,7 @@ type mergeJoinExceptAllOp struct { *mergeJoinBase } -var _ InternalMemoryOperator = &mergeJoinExceptAllOp{} +var _ colexecbase.Operator = &mergeJoinExceptAllOp{} func (o *mergeJoinExceptAllOp) probeBodyLSeltrueRSeltrue(ctx context.Context) { lSel := o.proberState.lBatch.Selection() diff --git a/pkg/sql/colexec/mergejoiner_fullouter.eg.go b/pkg/sql/colexec/mergejoiner_fullouter.eg.go index a49a2f0a7829..e2b33bf29355 100644 --- a/pkg/sql/colexec/mergejoiner_fullouter.eg.go +++ b/pkg/sql/colexec/mergejoiner_fullouter.eg.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coldataext" "github.com/cockroachdb/cockroach/pkg/col/typeconv" + "github.com/cockroachdb/cockroach/pkg/sql/colexecbase" "github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -41,7 +42,7 @@ type mergeJoinFullOuterOp struct { *mergeJoinBase } -var _ InternalMemoryOperator = &mergeJoinFullOuterOp{} +var _ colexecbase.Operator = &mergeJoinFullOuterOp{} func (o *mergeJoinFullOuterOp) probeBodyLSeltrueRSeltrue(ctx context.Context) { lSel := o.proberState.lBatch.Selection() diff --git a/pkg/sql/colexec/mergejoiner_inner.eg.go b/pkg/sql/colexec/mergejoiner_inner.eg.go index 27ec6de29547..c53a6131152a 100644 --- a/pkg/sql/colexec/mergejoiner_inner.eg.go +++ b/pkg/sql/colexec/mergejoiner_inner.eg.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coldataext" "github.com/cockroachdb/cockroach/pkg/col/typeconv" + "github.com/cockroachdb/cockroach/pkg/sql/colexecbase" "github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -41,7 +42,7 @@ type mergeJoinInnerOp struct { *mergeJoinBase } -var _ InternalMemoryOperator = &mergeJoinInnerOp{} +var _ colexecbase.Operator = &mergeJoinInnerOp{} func (o *mergeJoinInnerOp) probeBodyLSeltrueRSeltrue(ctx context.Context) { lSel := o.proberState.lBatch.Selection() diff --git a/pkg/sql/colexec/mergejoiner_intersectall.eg.go b/pkg/sql/colexec/mergejoiner_intersectall.eg.go index 728fc8f09cfb..395553a4c46d 100644 --- a/pkg/sql/colexec/mergejoiner_intersectall.eg.go +++ b/pkg/sql/colexec/mergejoiner_intersectall.eg.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coldataext" "github.com/cockroachdb/cockroach/pkg/col/typeconv" + "github.com/cockroachdb/cockroach/pkg/sql/colexecbase" "github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -41,7 +42,7 @@ type mergeJoinIntersectAllOp struct { *mergeJoinBase } -var _ InternalMemoryOperator = &mergeJoinIntersectAllOp{} +var _ colexecbase.Operator = &mergeJoinIntersectAllOp{} func (o *mergeJoinIntersectAllOp) probeBodyLSeltrueRSeltrue(ctx context.Context) { lSel := o.proberState.lBatch.Selection() diff --git a/pkg/sql/colexec/mergejoiner_leftanti.eg.go b/pkg/sql/colexec/mergejoiner_leftanti.eg.go index 148460a7c20d..8e7159c60e03 100644 --- a/pkg/sql/colexec/mergejoiner_leftanti.eg.go +++ b/pkg/sql/colexec/mergejoiner_leftanti.eg.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coldataext" "github.com/cockroachdb/cockroach/pkg/col/typeconv" + "github.com/cockroachdb/cockroach/pkg/sql/colexecbase" "github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -41,7 +42,7 @@ type mergeJoinLeftAntiOp struct { *mergeJoinBase } -var _ InternalMemoryOperator = &mergeJoinLeftAntiOp{} +var _ colexecbase.Operator = &mergeJoinLeftAntiOp{} func (o *mergeJoinLeftAntiOp) probeBodyLSeltrueRSeltrue(ctx context.Context) { lSel := o.proberState.lBatch.Selection() diff --git a/pkg/sql/colexec/mergejoiner_leftouter.eg.go b/pkg/sql/colexec/mergejoiner_leftouter.eg.go index f44f899404c4..d955a510b0a4 100644 --- a/pkg/sql/colexec/mergejoiner_leftouter.eg.go +++ b/pkg/sql/colexec/mergejoiner_leftouter.eg.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coldataext" "github.com/cockroachdb/cockroach/pkg/col/typeconv" + "github.com/cockroachdb/cockroach/pkg/sql/colexecbase" "github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -41,7 +42,7 @@ type mergeJoinLeftOuterOp struct { *mergeJoinBase } -var _ InternalMemoryOperator = &mergeJoinLeftOuterOp{} +var _ colexecbase.Operator = &mergeJoinLeftOuterOp{} func (o *mergeJoinLeftOuterOp) probeBodyLSeltrueRSeltrue(ctx context.Context) { lSel := o.proberState.lBatch.Selection() diff --git a/pkg/sql/colexec/mergejoiner_leftsemi.eg.go b/pkg/sql/colexec/mergejoiner_leftsemi.eg.go index aef8352cb484..21538f1cb89d 100644 --- a/pkg/sql/colexec/mergejoiner_leftsemi.eg.go +++ b/pkg/sql/colexec/mergejoiner_leftsemi.eg.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coldataext" "github.com/cockroachdb/cockroach/pkg/col/typeconv" + "github.com/cockroachdb/cockroach/pkg/sql/colexecbase" "github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -41,7 +42,7 @@ type mergeJoinLeftSemiOp struct { *mergeJoinBase } -var _ InternalMemoryOperator = &mergeJoinLeftSemiOp{} +var _ colexecbase.Operator = &mergeJoinLeftSemiOp{} func (o *mergeJoinLeftSemiOp) probeBodyLSeltrueRSeltrue(ctx context.Context) { lSel := o.proberState.lBatch.Selection() diff --git a/pkg/sql/colexec/mergejoiner_rightanti.eg.go b/pkg/sql/colexec/mergejoiner_rightanti.eg.go index 669fd37ac4ae..c7308fec5015 100644 --- a/pkg/sql/colexec/mergejoiner_rightanti.eg.go +++ b/pkg/sql/colexec/mergejoiner_rightanti.eg.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coldataext" "github.com/cockroachdb/cockroach/pkg/col/typeconv" + "github.com/cockroachdb/cockroach/pkg/sql/colexecbase" "github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -41,7 +42,7 @@ type mergeJoinRightAntiOp struct { *mergeJoinBase } -var _ InternalMemoryOperator = &mergeJoinRightAntiOp{} +var _ colexecbase.Operator = &mergeJoinRightAntiOp{} func (o *mergeJoinRightAntiOp) probeBodyLSeltrueRSeltrue(ctx context.Context) { lSel := o.proberState.lBatch.Selection() diff --git a/pkg/sql/colexec/mergejoiner_rightouter.eg.go b/pkg/sql/colexec/mergejoiner_rightouter.eg.go index 2216b620803d..da0cdcdedb5d 100644 --- a/pkg/sql/colexec/mergejoiner_rightouter.eg.go +++ b/pkg/sql/colexec/mergejoiner_rightouter.eg.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coldataext" "github.com/cockroachdb/cockroach/pkg/col/typeconv" + "github.com/cockroachdb/cockroach/pkg/sql/colexecbase" "github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -41,7 +42,7 @@ type mergeJoinRightOuterOp struct { *mergeJoinBase } -var _ InternalMemoryOperator = &mergeJoinRightOuterOp{} +var _ colexecbase.Operator = &mergeJoinRightOuterOp{} func (o *mergeJoinRightOuterOp) probeBodyLSeltrueRSeltrue(ctx context.Context) { lSel := o.proberState.lBatch.Selection() diff --git a/pkg/sql/colexec/mergejoiner_rightsemi.eg.go b/pkg/sql/colexec/mergejoiner_rightsemi.eg.go index c9e2cddef4f8..f1f7cde68389 100644 --- a/pkg/sql/colexec/mergejoiner_rightsemi.eg.go +++ b/pkg/sql/colexec/mergejoiner_rightsemi.eg.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coldataext" "github.com/cockroachdb/cockroach/pkg/col/typeconv" + "github.com/cockroachdb/cockroach/pkg/sql/colexecbase" "github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -41,7 +42,7 @@ type mergeJoinRightSemiOp struct { *mergeJoinBase } -var _ InternalMemoryOperator = &mergeJoinRightSemiOp{} +var _ colexecbase.Operator = &mergeJoinRightSemiOp{} func (o *mergeJoinRightSemiOp) probeBodyLSeltrueRSeltrue(ctx context.Context) { lSel := o.proberState.lBatch.Selection() diff --git a/pkg/sql/colexec/mergejoiner_tmpl.go b/pkg/sql/colexec/mergejoiner_tmpl.go index 503e3519d641..76359d7a51b0 100644 --- a/pkg/sql/colexec/mergejoiner_tmpl.go +++ b/pkg/sql/colexec/mergejoiner_tmpl.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldataext" "github.com/cockroachdb/cockroach/pkg/col/typeconv" "github.com/cockroachdb/cockroach/pkg/sql/colexec/execgen" + "github.com/cockroachdb/cockroach/pkg/sql/colexecbase" "github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -97,7 +98,7 @@ type mergeJoin_JOIN_TYPE_STRINGOp struct { *mergeJoinBase } -var _ InternalMemoryOperator = &mergeJoin_JOIN_TYPE_STRINGOp{} +var _ colexecbase.Operator = &mergeJoin_JOIN_TYPE_STRINGOp{} // {{/* // This code snippet is the "meat" of the probing phase. diff --git a/pkg/sql/colexec/op_creation.go b/pkg/sql/colexec/op_creation.go index 5ad2503d7699..2510318f81a7 100644 --- a/pkg/sql/colexec/op_creation.go +++ b/pkg/sql/colexec/op_creation.go @@ -14,6 +14,7 @@ import ( "context" "sync" + "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/sql/colcontainer" "github.com/cockroachdb/cockroach/pkg/sql/colexecbase" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" @@ -39,6 +40,7 @@ type NewColOperatorArgs struct { DiskQueueCfg colcontainer.DiskQueueCfg FDSemaphore semaphore.Semaphore ExprHelper *ExprHelper + Factory coldata.ColumnFactory TestingKnobs struct { // UseStreamingMemAccountForBuffering specifies whether to use // StreamingMemAccount when creating buffering operators and should only be @@ -75,11 +77,10 @@ type NewColOperatorArgs struct { // NewColOperatorResult is a helper struct that encompasses all of the return // values of NewColOperator call. type NewColOperatorResult struct { - Op colexecbase.Operator - IOReader execinfra.IOReader - ColumnTypes []*types.T - InternalMemUsage int - MetadataSources []execinfrapb.MetadataSource + Op colexecbase.Operator + IOReader execinfra.IOReader + ColumnTypes []*types.T + MetadataSources []execinfrapb.MetadataSource // ToClose is a slice of components that need to be Closed. ToClose []colexecbase.Closer OpMonitors []*mon.BytesMonitor diff --git a/pkg/sql/colexec/operator.go b/pkg/sql/colexec/operator.go index 5a4c477311c9..701b2dab5c07 100644 --- a/pkg/sql/colexec/operator.go +++ b/pkg/sql/colexec/operator.go @@ -97,20 +97,6 @@ func (n *twoInputNode) Child(nth int, verbose bool) execinfra.OpNode { return nil } -// TODO(yuzefovich): audit all Operators to make sure that all internal memory -// is accounted for. - -// InternalMemoryOperator is an interface that operators which use internal -// memory need to implement. "Internal memory" is defined as memory that is -// "private" to the operator and is not exposed to the outside; notably, it -// does *not* include any coldata.Batch'es and coldata.Vec's. -type InternalMemoryOperator interface { - colexecbase.Operator - // InternalMemoryUsage reports the internal memory usage (in bytes) of an - // operator. - InternalMemoryUsage() int -} - // resetter is an interface that operators can implement if they can be reset // either for reusing (to keep the already allocated memory) or during tests. type resetter interface { diff --git a/pkg/sql/colfetcher/cfetcher.go b/pkg/sql/colfetcher/cfetcher.go index d1b90bafab85..68ce91ebd5fe 100644 --- a/pkg/sql/colfetcher/cfetcher.go +++ b/pkg/sql/colfetcher/cfetcher.go @@ -16,6 +16,7 @@ import ( "fmt" "sort" "strings" + "sync" "github.com/cockroachdb/apd/v2" "github.com/cockroachdb/cockroach/pkg/col/coldata" @@ -29,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/colencoding" "github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/colmem" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/row" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/scrub" @@ -116,6 +118,33 @@ type cTableInfo struct { da rowenc.DatumAlloc } +var _ execinfra.Releasable = &cTableInfo{} + +var cTableInfoPool = sync.Pool{ + New: func() interface{} { + return &cTableInfo{} + }, +} + +func newCTableInfo() *cTableInfo { + return cTableInfoPool.Get().(*cTableInfo) +} + +// Release implements the execinfra.Releasable interface. +func (c *cTableInfo) Release() { + *c = cTableInfo{ + colIdxMap: colIdxMap{ + vals: c.colIdxMap.vals[:0], + ords: c.colIdxMap.ords[:0], + }, + indexColOrdinals: c.indexColOrdinals[:0], + allIndexColOrdinals: c.allIndexColOrdinals[:0], + extraValColOrdinals: c.extraValColOrdinals[:0], + allExtraValColOrdinals: c.allExtraValColOrdinals[:0], + } + cTableInfoPool.Put(c) +} + // colIdxMap is a "map" that contains the ordinal in cols for each ColumnID // in the table to fetch. This map is used to figure out what index within a // row a particular value-component column goes into. Value-component columns @@ -315,26 +344,31 @@ func (rf *cFetcher) Init( } tableArgs := tables[0] - - m := colIdxMap{ - vals: make(descpb.ColumnIDs, 0, len(tableArgs.ColIdxMap)), - ords: make([]int, 0, len(tableArgs.ColIdxMap)), + table := newCTableInfo() + if cap(table.colIdxMap.vals) < len(tableArgs.ColIdxMap) { + table.colIdxMap.vals = make(descpb.ColumnIDs, 0, len(tableArgs.ColIdxMap)) + table.colIdxMap.ords = make([]int, 0, len(tableArgs.ColIdxMap)) } for k, v := range tableArgs.ColIdxMap { - m.vals = append(m.vals, k) - m.ords = append(m.ords, v) + table.colIdxMap.vals = append(table.colIdxMap.vals, k) + table.colIdxMap.ords = append(table.colIdxMap.ords, v) } - sort.Sort(m) + sort.Sort(table.colIdxMap) colDescriptors := tableArgs.Cols - table := &cTableInfo{ - spans: tableArgs.Spans, - desc: tableArgs.Desc, - colIdxMap: m, - index: tableArgs.Index, - isSecondaryIndex: tableArgs.IsSecondaryIndex, - cols: colDescriptors, - timestampOutputIdx: noOutputColumn, - oidOutputIdx: noOutputColumn, + *table = cTableInfo{ + spans: tableArgs.Spans, + desc: tableArgs.Desc, + colIdxMap: table.colIdxMap, + index: tableArgs.Index, + isSecondaryIndex: tableArgs.IsSecondaryIndex, + cols: colDescriptors, + neededColsList: table.neededColsList[:0], + indexColOrdinals: table.indexColOrdinals[:0], + allIndexColOrdinals: table.allIndexColOrdinals[:0], + extraValColOrdinals: table.extraValColOrdinals[:0], + allExtraValColOrdinals: table.allExtraValColOrdinals[:0], + timestampOutputIdx: noOutputColumn, + oidOutputIdx: noOutputColumn, } rf.typs = make([]*types.T, len(colDescriptors)) @@ -347,7 +381,9 @@ func (rf *cFetcher) Init( var neededCols util.FastIntSet // Scan through the entire columns map to see which columns are // required. - table.neededColsList = make([]int, 0, tableArgs.ValNeededForCol.Len()) + if numNeededCols := tableArgs.ValNeededForCol.Len(); cap(table.neededColsList) < numNeededCols { + table.neededColsList = make([]int, 0, numNeededCols) + } for col, idx := range tableArgs.ColIdxMap { if tableArgs.ValNeededForCol.Contains(idx) { // The idx-th column is required. diff --git a/pkg/sql/colfetcher/colbatch_scan.go b/pkg/sql/colfetcher/colbatch_scan.go index 3c3baa5d570f..3efdddbeff40 100644 --- a/pkg/sql/colfetcher/colbatch_scan.go +++ b/pkg/sql/colfetcher/colbatch_scan.go @@ -145,12 +145,13 @@ func NewColBatchScan( ctx context.Context, allocator *colmem.Allocator, flowCtx *execinfra.FlowCtx, + evalCtx *tree.EvalContext, spec *execinfrapb.TableReaderSpec, post *execinfrapb.PostProcessSpec, -) (*ColBatchScan, error) { +) (*ColBatchScan, *execinfra.ProcOutputHelper, error) { // NB: we hit this with a zero NodeID (but !ok) with multi-tenancy. if nodeID, ok := flowCtx.NodeID.OptionalNodeID(); nodeID == 0 && ok { - return nil, errors.Errorf("attempting to create a ColBatchScan with uninitialized NodeID") + return nil, nil, errors.Errorf("attempting to create a ColBatchScan with uninitialized NodeID") } limitHint := execinfra.LimitHint(spec.LimitHint, post) @@ -158,7 +159,7 @@ func NewColBatchScan( returnMutations := spec.Visibility == execinfra.ScanVisibilityPublicAndNotPublic // TODO(ajwerner): The need to construct an Immutable here // indicates that we're probably doing this wrong. Instead we should be - // just seting the ID and Version in the spec or something like that and + // just setting the ID and Version in the spec or something like that and // retrieving the hydrated Immutable from cache. table := tabledesc.NewImmutable(spec.Table) typs := table.ColumnTypesWithMutations(returnMutations) @@ -175,17 +176,16 @@ func NewColBatchScan( } semaCtx := tree.MakeSemaContext() - evalCtx := flowCtx.NewEvalCtx() // Before we can safely use types from the table descriptor, we need to // make sure they are hydrated. In row execution engine it is done during // the processor initialization, but neither ColBatchScan nor cFetcher are // processors, so we need to do the hydration ourselves. resolver := flowCtx.TypeResolverFactory.NewTypeResolver(evalCtx.Txn) semaCtx.TypeResolver = resolver - if err := resolver.HydrateTypeSlice(evalCtx.Context, typs); err != nil { - return nil, err + if err := resolver.HydrateTypeSlice(ctx, typs); err != nil { + return nil, nil, err } - helper := execinfra.ProcOutputHelper{} + helper := execinfra.NewProcOutputHelper() if err := helper.Init( post, typs, @@ -193,7 +193,7 @@ func NewColBatchScan( evalCtx, nil, /* output */ ); err != nil { - return nil, err + return nil, helper, err } neededColumns := helper.NeededColumns() @@ -201,14 +201,14 @@ func NewColBatchScan( fetcher := cFetcherPool.Get().(*cFetcher) if spec.IsCheck { // cFetchers don't support these checks. - return nil, errors.AssertionFailedf("attempting to create a cFetcher with the IsCheck flag set") + return nil, helper, errors.AssertionFailedf("attempting to create a cFetcher with the IsCheck flag set") } if _, _, err := initCRowFetcher( flowCtx.Codec(), allocator, fetcher, table, int(spec.IndexIdx), columnIdxMap, spec.Reverse, neededColumns, spec.Visibility, spec.LockingStrength, spec.LockingWaitPolicy, sysColDescs, ); err != nil { - return nil, err + return nil, helper, err } s := colBatchScanPool.Get().(*ColBatchScan) @@ -227,7 +227,7 @@ func NewColBatchScan( parallelize: spec.Parallelize && limitHint == 0, ResultTypes: typs, } - return s, nil + return s, helper, nil } // initCRowFetcher initializes a row.cFetcher. See initRowFetcher. @@ -276,6 +276,7 @@ func initCRowFetcher( // Release implements the execinfra.Releasable interface. func (s *ColBatchScan) Release() { + s.rf.table.Release() *s.rf = cFetcher{} cFetcherPool.Put(s.rf) *s = ColBatchScan{ diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index 92d161933028..f870aa7bf78a 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -468,14 +468,11 @@ type vectorizedFlowCreator struct { leaves []execinfra.OpNode // operatorConcurrency is set if any operators are executed in parallel. operatorConcurrency bool - // streamingMemAccounts contains all memory accounts of the non-buffering - // components in the vectorized flow. - streamingMemAccounts []*mon.BoundAccount // monitors contains all monitors (for both memory and disk usage) of the - // buffering components in the vectorized flow. + // components in the vectorized flow. monitors []*mon.BytesMonitor // accounts contains all monitors (for both memory and disk usage) of the - // buffering components in the vectorized flow. + // components in the vectorized flow. accounts []*mon.BoundAccount // releasables contains all components that should be released back to their // pools during the flow cleanup. @@ -527,7 +524,6 @@ func newVectorizedFlowCreator( exprHelper: creator.exprHelper, typeResolver: typeResolver, leaves: creator.leaves, - streamingMemAccounts: creator.streamingMemAccounts, monitors: creator.monitors, accounts: creator.accounts, releasables: creator.releasables, @@ -538,9 +534,6 @@ func newVectorizedFlowCreator( } func (s *vectorizedFlowCreator) cleanup(ctx context.Context) { - for _, acc := range s.streamingMemAccounts { - acc.Close(ctx) - } for _, acc := range s.accounts { acc.Close(ctx) } @@ -562,7 +555,6 @@ func (s *vectorizedFlowCreator) Release() { vectorizedStatsCollectorsQueue: s.vectorizedStatsCollectorsQueue[:0], exprHelper: s.exprHelper, leaves: s.leaves[:0], - streamingMemAccounts: s.streamingMemAccounts[:0], monitors: s.monitors[:0], accounts: s.accounts[:0], releasables: s.releasables[:0], @@ -611,7 +603,7 @@ func (s *vectorizedFlowCreator) newStreamingMemAccount( flowCtx *execinfra.FlowCtx, ) *mon.BoundAccount { streamingMemAccount := flowCtx.EvalCtx.Mon.MakeBoundAccount() - s.streamingMemAccounts = append(s.streamingMemAccounts, &streamingMemAccount) + s.accounts = append(s.accounts, &streamingMemAccount) return &streamingMemAccount } @@ -1007,7 +999,9 @@ func (s *vectorizedFlowCreator) setupFlow( ) (leaves []execinfra.OpNode, err error) { if vecErr := colexecerror.CatchVectorizedRuntimeError(func() { streamIDToSpecIdx := make(map[execinfrapb.StreamID]int) - factory := coldataext.NewExtendedColumnFactory(flowCtx.NewEvalCtx()) + // The column factory will not change the eval context, so we can use + // the one we have in the flow context, without making a copy. + factory := coldataext.NewExtendedColumnFactory(flowCtx.EvalCtx) // queue is a queue of indices into processorSpecs, for topologically // ordered processing. queue := make([]int, 0, len(processorSpecs)) @@ -1075,6 +1069,7 @@ func (s *vectorizedFlowCreator) setupFlow( DiskQueueCfg: s.diskQueueCfg, FDSemaphore: s.fdSemaphore, ExprHelper: s.exprHelper, + Factory: factory, } var result *colexec.NewColOperatorResult result, err = colbuilder.NewColOperator(ctx, flowCtx, args) @@ -1096,13 +1091,6 @@ func (s *vectorizedFlowCreator) setupFlow( if flowCtx.EvalCtx.SessionData.TestingVectorizeInjectPanics { result.Op = colexec.NewPanicInjector(result.Op) } - // We created a streaming memory account when calling NewColOperator above, - // so there is definitely at least one memory account, and it doesn't - // matter which one we grow. - if err = s.streamingMemAccounts[0].Grow(ctx, int64(result.InternalMemUsage)); err != nil { - err = errors.Wrapf(err, "not enough memory to setup vectorized plan") - return - } metadataSourcesQueue = append(metadataSourcesQueue, result.MetadataSources...) if flowCtx.Cfg != nil && flowCtx.Cfg.TestingKnobs.CheckVectorizedFlowIsClosedCorrectly { for _, closer := range result.ToClose { @@ -1288,8 +1276,8 @@ func (r *noopFlowCreatorHelper) getCancelFlowFn() context.CancelFunc { // IsSupported returns whether a flow specified by spec can be vectorized. func IsSupported(mode sessiondatapb.VectorizeExecMode, spec *execinfrapb.FlowSpec) error { - for _, p := range spec.Processors { - if err := colbuilder.IsSupported(mode, &p); err != nil { + for pIdx := range spec.Processors { + if err := colbuilder.IsSupported(mode, &spec.Processors[pIdx]); err != nil { return err } } diff --git a/pkg/sql/colflow/vectorized_flow_space_test.go b/pkg/sql/colflow/vectorized_flow_space_test.go index bd295b96277c..d8dc48d2fb8e 100644 --- a/pkg/sql/colflow/vectorized_flow_space_test.go +++ b/pkg/sql/colflow/vectorized_flow_space_test.go @@ -49,10 +49,6 @@ func TestVectorizeInternalMemorySpaceError(t *testing.T) { oneInput := []execinfrapb.InputSyncSpec{ {ColumnTypes: []*types.T{types.Int}}, } - twoInputs := []execinfrapb.InputSyncSpec{ - {ColumnTypes: []*types.T{types.Int}}, - {ColumnTypes: []*types.T{types.Int}}, - } testCases := []struct { desc string @@ -71,16 +67,6 @@ func TestVectorizeInternalMemorySpaceError(t *testing.T) { ResultTypes: rowenc.OneIntCol, }, }, - { - desc: "MERGE JOIN", - spec: &execinfrapb.ProcessorSpec{ - Input: twoInputs, - Core: execinfrapb.ProcessorCoreUnion{ - MergeJoiner: &execinfrapb.MergeJoinerSpec{}, - }, - ResultTypes: append(twoInputs[0].ColumnTypes, twoInputs[1].ColumnTypes...), - }, - }, } for _, tc := range testCases { @@ -105,11 +91,13 @@ func TestVectorizeInternalMemorySpaceError(t *testing.T) { StreamingMemAccount: &acc, } args.TestingKnobs.UseStreamingMemAccountForBuffering = true - result, err := colbuilder.NewColOperator(ctx, flowCtx, args) - if err != nil { - t.Fatal(err) + var setupErr error + err := colexecerror.CatchVectorizedRuntimeError(func() { + _, setupErr = colbuilder.NewColOperator(ctx, flowCtx, args) + }) + if setupErr != nil { + t.Fatal(setupErr) } - err = acc.Grow(ctx, int64(result.InternalMemUsage)) if success { require.NoError(t, err, "expected success, found: ", err) } else { diff --git a/pkg/sql/colflow/vectorized_flow_test.go b/pkg/sql/colflow/vectorized_flow_test.go index 81e985565815..07844c269c9f 100644 --- a/pkg/sql/colflow/vectorized_flow_test.go +++ b/pkg/sql/colflow/vectorized_flow_test.go @@ -232,11 +232,7 @@ func TestDrainOnlyInputDAG(t *testing.T) { ) _, err := vfc.setupFlow(ctx, &f.FlowCtx, procs, flowinfra.FuseNormally) - defer func() { - for _, memAcc := range vfc.streamingMemAccounts { - memAcc.Close(ctx) - } - }() + defer vfc.cleanup(ctx) require.NoError(t, err) // Verify that an outbox was actually created. diff --git a/pkg/sql/colmem/allocator.go b/pkg/sql/colmem/allocator.go index 46c30b893f81..7c2261856d82 100644 --- a/pkg/sql/colmem/allocator.go +++ b/pkg/sql/colmem/allocator.go @@ -26,6 +26,9 @@ import ( "github.com/cockroachdb/errors" ) +// TODO(yuzefovich): audit all Operators to make sure that all static +// (internal) memory is accounted for. + // Allocator is a memory management tool for vectorized components. It provides // new batches (and appends to existing ones) within a fixed memory budget. If // the budget is exceeded, it will panic with an error. diff --git a/pkg/sql/execinfra/processorsbase.go b/pkg/sql/execinfra/processorsbase.go index 5d89081db749..b421a9eec7d4 100644 --- a/pkg/sql/execinfra/processorsbase.go +++ b/pkg/sql/execinfra/processorsbase.go @@ -13,6 +13,7 @@ package execinfra import ( "context" "math" + "sync" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" @@ -58,12 +59,13 @@ type ProcOutputHelper struct { // post-processed row directly. output RowReceiver RowAlloc rowenc.EncDatumRowAlloc - - filter *execinfrapb.ExprHelper - // renderExprs has length > 0 if we have a rendering. Only one of renderExprs + // Filter is an optional filter that determines whether a single row is + // output or not. + Filter *execinfrapb.ExprHelper + // RenderExprs has length > 0 if we have a rendering. Only one of RenderExprs // and outputCols can be set. - renderExprs []execinfrapb.ExprHelper - // outputCols is non-nil if we have a projection. Only one of renderExprs and + RenderExprs []execinfrapb.ExprHelper + // outputCols is non-nil if we have a projection. Only one of RenderExprs and // outputCols can be set. Note that 0-length projections are possible, in // which case outputCols will be 0-length but non-nil. outputCols []uint32 @@ -73,7 +75,7 @@ type ProcOutputHelper struct { // OutputTypes is the schema of the rows produced by the processor after // post-processing (i.e. the rows that are pushed through a router). // - // If renderExprs is set, these types correspond to the types of those + // If RenderExprs is set, these types correspond to the types of those // expressions. // If outputCols is set, these types correspond to the types of // those columns. @@ -89,10 +91,30 @@ type ProcOutputHelper struct { rowIdx uint64 } +var procOutputHelperPool = sync.Pool{ + New: func() interface{} { + return &ProcOutputHelper{} + }, +} + +// NewProcOutputHelper returns a new ProcOutputHelper. +func NewProcOutputHelper() *ProcOutputHelper { + return procOutputHelperPool.Get().(*ProcOutputHelper) +} + +// Release is part of the Releasable interface. +func (h *ProcOutputHelper) Release() { + *h = ProcOutputHelper{ + RenderExprs: h.RenderExprs[:0], + OutputTypes: h.OutputTypes[:0], + } + procOutputHelperPool.Put(h) +} + // Reset resets this ProcOutputHelper, retaining allocated memory in its slices. func (h *ProcOutputHelper) Reset() { *h = ProcOutputHelper{ - renderExprs: h.renderExprs[:0], + RenderExprs: h.RenderExprs[:0], OutputTypes: h.OutputTypes[:0], } } @@ -118,8 +140,8 @@ func (h *ProcOutputHelper) Init( h.output = output h.numInternalCols = len(coreOutputTypes) if post.Filter != (execinfrapb.Expression{}) { - h.filter = &execinfrapb.ExprHelper{} - if err := h.filter.Init(post.Filter, coreOutputTypes, semaCtx, evalCtx); err != nil { + h.Filter = &execinfrapb.ExprHelper{} + if err := h.Filter.Init(post.Filter, coreOutputTypes, semaCtx, evalCtx); err != nil { return err } } @@ -144,10 +166,10 @@ func (h *ProcOutputHelper) Init( h.OutputTypes[i] = coreOutputTypes[c] } } else if nRenders := len(post.RenderExprs); nRenders > 0 { - if cap(h.renderExprs) >= nRenders { - h.renderExprs = h.renderExprs[:nRenders] + if cap(h.RenderExprs) >= nRenders { + h.RenderExprs = h.RenderExprs[:nRenders] } else { - h.renderExprs = make([]execinfrapb.ExprHelper, nRenders) + h.RenderExprs = make([]execinfrapb.ExprHelper, nRenders) } if cap(h.OutputTypes) >= nRenders { h.OutputTypes = h.OutputTypes[:nRenders] @@ -155,11 +177,11 @@ func (h *ProcOutputHelper) Init( h.OutputTypes = make([]*types.T, nRenders) } for i, expr := range post.RenderExprs { - h.renderExprs[i] = execinfrapb.ExprHelper{} - if err := h.renderExprs[i].Init(expr, coreOutputTypes, semaCtx, evalCtx); err != nil { + h.RenderExprs[i] = execinfrapb.ExprHelper{} + if err := h.RenderExprs[i].Init(expr, coreOutputTypes, semaCtx, evalCtx); err != nil { return err } - h.OutputTypes[i] = h.renderExprs[i].Expr.ResolvedType() + h.OutputTypes[i] = h.RenderExprs[i].Expr.ResolvedType() } } else { // No rendering or projection. @@ -170,7 +192,7 @@ func (h *ProcOutputHelper) Init( } copy(h.OutputTypes, coreOutputTypes) } - if h.outputCols != nil || len(h.renderExprs) > 0 { + if h.outputCols != nil || len(h.RenderExprs) > 0 { // We're rendering or projecting, so allocate an output row. h.outputRow = h.RowAlloc.AllocRow(len(h.OutputTypes)) } @@ -188,7 +210,7 @@ func (h *ProcOutputHelper) Init( // NeededColumns calculates the set of internal processor columns that are // actually used by the post-processing stage. func (h *ProcOutputHelper) NeededColumns() (colIdxs util.FastIntSet) { - if h.outputCols == nil && len(h.renderExprs) == 0 { + if h.outputCols == nil && len(h.RenderExprs) == 0 { // No projection or rendering; all columns are needed. colIdxs.AddRange(0, h.numInternalCols-1) return colIdxs @@ -201,14 +223,14 @@ func (h *ProcOutputHelper) NeededColumns() (colIdxs util.FastIntSet) { for i := 0; i < h.numInternalCols; i++ { // See if filter requires this column. - if h.filter != nil && h.filter.Vars.IndexedVarUsed(i) { + if h.Filter != nil && h.Filter.Vars.IndexedVarUsed(i) { colIdxs.Add(i) continue } // See if render expressions require this column. - for j := range h.renderExprs { - if h.renderExprs[j].Vars.IndexedVarUsed(i) { + for j := range h.RenderExprs { + if h.RenderExprs[j].Vars.IndexedVarUsed(i) { colIdxs.Add(i) break } @@ -282,15 +304,15 @@ func (h *ProcOutputHelper) ProcessRow( return nil, false, nil } - if h.filter != nil { + if h.Filter != nil { // Filtering. - passes, err := h.filter.EvalFilter(row) + passes, err := h.Filter.EvalFilter(row) if err != nil { return nil, false, err } if !passes { if log.V(4) { - log.Infof(ctx, "filtered out row %s", row.String(h.filter.Types)) + log.Infof(ctx, "filtered out row %s", row.String(h.Filter.Types)) } return nil, true, nil } @@ -301,10 +323,10 @@ func (h *ProcOutputHelper) ProcessRow( return nil, true, nil } - if len(h.renderExprs) > 0 { + if len(h.RenderExprs) > 0 { // Rendering. - for i := range h.renderExprs { - datum, err := h.renderExprs[i].Eval(row) + for i := range h.RenderExprs { + datum, err := h.RenderExprs[i].Eval(row) if err != nil { return nil, false, err }