From 907664199192c4e8415619662ce4bdefe607f9b8 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Mon, 2 Nov 2020 15:38:45 -1000 Subject: [PATCH 1/4] colexec: reuse the same eval context for objects of the same proc spec In the row-execution processors we create a copy of the eval context for each processor core because the processor might modify it. In the vectorized engine we took it to the extreme - we're currently creating a copy for every usage of the eval context. That is unnecessary, and this commit makes the vectorized operator creator to be more like the row engine - by reusing the same eval context for all operators that are created for a single processor spec. This commit also switches to using the "original" eval context that is part of the flow context when instantiating a ColumnFactory and when initializing materializers because those two components don't modify the eval context. Release note: None --- pkg/sql/colexec/colbuilder/execplan.go | 20 +++++++++++--------- pkg/sql/colexec/materializer.go | 5 ++++- pkg/sql/colfetcher/colbatch_scan.go | 2 +- pkg/sql/colflow/vectorized_flow.go | 4 +++- 4 files changed, 19 insertions(+), 12 deletions(-) diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index ce8eb765be28..bc9ef9e1dbb2 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -631,7 +631,8 @@ func NewColOperator( }() spec := args.Spec inputs := args.Inputs - factory := coldataext.NewExtendedColumnFactory(flowCtx.NewEvalCtx()) + evalCtx := flowCtx.NewEvalCtx() + factory := coldataext.NewExtendedColumnFactory(evalCtx) streamingMemAccount := args.StreamingMemAccount streamingAllocator := colmem.NewAllocator(ctx, streamingMemAccount, factory) useStreamingMemAccountForBuffering := args.TestingKnobs.UseStreamingMemAccountForBuffering @@ -697,7 +698,7 @@ func NewColOperator( if err := checkNumIn(inputs, 0); err != nil { return r, err } - scanOp, err := colfetcher.NewColBatchScan(ctx, streamingAllocator, flowCtx, core.TableReader, post) + scanOp, err := colfetcher.NewColBatchScan(ctx, streamingAllocator, flowCtx, evalCtx, core.TableReader, post) if err != nil { return r, err } @@ -752,7 +753,6 @@ func NewColOperator( } inputTypes := make([]*types.T, len(spec.Input[0].ColumnTypes)) copy(inputTypes, spec.Input[0].ColumnTypes) - evalCtx := flowCtx.NewEvalCtx() var constructors []execinfrapb.AggregateConstructor var constArguments []tree.Datums semaCtx := flowCtx.TypeResolverFactory.NewSemaContext(flowCtx.EvalCtx.Txn) @@ -933,7 +933,7 @@ func NewColOperator( if !core.HashJoiner.OnExpr.Empty() && core.HashJoiner.Type == descpb.InnerJoin { if err = result.planAndMaybeWrapOnExprAsFilter( - ctx, flowCtx, args, core.HashJoiner.OnExpr, factory, + ctx, flowCtx, evalCtx, args, core.HashJoiner.OnExpr, factory, ); err != nil { return r, err } @@ -993,7 +993,7 @@ func NewColOperator( if onExpr != nil { if err = result.planAndMaybeWrapOnExprAsFilter( - ctx, flowCtx, args, *onExpr, factory, + ctx, flowCtx, evalCtx, args, *onExpr, factory, ); err != nil { return r, err } @@ -1150,7 +1150,7 @@ func NewColOperator( Op: result.Op, ColumnTypes: result.ColumnTypes, } - err = ppr.planPostProcessSpec(ctx, flowCtx, args, post, factory) + err = ppr.planPostProcessSpec(ctx, flowCtx, evalCtx, args, post, factory) if err != nil { if log.V(2) { log.Infof( @@ -1233,6 +1233,7 @@ func NewColOperator( func (r opResult) planAndMaybeWrapOnExprAsFilter( ctx context.Context, flowCtx *execinfra.FlowCtx, + evalCtx *tree.EvalContext, 
args *colexec.NewColOperatorArgs, onExpr execinfrapb.Expression, factory coldata.ColumnFactory, @@ -1247,7 +1248,7 @@ func (r opResult) planAndMaybeWrapOnExprAsFilter( ColumnTypes: r.ColumnTypes, } if err := ppr.planFilterExpr( - ctx, flowCtx, flowCtx.NewEvalCtx(), onExpr, args.StreamingMemAccount, factory, args.ExprHelper, + ctx, flowCtx, evalCtx, onExpr, args.StreamingMemAccount, factory, args.ExprHelper, ); err != nil { // ON expression planning failed. Fall back to planning the filter // using row execution. @@ -1298,13 +1299,14 @@ func (r opResult) wrapPostProcessSpec( func (r *postProcessResult) planPostProcessSpec( ctx context.Context, flowCtx *execinfra.FlowCtx, + evalCtx *tree.EvalContext, args *colexec.NewColOperatorArgs, post *execinfrapb.PostProcessSpec, factory coldata.ColumnFactory, ) error { if !post.Filter.Empty() { if err := r.planFilterExpr( - ctx, flowCtx, flowCtx.NewEvalCtx(), post.Filter, args.StreamingMemAccount, factory, args.ExprHelper, + ctx, flowCtx, evalCtx, post.Filter, args.StreamingMemAccount, factory, args.ExprHelper, ); err != nil { return err } @@ -1326,7 +1328,7 @@ func (r *postProcessResult) planPostProcessSpec( } var outputIdx int r.Op, outputIdx, r.ColumnTypes, renderInternalMem, err = planProjectionOperators( - ctx, flowCtx.NewEvalCtx(), expr, r.ColumnTypes, r.Op, args.StreamingMemAccount, factory, + ctx, evalCtx, expr, r.ColumnTypes, r.Op, args.StreamingMemAccount, factory, ) if err != nil { return errors.Wrapf(err, "unable to columnarize render expression %q", expr) diff --git a/pkg/sql/colexec/materializer.go b/pkg/sql/colexec/materializer.go index 9d2d070bd255..292f5064556b 100644 --- a/pkg/sql/colexec/materializer.go +++ b/pkg/sql/colexec/materializer.go @@ -180,13 +180,16 @@ func NewMaterializer( closers: toClose, } - if err := m.ProcessorBase.Init( + if err := m.ProcessorBase.InitWithEvalCtx( m, // input must have handled any post-processing itself, so we pass in // an empty post-processing spec. &execinfrapb.PostProcessSpec{}, typs, flowCtx, + // Materializer doesn't modify the eval context, so it is safe to reuse + // the one from the flow context. + flowCtx.EvalCtx, processorID, output, nil, /* memMonitor */ diff --git a/pkg/sql/colfetcher/colbatch_scan.go b/pkg/sql/colfetcher/colbatch_scan.go index 3c3baa5d570f..5d8380dfd9dd 100644 --- a/pkg/sql/colfetcher/colbatch_scan.go +++ b/pkg/sql/colfetcher/colbatch_scan.go @@ -145,6 +145,7 @@ func NewColBatchScan( ctx context.Context, allocator *colmem.Allocator, flowCtx *execinfra.FlowCtx, + evalCtx *tree.EvalContext, spec *execinfrapb.TableReaderSpec, post *execinfrapb.PostProcessSpec, ) (*ColBatchScan, error) { @@ -175,7 +176,6 @@ func NewColBatchScan( } semaCtx := tree.MakeSemaContext() - evalCtx := flowCtx.NewEvalCtx() // Before we can safely use types from the table descriptor, we need to // make sure they are hydrated. 
In row execution engine it is done during // the processor initialization, but neither ColBatchScan nor cFetcher are diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index 92d161933028..a2d3344581ec 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -1007,7 +1007,9 @@ func (s *vectorizedFlowCreator) setupFlow( ) (leaves []execinfra.OpNode, err error) { if vecErr := colexecerror.CatchVectorizedRuntimeError(func() { streamIDToSpecIdx := make(map[execinfrapb.StreamID]int) - factory := coldataext.NewExtendedColumnFactory(flowCtx.NewEvalCtx()) + // The column factory will not change the eval context, so we can use + // the one we have in the flow context, without making a copy. + factory := coldataext.NewExtendedColumnFactory(flowCtx.EvalCtx) // queue is a queue of indices into processorSpecs, for topologically // ordered processing. queue := make([]int, 0, len(processorSpecs)) From a266317bf17aa0534d580f3f4209f3825663871f Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Fri, 6 Nov 2020 10:31:56 -1000 Subject: [PATCH 2/4] colconv,colfetcher: pool allocations of converters and cTableInfos This commit pools the allocations of the converters when they are used in the materializers as well as `cTableInfo` structs used by the ColBatchScan. Additionally, this commit reuses the same ColumnFactory when setting up the whole flow and removes a copy of a processor spec when checking whether it is supported. Another notable change is reusing the same global empty post-processing spec when creating the materializers. It is thread-safe because the object is never modified. Release note: None --- pkg/sql/colconv/vec_to_datum.eg.go | 51 +++++++++++++------ pkg/sql/colconv/vec_to_datum_tmpl.go | 52 +++++++++++++------ pkg/sql/colexec/colbuilder/execplan.go | 5 +- pkg/sql/colexec/materializer.go | 10 +++- pkg/sql/colexec/op_creation.go | 2 + pkg/sql/colfetcher/cfetcher.go | 70 +++++++++++++++++++------- pkg/sql/colfetcher/colbatch_scan.go | 1 + pkg/sql/colflow/vectorized_flow.go | 5 +- 8 files changed, 145 insertions(+), 51 deletions(-) diff --git a/pkg/sql/colconv/vec_to_datum.eg.go b/pkg/sql/colconv/vec_to_datum.eg.go index dbe66ad49e65..e652a145f132 100644 --- a/pkg/sql/colconv/vec_to_datum.eg.go +++ b/pkg/sql/colconv/vec_to_datum.eg.go @@ -11,11 +11,13 @@ package colconv import ( "math/big" + "sync" "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coldataext" "github.com/cockroachdb/cockroach/pkg/col/typeconv" "github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" @@ -46,14 +48,33 @@ type VecToDatumConverter struct { da rowenc.DatumAlloc } +var _ execinfra.Releasable = &VecToDatumConverter{} + +var vecToDatumConverterPool = sync.Pool{ + New: func() interface{} { + return &VecToDatumConverter{} + }, +} + // NewVecToDatumConverter creates a new VecToDatumConverter. // - batchWidth determines the width of the batches that it will be converting. // - vecIdxsToConvert determines which vectors need to be converted. 
func NewVecToDatumConverter(batchWidth int, vecIdxsToConvert []int) *VecToDatumConverter { - return &VecToDatumConverter{ - convertedVecs: make([]tree.Datums, batchWidth), - vecIdxsToConvert: vecIdxsToConvert, + c := vecToDatumConverterPool.Get().(*VecToDatumConverter) + if cap(c.convertedVecs) < batchWidth { + c.convertedVecs = make([]tree.Datums, batchWidth) + } else { + c.convertedVecs = c.convertedVecs[:batchWidth] } + c.vecIdxsToConvert = vecIdxsToConvert + return c +} + +// Release is part of the execinfra.Releasable interface. +func (c *VecToDatumConverter) Release() { + c.convertedVecs = c.convertedVecs[:0] + c.vecIdxsToConvert = nil + vecToDatumConverterPool.Put(c) } // ConvertBatchAndDeselect converts the selected vectors from the batch while @@ -75,17 +96,17 @@ func (c *VecToDatumConverter) ConvertBatchAndDeselect(batch coldata.Batch) { return } // Ensure that convertedVecs are of sufficient length. - if cap(c.convertedVecs[c.vecIdxsToConvert[0]]) < batchLength { - for _, vecIdx := range c.vecIdxsToConvert { + for _, vecIdx := range c.vecIdxsToConvert { + if cap(c.convertedVecs[vecIdx]) < batchLength { c.convertedVecs[vecIdx] = make([]tree.Datum, batchLength) + } else { + c.convertedVecs[vecIdx] = c.convertedVecs[vecIdx][:batchLength] } + } + if c.da.AllocSize < batchLength { // Adjust the datum alloc according to the length of the batch since // this batch is the longest we've seen so far. c.da.AllocSize = batchLength - } else { - for _, vecIdx := range c.vecIdxsToConvert { - c.convertedVecs[vecIdx] = c.convertedVecs[vecIdx][:batchLength] - } } sel := batch.Selection() vecs := batch.ColVecs() @@ -129,17 +150,17 @@ func (c *VecToDatumConverter) ConvertVecs(vecs []coldata.Vec, inputLen int, sel // rely on the fact that selection vectors are increasing sequences. requiredLength = sel[inputLen-1] + 1 } - if cap(c.convertedVecs[c.vecIdxsToConvert[0]]) < requiredLength { - for _, vecIdx := range c.vecIdxsToConvert { + for _, vecIdx := range c.vecIdxsToConvert { + if cap(c.convertedVecs[vecIdx]) < requiredLength { c.convertedVecs[vecIdx] = make([]tree.Datum, requiredLength) + } else { + c.convertedVecs[vecIdx] = c.convertedVecs[vecIdx][:requiredLength] } + } + if c.da.AllocSize < requiredLength { // Adjust the datum alloc according to the length of the batch since // this batch is the longest we've seen so far. c.da.AllocSize = requiredLength - } else { - for _, vecIdx := range c.vecIdxsToConvert { - c.convertedVecs[vecIdx] = c.convertedVecs[vecIdx][:requiredLength] - } } for _, vecIdx := range c.vecIdxsToConvert { ColVecToDatum( diff --git a/pkg/sql/colconv/vec_to_datum_tmpl.go b/pkg/sql/colconv/vec_to_datum_tmpl.go index fc743816abc6..e73a87d7f2dc 100644 --- a/pkg/sql/colconv/vec_to_datum_tmpl.go +++ b/pkg/sql/colconv/vec_to_datum_tmpl.go @@ -20,7 +20,10 @@ package colconv import ( + "sync" + "github.com/cockroachdb/cockroach/pkg/col/coldata" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" @@ -49,14 +52,33 @@ type VecToDatumConverter struct { da rowenc.DatumAlloc } +var _ execinfra.Releasable = &VecToDatumConverter{} + +var vecToDatumConverterPool = sync.Pool{ + New: func() interface{} { + return &VecToDatumConverter{} + }, +} + // NewVecToDatumConverter creates a new VecToDatumConverter. // - batchWidth determines the width of the batches that it will be converting. 
// - vecIdxsToConvert determines which vectors need to be converted. func NewVecToDatumConverter(batchWidth int, vecIdxsToConvert []int) *VecToDatumConverter { - return &VecToDatumConverter{ - convertedVecs: make([]tree.Datums, batchWidth), - vecIdxsToConvert: vecIdxsToConvert, + c := vecToDatumConverterPool.Get().(*VecToDatumConverter) + if cap(c.convertedVecs) < batchWidth { + c.convertedVecs = make([]tree.Datums, batchWidth) + } else { + c.convertedVecs = c.convertedVecs[:batchWidth] } + c.vecIdxsToConvert = vecIdxsToConvert + return c +} + +// Release is part of the execinfra.Releasable interface. +func (c *VecToDatumConverter) Release() { + c.convertedVecs = c.convertedVecs[:0] + c.vecIdxsToConvert = nil + vecToDatumConverterPool.Put(c) } // ConvertBatchAndDeselect converts the selected vectors from the batch while @@ -78,17 +100,17 @@ func (c *VecToDatumConverter) ConvertBatchAndDeselect(batch coldata.Batch) { return } // Ensure that convertedVecs are of sufficient length. - if cap(c.convertedVecs[c.vecIdxsToConvert[0]]) < batchLength { - for _, vecIdx := range c.vecIdxsToConvert { + for _, vecIdx := range c.vecIdxsToConvert { + if cap(c.convertedVecs[vecIdx]) < batchLength { c.convertedVecs[vecIdx] = make([]tree.Datum, batchLength) + } else { + c.convertedVecs[vecIdx] = c.convertedVecs[vecIdx][:batchLength] } + } + if c.da.AllocSize < batchLength { // Adjust the datum alloc according to the length of the batch since // this batch is the longest we've seen so far. c.da.AllocSize = batchLength - } else { - for _, vecIdx := range c.vecIdxsToConvert { - c.convertedVecs[vecIdx] = c.convertedVecs[vecIdx][:batchLength] - } } sel := batch.Selection() vecs := batch.ColVecs() @@ -132,17 +154,17 @@ func (c *VecToDatumConverter) ConvertVecs(vecs []coldata.Vec, inputLen int, sel // rely on the fact that selection vectors are increasing sequences. requiredLength = sel[inputLen-1] + 1 } - if cap(c.convertedVecs[c.vecIdxsToConvert[0]]) < requiredLength { - for _, vecIdx := range c.vecIdxsToConvert { + for _, vecIdx := range c.vecIdxsToConvert { + if cap(c.convertedVecs[vecIdx]) < requiredLength { c.convertedVecs[vecIdx] = make([]tree.Datum, requiredLength) + } else { + c.convertedVecs[vecIdx] = c.convertedVecs[vecIdx][:requiredLength] } + } + if c.da.AllocSize < requiredLength { // Adjust the datum alloc according to the length of the batch since // this batch is the longest we've seen so far. 
c.da.AllocSize = requiredLength - } else { - for _, vecIdx := range c.vecIdxsToConvert { - c.convertedVecs[vecIdx] = c.convertedVecs[vecIdx][:requiredLength] - } } for _, vecIdx := range c.vecIdxsToConvert { ColVecToDatum( diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index bc9ef9e1dbb2..617503e05c9e 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -632,7 +632,10 @@ func NewColOperator( spec := args.Spec inputs := args.Inputs evalCtx := flowCtx.NewEvalCtx() - factory := coldataext.NewExtendedColumnFactory(evalCtx) + factory := args.Factory + if factory == nil { + factory = coldataext.NewExtendedColumnFactory(evalCtx) + } streamingMemAccount := args.StreamingMemAccount streamingAllocator := colmem.NewAllocator(ctx, streamingMemAccount, factory) useStreamingMemAccountForBuffering := args.TestingKnobs.UseStreamingMemAccountForBuffering diff --git a/pkg/sql/colexec/materializer.go b/pkg/sql/colexec/materializer.go index 292f5064556b..04b386085341 100644 --- a/pkg/sql/colexec/materializer.go +++ b/pkg/sql/colexec/materializer.go @@ -141,6 +141,13 @@ var materializerPool = sync.Pool{ }, } +// materializerEmptyPostProcessSpec is the spec used to initialize the +// materializer. Currently, we assume that the input to the materializer fully +// handles any post-processing, so we always use the empty spec. +// Note that this variable is never modified once initialized (neither is the +// post-processing spec object itself), so it is thread-safe. +var materializerEmptyPostProcessSpec = &execinfrapb.PostProcessSpec{} + // NewMaterializer creates a new Materializer processor which processes the // columnar data coming from input to return it as rows. // Arguments: @@ -184,7 +191,7 @@ func NewMaterializer( m, // input must have handled any post-processing itself, so we pass in // an empty post-processing spec. 
- &execinfrapb.PostProcessSpec{}, + materializerEmptyPostProcessSpec, typs, flowCtx, // Materializer doesn't modify the eval context, so it is safe to reuse @@ -312,6 +319,7 @@ func (m *Materializer) ConsumerClosed() { func (m *Materializer) Release() { m.drainHelper.Release() m.ProcessorBase.Reset() + m.converter.Release() *m = Materializer{ // We're keeping the reference to the same ProcessorBase since it // allows us to reuse some of the slices as well as ProcOutputHelper diff --git a/pkg/sql/colexec/op_creation.go b/pkg/sql/colexec/op_creation.go index 5ad2503d7699..d4f969f1b4b1 100644 --- a/pkg/sql/colexec/op_creation.go +++ b/pkg/sql/colexec/op_creation.go @@ -14,6 +14,7 @@ import ( "context" "sync" + "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/sql/colcontainer" "github.com/cockroachdb/cockroach/pkg/sql/colexecbase" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" @@ -39,6 +40,7 @@ type NewColOperatorArgs struct { DiskQueueCfg colcontainer.DiskQueueCfg FDSemaphore semaphore.Semaphore ExprHelper *ExprHelper + Factory coldata.ColumnFactory TestingKnobs struct { // UseStreamingMemAccountForBuffering specifies whether to use // StreamingMemAccount when creating buffering operators and should only be diff --git a/pkg/sql/colfetcher/cfetcher.go b/pkg/sql/colfetcher/cfetcher.go index d1b90bafab85..68ce91ebd5fe 100644 --- a/pkg/sql/colfetcher/cfetcher.go +++ b/pkg/sql/colfetcher/cfetcher.go @@ -16,6 +16,7 @@ import ( "fmt" "sort" "strings" + "sync" "github.com/cockroachdb/apd/v2" "github.com/cockroachdb/cockroach/pkg/col/coldata" @@ -29,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/colencoding" "github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/colmem" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/row" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/scrub" @@ -116,6 +118,33 @@ type cTableInfo struct { da rowenc.DatumAlloc } +var _ execinfra.Releasable = &cTableInfo{} + +var cTableInfoPool = sync.Pool{ + New: func() interface{} { + return &cTableInfo{} + }, +} + +func newCTableInfo() *cTableInfo { + return cTableInfoPool.Get().(*cTableInfo) +} + +// Release implements the execinfra.Releasable interface. +func (c *cTableInfo) Release() { + *c = cTableInfo{ + colIdxMap: colIdxMap{ + vals: c.colIdxMap.vals[:0], + ords: c.colIdxMap.ords[:0], + }, + indexColOrdinals: c.indexColOrdinals[:0], + allIndexColOrdinals: c.allIndexColOrdinals[:0], + extraValColOrdinals: c.extraValColOrdinals[:0], + allExtraValColOrdinals: c.allExtraValColOrdinals[:0], + } + cTableInfoPool.Put(c) +} + // colIdxMap is a "map" that contains the ordinal in cols for each ColumnID // in the table to fetch. This map is used to figure out what index within a // row a particular value-component column goes into. 
Value-component columns @@ -315,26 +344,31 @@ func (rf *cFetcher) Init( } tableArgs := tables[0] - - m := colIdxMap{ - vals: make(descpb.ColumnIDs, 0, len(tableArgs.ColIdxMap)), - ords: make([]int, 0, len(tableArgs.ColIdxMap)), + table := newCTableInfo() + if cap(table.colIdxMap.vals) < len(tableArgs.ColIdxMap) { + table.colIdxMap.vals = make(descpb.ColumnIDs, 0, len(tableArgs.ColIdxMap)) + table.colIdxMap.ords = make([]int, 0, len(tableArgs.ColIdxMap)) } for k, v := range tableArgs.ColIdxMap { - m.vals = append(m.vals, k) - m.ords = append(m.ords, v) + table.colIdxMap.vals = append(table.colIdxMap.vals, k) + table.colIdxMap.ords = append(table.colIdxMap.ords, v) } - sort.Sort(m) + sort.Sort(table.colIdxMap) colDescriptors := tableArgs.Cols - table := &cTableInfo{ - spans: tableArgs.Spans, - desc: tableArgs.Desc, - colIdxMap: m, - index: tableArgs.Index, - isSecondaryIndex: tableArgs.IsSecondaryIndex, - cols: colDescriptors, - timestampOutputIdx: noOutputColumn, - oidOutputIdx: noOutputColumn, + *table = cTableInfo{ + spans: tableArgs.Spans, + desc: tableArgs.Desc, + colIdxMap: table.colIdxMap, + index: tableArgs.Index, + isSecondaryIndex: tableArgs.IsSecondaryIndex, + cols: colDescriptors, + neededColsList: table.neededColsList[:0], + indexColOrdinals: table.indexColOrdinals[:0], + allIndexColOrdinals: table.allIndexColOrdinals[:0], + extraValColOrdinals: table.extraValColOrdinals[:0], + allExtraValColOrdinals: table.allExtraValColOrdinals[:0], + timestampOutputIdx: noOutputColumn, + oidOutputIdx: noOutputColumn, } rf.typs = make([]*types.T, len(colDescriptors)) @@ -347,7 +381,9 @@ func (rf *cFetcher) Init( var neededCols util.FastIntSet // Scan through the entire columns map to see which columns are // required. - table.neededColsList = make([]int, 0, tableArgs.ValNeededForCol.Len()) + if numNeededCols := tableArgs.ValNeededForCol.Len(); cap(table.neededColsList) < numNeededCols { + table.neededColsList = make([]int, 0, numNeededCols) + } for col, idx := range tableArgs.ColIdxMap { if tableArgs.ValNeededForCol.Contains(idx) { // The idx-th column is required. diff --git a/pkg/sql/colfetcher/colbatch_scan.go b/pkg/sql/colfetcher/colbatch_scan.go index 5d8380dfd9dd..fad6dca0c5fb 100644 --- a/pkg/sql/colfetcher/colbatch_scan.go +++ b/pkg/sql/colfetcher/colbatch_scan.go @@ -276,6 +276,7 @@ func initCRowFetcher( // Release implements the execinfra.Releasable interface. func (s *ColBatchScan) Release() { + s.rf.table.Release() *s.rf = cFetcher{} cFetcherPool.Put(s.rf) *s = ColBatchScan{ diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index a2d3344581ec..d00a5a6f582e 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -1077,6 +1077,7 @@ func (s *vectorizedFlowCreator) setupFlow( DiskQueueCfg: s.diskQueueCfg, FDSemaphore: s.fdSemaphore, ExprHelper: s.exprHelper, + Factory: factory, } var result *colexec.NewColOperatorResult result, err = colbuilder.NewColOperator(ctx, flowCtx, args) @@ -1290,8 +1291,8 @@ func (r *noopFlowCreatorHelper) getCancelFlowFn() context.CancelFunc { // IsSupported returns whether a flow specified by spec can be vectorized. 
func IsSupported(mode sessiondatapb.VectorizeExecMode, spec *execinfrapb.FlowSpec) error { - for _, p := range spec.Processors { - if err := colbuilder.IsSupported(mode, &p); err != nil { + for pIdx := range spec.Processors { + if err := colbuilder.IsSupported(mode, &spec.Processors[pIdx]); err != nil { return err } } From 8ac12aea9e36d12a93337fb01178477cfae3000f Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Thu, 5 Nov 2020 14:55:50 -1000 Subject: [PATCH 3/4] colbuilder: remove redundant expr deserialization after scan Whenever we're creating a `ColBatchScan`, we're initializing a `ProcOutputHelper` in order to obtain the set of needed columns (based on the schema of the table and the post-processing spec). As part of that initialization we're obtaining well-typed expression for everything in the post-processing spec. However, later when actually planning the post-processing in the vectorized engine we deserialize the expressions again. This is redundant, and the code has been adjusted to reuse the same helper for both scan operator creation and post-processing planning. Additionally, those `ProcOutputHelper` objects are now pooled. Release note: None --- pkg/sql/colexec/colbuilder/execplan.go | 57 ++++++++++++++------ pkg/sql/colfetcher/colbatch_scan.go | 20 +++---- pkg/sql/execinfra/processorsbase.go | 74 +++++++++++++++++--------- 3 files changed, 99 insertions(+), 52 deletions(-) diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index 617503e05c9e..332bc770a37f 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -649,6 +649,7 @@ func NewColOperator( core := &spec.Core post := &spec.Post + var procOutputHelper *execinfra.ProcOutputHelper // resultPreSpecPlanningStateShallowCopy is a shallow copy of the result // before any specs are planned. Used if there is a need to backtrack. @@ -701,14 +702,16 @@ func NewColOperator( if err := checkNumIn(inputs, 0); err != nil { return r, err } - scanOp, err := colfetcher.NewColBatchScan(ctx, streamingAllocator, flowCtx, evalCtx, core.TableReader, post) + scanOp, helper, err := colfetcher.NewColBatchScan(ctx, streamingAllocator, flowCtx, evalCtx, core.TableReader, post) if err != nil { return r, err } result.Op = scanOp result.IOReader = scanOp result.MetadataSources = append(result.MetadataSources, scanOp) + result.Releasables = append(result.Releasables, helper) result.Releasables = append(result.Releasables, scanOp) + procOutputHelper = helper // colBatchScan is wrapped with a cancel checker below, so we need to // log its creation separately. 
if log.V(1) { @@ -758,7 +761,7 @@ func NewColOperator( copy(inputTypes, spec.Input[0].ColumnTypes) var constructors []execinfrapb.AggregateConstructor var constArguments []tree.Datums - semaCtx := flowCtx.TypeResolverFactory.NewSemaContext(flowCtx.EvalCtx.Txn) + semaCtx := flowCtx.TypeResolverFactory.NewSemaContext(evalCtx.Txn) constructors, constArguments, result.ColumnTypes, err = colexecagg.ProcessAggregations( evalCtx, semaCtx, aggSpec.Aggregations, inputTypes, ) @@ -1153,7 +1156,7 @@ func NewColOperator( Op: result.Op, ColumnTypes: result.ColumnTypes, } - err = ppr.planPostProcessSpec(ctx, flowCtx, evalCtx, args, post, factory) + err = ppr.planPostProcessSpec(ctx, flowCtx, evalCtx, args, post, factory, procOutputHelper) if err != nil { if log.V(2) { log.Infof( @@ -1251,7 +1254,7 @@ func (r opResult) planAndMaybeWrapOnExprAsFilter( ColumnTypes: r.ColumnTypes, } if err := ppr.planFilterExpr( - ctx, flowCtx, evalCtx, onExpr, args.StreamingMemAccount, factory, args.ExprHelper, + ctx, flowCtx, evalCtx, onExpr, args.StreamingMemAccount, factory, args.ExprHelper, nil, /* procOutputHelper */ ); err != nil { // ON expression planning failed. Fall back to planning the filter // using row execution. @@ -1298,7 +1301,9 @@ func (r opResult) wrapPostProcessSpec( } // planPostProcessSpec plans the post processing stage specified in post on top -// of r.Op. +// of r.Op. It takes in an optional procOutputHelper which has already been +// initialized with post meaning that it already contains well-typed expressions +// which allows us to avoid redundant deserialization. func (r *postProcessResult) planPostProcessSpec( ctx context.Context, flowCtx *execinfra.FlowCtx, @@ -1306,10 +1311,11 @@ func (r *postProcessResult) planPostProcessSpec( args *colexec.NewColOperatorArgs, post *execinfrapb.PostProcessSpec, factory coldata.ColumnFactory, + procOutputHelper *execinfra.ProcOutputHelper, ) error { if !post.Filter.Empty() { if err := r.planFilterExpr( - ctx, flowCtx, evalCtx, post.Filter, args.StreamingMemAccount, factory, args.ExprHelper, + ctx, flowCtx, evalCtx, post.Filter, args.StreamingMemAccount, factory, args.ExprHelper, procOutputHelper, ); err != nil { return err } @@ -1321,13 +1327,19 @@ func (r *postProcessResult) planPostProcessSpec( if log.V(2) { log.Infof(ctx, "planning render expressions %+v", post.RenderExprs) } - semaCtx := flowCtx.TypeResolverFactory.NewSemaContext(flowCtx.EvalCtx.Txn) + semaCtx := flowCtx.TypeResolverFactory.NewSemaContext(evalCtx.Txn) var renderedCols []uint32 - for _, renderExpr := range post.RenderExprs { + for renderIdx, renderExpr := range post.RenderExprs { var renderInternalMem int - expr, err := args.ExprHelper.ProcessExpr(renderExpr, semaCtx, flowCtx.EvalCtx, r.ColumnTypes) - if err != nil { - return err + var expr tree.TypedExpr + var err error + if procOutputHelper != nil { + expr = procOutputHelper.RenderExprs[renderIdx].Expr + } else { + expr, err = args.ExprHelper.ProcessExpr(renderExpr, semaCtx, evalCtx, r.ColumnTypes) + if err != nil { + return err + } } var outputIdx int r.Op, outputIdx, r.ColumnTypes, renderInternalMem, err = planProjectionOperators( @@ -1454,6 +1466,10 @@ func (r opResult) updateWithPostProcessResult(ppr postProcessResult) { r.InternalMemUsage += ppr.InternalMemUsage } +// planFilterExpr creates all operators to implement filter expression. 
It takes +// in an optional procOutputHelper which has already been initialized with post +// meaning that it already contains well-typed expressions which allows us to +// avoid redundant deserialization. func (r *postProcessResult) planFilterExpr( ctx context.Context, flowCtx *execinfra.FlowCtx, @@ -1462,12 +1478,21 @@ func (r *postProcessResult) planFilterExpr( acc *mon.BoundAccount, factory coldata.ColumnFactory, helper *colexec.ExprHelper, + procOutputHelper *execinfra.ProcOutputHelper, ) error { - var selectionInternalMem int - semaCtx := flowCtx.TypeResolverFactory.NewSemaContext(evalCtx.Txn) - expr, err := helper.ProcessExpr(filter, semaCtx, evalCtx, r.ColumnTypes) - if err != nil { - return err + var ( + selectionInternalMem int + expr tree.TypedExpr + err error + ) + if procOutputHelper != nil { + expr = procOutputHelper.Filter.Expr + } else { + semaCtx := flowCtx.TypeResolverFactory.NewSemaContext(evalCtx.Txn) + expr, err = helper.ProcessExpr(filter, semaCtx, evalCtx, r.ColumnTypes) + if err != nil { + return err + } } if expr == tree.DNull { // The filter expression is tree.DNull meaning that it is always false, so diff --git a/pkg/sql/colfetcher/colbatch_scan.go b/pkg/sql/colfetcher/colbatch_scan.go index fad6dca0c5fb..3efdddbeff40 100644 --- a/pkg/sql/colfetcher/colbatch_scan.go +++ b/pkg/sql/colfetcher/colbatch_scan.go @@ -148,10 +148,10 @@ func NewColBatchScan( evalCtx *tree.EvalContext, spec *execinfrapb.TableReaderSpec, post *execinfrapb.PostProcessSpec, -) (*ColBatchScan, error) { +) (*ColBatchScan, *execinfra.ProcOutputHelper, error) { // NB: we hit this with a zero NodeID (but !ok) with multi-tenancy. if nodeID, ok := flowCtx.NodeID.OptionalNodeID(); nodeID == 0 && ok { - return nil, errors.Errorf("attempting to create a ColBatchScan with uninitialized NodeID") + return nil, nil, errors.Errorf("attempting to create a ColBatchScan with uninitialized NodeID") } limitHint := execinfra.LimitHint(spec.LimitHint, post) @@ -159,7 +159,7 @@ func NewColBatchScan( returnMutations := spec.Visibility == execinfra.ScanVisibilityPublicAndNotPublic // TODO(ajwerner): The need to construct an Immutable here // indicates that we're probably doing this wrong. Instead we should be - // just seting the ID and Version in the spec or something like that and + // just setting the ID and Version in the spec or something like that and // retrieving the hydrated Immutable from cache. table := tabledesc.NewImmutable(spec.Table) typs := table.ColumnTypesWithMutations(returnMutations) @@ -182,10 +182,10 @@ func NewColBatchScan( // processors, so we need to do the hydration ourselves. resolver := flowCtx.TypeResolverFactory.NewTypeResolver(evalCtx.Txn) semaCtx.TypeResolver = resolver - if err := resolver.HydrateTypeSlice(evalCtx.Context, typs); err != nil { - return nil, err + if err := resolver.HydrateTypeSlice(ctx, typs); err != nil { + return nil, nil, err } - helper := execinfra.ProcOutputHelper{} + helper := execinfra.NewProcOutputHelper() if err := helper.Init( post, typs, @@ -193,7 +193,7 @@ func NewColBatchScan( evalCtx, nil, /* output */ ); err != nil { - return nil, err + return nil, helper, err } neededColumns := helper.NeededColumns() @@ -201,14 +201,14 @@ func NewColBatchScan( fetcher := cFetcherPool.Get().(*cFetcher) if spec.IsCheck { // cFetchers don't support these checks. 
- return nil, errors.AssertionFailedf("attempting to create a cFetcher with the IsCheck flag set") + return nil, helper, errors.AssertionFailedf("attempting to create a cFetcher with the IsCheck flag set") } if _, _, err := initCRowFetcher( flowCtx.Codec(), allocator, fetcher, table, int(spec.IndexIdx), columnIdxMap, spec.Reverse, neededColumns, spec.Visibility, spec.LockingStrength, spec.LockingWaitPolicy, sysColDescs, ); err != nil { - return nil, err + return nil, helper, err } s := colBatchScanPool.Get().(*ColBatchScan) @@ -227,7 +227,7 @@ func NewColBatchScan( parallelize: spec.Parallelize && limitHint == 0, ResultTypes: typs, } - return s, nil + return s, helper, nil } // initCRowFetcher initializes a row.cFetcher. See initRowFetcher. diff --git a/pkg/sql/execinfra/processorsbase.go b/pkg/sql/execinfra/processorsbase.go index 5d89081db749..b421a9eec7d4 100644 --- a/pkg/sql/execinfra/processorsbase.go +++ b/pkg/sql/execinfra/processorsbase.go @@ -13,6 +13,7 @@ package execinfra import ( "context" "math" + "sync" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" @@ -58,12 +59,13 @@ type ProcOutputHelper struct { // post-processed row directly. output RowReceiver RowAlloc rowenc.EncDatumRowAlloc - - filter *execinfrapb.ExprHelper - // renderExprs has length > 0 if we have a rendering. Only one of renderExprs + // Filter is an optional filter that determines whether a single row is + // output or not. + Filter *execinfrapb.ExprHelper + // RenderExprs has length > 0 if we have a rendering. Only one of RenderExprs // and outputCols can be set. - renderExprs []execinfrapb.ExprHelper - // outputCols is non-nil if we have a projection. Only one of renderExprs and + RenderExprs []execinfrapb.ExprHelper + // outputCols is non-nil if we have a projection. Only one of RenderExprs and // outputCols can be set. Note that 0-length projections are possible, in // which case outputCols will be 0-length but non-nil. outputCols []uint32 @@ -73,7 +75,7 @@ type ProcOutputHelper struct { // OutputTypes is the schema of the rows produced by the processor after // post-processing (i.e. the rows that are pushed through a router). // - // If renderExprs is set, these types correspond to the types of those + // If RenderExprs is set, these types correspond to the types of those // expressions. // If outputCols is set, these types correspond to the types of // those columns. @@ -89,10 +91,30 @@ type ProcOutputHelper struct { rowIdx uint64 } +var procOutputHelperPool = sync.Pool{ + New: func() interface{} { + return &ProcOutputHelper{} + }, +} + +// NewProcOutputHelper returns a new ProcOutputHelper. +func NewProcOutputHelper() *ProcOutputHelper { + return procOutputHelperPool.Get().(*ProcOutputHelper) +} + +// Release is part of the Releasable interface. +func (h *ProcOutputHelper) Release() { + *h = ProcOutputHelper{ + RenderExprs: h.RenderExprs[:0], + OutputTypes: h.OutputTypes[:0], + } + procOutputHelperPool.Put(h) +} + // Reset resets this ProcOutputHelper, retaining allocated memory in its slices. 
func (h *ProcOutputHelper) Reset() { *h = ProcOutputHelper{ - renderExprs: h.renderExprs[:0], + RenderExprs: h.RenderExprs[:0], OutputTypes: h.OutputTypes[:0], } } @@ -118,8 +140,8 @@ func (h *ProcOutputHelper) Init( h.output = output h.numInternalCols = len(coreOutputTypes) if post.Filter != (execinfrapb.Expression{}) { - h.filter = &execinfrapb.ExprHelper{} - if err := h.filter.Init(post.Filter, coreOutputTypes, semaCtx, evalCtx); err != nil { + h.Filter = &execinfrapb.ExprHelper{} + if err := h.Filter.Init(post.Filter, coreOutputTypes, semaCtx, evalCtx); err != nil { return err } } @@ -144,10 +166,10 @@ func (h *ProcOutputHelper) Init( h.OutputTypes[i] = coreOutputTypes[c] } } else if nRenders := len(post.RenderExprs); nRenders > 0 { - if cap(h.renderExprs) >= nRenders { - h.renderExprs = h.renderExprs[:nRenders] + if cap(h.RenderExprs) >= nRenders { + h.RenderExprs = h.RenderExprs[:nRenders] } else { - h.renderExprs = make([]execinfrapb.ExprHelper, nRenders) + h.RenderExprs = make([]execinfrapb.ExprHelper, nRenders) } if cap(h.OutputTypes) >= nRenders { h.OutputTypes = h.OutputTypes[:nRenders] @@ -155,11 +177,11 @@ func (h *ProcOutputHelper) Init( h.OutputTypes = make([]*types.T, nRenders) } for i, expr := range post.RenderExprs { - h.renderExprs[i] = execinfrapb.ExprHelper{} - if err := h.renderExprs[i].Init(expr, coreOutputTypes, semaCtx, evalCtx); err != nil { + h.RenderExprs[i] = execinfrapb.ExprHelper{} + if err := h.RenderExprs[i].Init(expr, coreOutputTypes, semaCtx, evalCtx); err != nil { return err } - h.OutputTypes[i] = h.renderExprs[i].Expr.ResolvedType() + h.OutputTypes[i] = h.RenderExprs[i].Expr.ResolvedType() } } else { // No rendering or projection. @@ -170,7 +192,7 @@ func (h *ProcOutputHelper) Init( } copy(h.OutputTypes, coreOutputTypes) } - if h.outputCols != nil || len(h.renderExprs) > 0 { + if h.outputCols != nil || len(h.RenderExprs) > 0 { // We're rendering or projecting, so allocate an output row. h.outputRow = h.RowAlloc.AllocRow(len(h.OutputTypes)) } @@ -188,7 +210,7 @@ func (h *ProcOutputHelper) Init( // NeededColumns calculates the set of internal processor columns that are // actually used by the post-processing stage. func (h *ProcOutputHelper) NeededColumns() (colIdxs util.FastIntSet) { - if h.outputCols == nil && len(h.renderExprs) == 0 { + if h.outputCols == nil && len(h.RenderExprs) == 0 { // No projection or rendering; all columns are needed. colIdxs.AddRange(0, h.numInternalCols-1) return colIdxs @@ -201,14 +223,14 @@ func (h *ProcOutputHelper) NeededColumns() (colIdxs util.FastIntSet) { for i := 0; i < h.numInternalCols; i++ { // See if filter requires this column. - if h.filter != nil && h.filter.Vars.IndexedVarUsed(i) { + if h.Filter != nil && h.Filter.Vars.IndexedVarUsed(i) { colIdxs.Add(i) continue } // See if render expressions require this column. - for j := range h.renderExprs { - if h.renderExprs[j].Vars.IndexedVarUsed(i) { + for j := range h.RenderExprs { + if h.RenderExprs[j].Vars.IndexedVarUsed(i) { colIdxs.Add(i) break } @@ -282,15 +304,15 @@ func (h *ProcOutputHelper) ProcessRow( return nil, false, nil } - if h.filter != nil { + if h.Filter != nil { // Filtering. 
- passes, err := h.filter.EvalFilter(row) + passes, err := h.Filter.EvalFilter(row) if err != nil { return nil, false, err } if !passes { if log.V(4) { - log.Infof(ctx, "filtered out row %s", row.String(h.filter.Types)) + log.Infof(ctx, "filtered out row %s", row.String(h.Filter.Types)) } return nil, true, nil } @@ -301,10 +323,10 @@ func (h *ProcOutputHelper) ProcessRow( return nil, true, nil } - if len(h.renderExprs) > 0 { + if len(h.RenderExprs) > 0 { // Rendering. - for i := range h.renderExprs { - datum, err := h.renderExprs[i].Eval(row) + for i := range h.RenderExprs { + datum, err := h.RenderExprs[i].Eval(row) if err != nil { return nil, false, err } From 0220f65694f08e15dd7c7157fa8818f75196437c Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Mon, 2 Nov 2020 17:17:10 -1000 Subject: [PATCH 4/4] colexec: update the allocator with the internal memory usage This commit removes `InternalMemoryOperator` interface that was put in-place for the operators to register their static memory usage. This was introduced before we had the allocator object (I think), and now we can remove that interface and simply update the allocators with the corresponding internal memory usage. There were only two operators that actually implemented the interface, and both of them were converted to the new pattern. This commit also removes the separation of streaming memory accounts in the vectorized flow setup and merges them with all other accounts. Release note: None --- pkg/sql/colexec/case.go | 9 +- pkg/sql/colexec/colbuilder/execplan.go | 213 +++++++----------- pkg/sql/colexec/mergejoiner.go | 7 +- pkg/sql/colexec/mergejoiner_exceptall.eg.go | 3 +- pkg/sql/colexec/mergejoiner_fullouter.eg.go | 3 +- pkg/sql/colexec/mergejoiner_inner.eg.go | 3 +- .../colexec/mergejoiner_intersectall.eg.go | 3 +- pkg/sql/colexec/mergejoiner_leftanti.eg.go | 3 +- pkg/sql/colexec/mergejoiner_leftouter.eg.go | 3 +- pkg/sql/colexec/mergejoiner_leftsemi.eg.go | 3 +- pkg/sql/colexec/mergejoiner_rightanti.eg.go | 3 +- pkg/sql/colexec/mergejoiner_rightouter.eg.go | 3 +- pkg/sql/colexec/mergejoiner_rightsemi.eg.go | 3 +- pkg/sql/colexec/mergejoiner_tmpl.go | 3 +- pkg/sql/colexec/op_creation.go | 9 +- pkg/sql/colexec/operator.go | 14 -- pkg/sql/colflow/vectorized_flow.go | 21 +- pkg/sql/colflow/vectorized_flow_space_test.go | 24 +- pkg/sql/colflow/vectorized_flow_test.go | 6 +- pkg/sql/colmem/allocator.go | 3 + 20 files changed, 131 insertions(+), 208 deletions(-) diff --git a/pkg/sql/colexec/case.go b/pkg/sql/colexec/case.go index 6c7589c7b83b..1e2f55777ef2 100644 --- a/pkg/sql/colexec/case.go +++ b/pkg/sql/colexec/case.go @@ -46,7 +46,7 @@ type caseOp struct { prevSel []int } -var _ InternalMemoryOperator = &caseOp{} +var _ colexecbase.Operator = &caseOp{} func (c *caseOp) ChildCount(verbose bool) int { return 1 + len(c.caseOps) + 1 @@ -65,11 +65,6 @@ func (c *caseOp) Child(nth int, verbose bool) execinfra.OpNode { return nil } -func (c *caseOp) InternalMemoryUsage() int { - // We internally use two selection vectors, origSel and prevSel. - return 2 * colmem.SizeOfBatchSizeSelVector -} - // NewCaseOp returns an operator that runs a case statement. // buffer is a bufferOp that will return the input batch repeatedly. // caseOps is a list of operator chains, one per branch in the case statement. @@ -89,6 +84,8 @@ func NewCaseOp( outputIdx int, typ *types.T, ) colexecbase.Operator { + // We internally use two selection vectors, origSel and prevSel. 
+ allocator.AdjustMemoryUsage(int64(2 * colmem.SizeOfBatchSizeSelVector)) return &caseOp{ allocator: allocator, buffer: buffer.(*bufferOp), diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index 332bc770a37f..a44649a4a1d0 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -1141,10 +1141,6 @@ func NewColOperator( return r, err } - // After constructing the base operator, calculate its internal memory usage. - if sMem, ok := result.Op.(colexec.InternalMemoryOperator); ok { - result.InternalMemUsage += sMem.InternalMemoryUsage() - } if log.V(1) { log.Infof(ctx, "made op %T\n", result.Op) } @@ -1244,11 +1240,6 @@ func (r opResult) planAndMaybeWrapOnExprAsFilter( onExpr execinfrapb.Expression, factory coldata.ColumnFactory, ) error { - // We will plan other Operators on top of r.Op, so we need to account for the - // internal memory explicitly. - if internalMemOp, ok := r.Op.(colexec.InternalMemoryOperator); ok { - r.InternalMemUsage += internalMemOp.InternalMemoryUsage() - } ppr := postProcessResult{ Op: r.Op, ColumnTypes: r.ColumnTypes, @@ -1330,7 +1321,6 @@ func (r *postProcessResult) planPostProcessSpec( semaCtx := flowCtx.TypeResolverFactory.NewSemaContext(evalCtx.Txn) var renderedCols []uint32 for renderIdx, renderExpr := range post.RenderExprs { - var renderInternalMem int var expr tree.TypedExpr var err error if procOutputHelper != nil { @@ -1342,7 +1332,7 @@ func (r *postProcessResult) planPostProcessSpec( } } var outputIdx int - r.Op, outputIdx, r.ColumnTypes, renderInternalMem, err = planProjectionOperators( + r.Op, outputIdx, r.ColumnTypes, err = planProjectionOperators( ctx, evalCtx, expr, r.ColumnTypes, r.Op, args.StreamingMemAccount, factory, ) if err != nil { @@ -1351,7 +1341,6 @@ func (r *postProcessResult) planPostProcessSpec( if outputIdx < 0 { return errors.AssertionFailedf("missing outputIdx") } - r.InternalMemUsage += renderInternalMem renderedCols = append(renderedCols, uint32(outputIdx)) } r.Op = colexec.NewSimpleProjectOp(r.Op, len(r.ColumnTypes), renderedCols) @@ -1454,16 +1443,14 @@ func (r opResult) createDiskAccount( } type postProcessResult struct { - Op colexecbase.Operator - ColumnTypes []*types.T - InternalMemUsage int + Op colexecbase.Operator + ColumnTypes []*types.T } func (r opResult) updateWithPostProcessResult(ppr postProcessResult) { r.Op = ppr.Op r.ColumnTypes = make([]*types.T, len(ppr.ColumnTypes)) copy(r.ColumnTypes, ppr.ColumnTypes) - r.InternalMemUsage += ppr.InternalMemUsage } // planFilterExpr creates all operators to implement filter expression. It takes @@ -1481,9 +1468,8 @@ func (r *postProcessResult) planFilterExpr( procOutputHelper *execinfra.ProcOutputHelper, ) error { var ( - selectionInternalMem int - expr tree.TypedExpr - err error + expr tree.TypedExpr + err error ) if procOutputHelper != nil { expr = procOutputHelper.Filter.Expr @@ -1501,13 +1487,12 @@ func (r *postProcessResult) planFilterExpr( return nil } var filterColumnTypes []*types.T - r.Op, _, filterColumnTypes, selectionInternalMem, err = planSelectionOperators( + r.Op, _, filterColumnTypes, err = planSelectionOperators( ctx, evalCtx, expr, r.ColumnTypes, r.Op, acc, factory, ) if err != nil { return errors.Wrapf(err, "unable to columnarize filter expression %q", filter) } - r.InternalMemUsage += selectionInternalMem if len(filterColumnTypes) > len(r.ColumnTypes) { // Additional columns were appended to store projections while evaluating // the filter. Project them away. 
@@ -1540,28 +1525,27 @@ func planSelectionOperators( input colexecbase.Operator, acc *mon.BoundAccount, factory coldata.ColumnFactory, -) (op colexecbase.Operator, resultIdx int, typs []*types.T, internalMemUsed int, err error) { +) (op colexecbase.Operator, resultIdx int, typs []*types.T, err error) { switch t := expr.(type) { case *tree.IndexedVar: op, err = colexec.BoolOrUnknownToSelOp(input, columnTypes, t.Idx) - return op, -1, columnTypes, internalMemUsed, err + return op, -1, columnTypes, err case *tree.AndExpr: // AND expressions are handled by an implicit AND'ing of selection vectors. // First we select out the tuples that are true on the left side, and then, // only among the matched tuples, we select out the tuples that are true on // the right side. var leftOp, rightOp colexecbase.Operator - var internalMemUsedLeft, internalMemUsedRight int - leftOp, _, typs, internalMemUsedLeft, err = planSelectionOperators( + leftOp, _, typs, err = planSelectionOperators( ctx, evalCtx, t.TypedLeft(), columnTypes, input, acc, factory, ) if err != nil { - return nil, resultIdx, typs, internalMemUsed, err + return nil, resultIdx, typs, err } - rightOp, resultIdx, typs, internalMemUsedRight, err = planSelectionOperators( + rightOp, resultIdx, typs, err = planSelectionOperators( ctx, evalCtx, t.TypedRight(), typs, leftOp, acc, factory, ) - return rightOp, resultIdx, typs, internalMemUsedLeft + internalMemUsedRight, err + return rightOp, resultIdx, typs, err case *tree.OrExpr: // OR expressions are handled by converting them to an equivalent CASE // statement. Since CASE statements don't have a selection form, plan a @@ -1580,54 +1564,54 @@ func planSelectionOperators( tree.DBoolFalse, types.Bool) if err != nil { - return nil, resultIdx, typs, internalMemUsed, err + return nil, resultIdx, typs, err } - op, resultIdx, typs, internalMemUsed, err = planProjectionOperators( + op, resultIdx, typs, err = planProjectionOperators( ctx, evalCtx, caseExpr, columnTypes, input, acc, factory, ) if err != nil { - return nil, resultIdx, typs, internalMemUsed, err + return nil, resultIdx, typs, err } op, err = colexec.BoolOrUnknownToSelOp(op, typs, resultIdx) - return op, resultIdx, typs, internalMemUsed, err + return op, resultIdx, typs, err case *tree.CaseExpr: - op, resultIdx, typs, internalMemUsed, err = planProjectionOperators( + op, resultIdx, typs, err = planProjectionOperators( ctx, evalCtx, expr, columnTypes, input, acc, factory, ) if err != nil { - return op, resultIdx, typs, internalMemUsed, err + return op, resultIdx, typs, err } op, err = colexec.BoolOrUnknownToSelOp(op, typs, resultIdx) - return op, resultIdx, typs, internalMemUsed, err + return op, resultIdx, typs, err case *tree.IsNullExpr: - op, resultIdx, typs, internalMemUsed, err = planProjectionOperators( + op, resultIdx, typs, err = planProjectionOperators( ctx, evalCtx, t.TypedInnerExpr(), columnTypes, input, acc, factory, ) if err != nil { - return op, resultIdx, typs, internalMemUsed, err + return op, resultIdx, typs, err } op = colexec.NewIsNullSelOp( op, resultIdx, false /* negate */, typs[resultIdx].Family() == types.TupleFamily, ) - return op, resultIdx, typs, internalMemUsed, err + return op, resultIdx, typs, err case *tree.IsNotNullExpr: - op, resultIdx, typs, internalMemUsed, err = planProjectionOperators( + op, resultIdx, typs, err = planProjectionOperators( ctx, evalCtx, t.TypedInnerExpr(), columnTypes, input, acc, factory, ) if err != nil { - return op, resultIdx, typs, internalMemUsed, err + return op, resultIdx, typs, err } op = 
colexec.NewIsNullSelOp( op, resultIdx, true /* negate */, typs[resultIdx].Family() == types.TupleFamily, ) - return op, resultIdx, typs, internalMemUsed, err + return op, resultIdx, typs, err case *tree.ComparisonExpr: cmpOp := t.Operator - leftOp, leftIdx, ct, internalMemUsedLeft, err := planProjectionOperators( + leftOp, leftIdx, ct, err := planProjectionOperators( ctx, evalCtx, t.TypedLeft(), columnTypes, input, acc, factory, ) if err != nil { - return nil, resultIdx, ct, internalMemUsed, err + return nil, resultIdx, ct, err } lTyp := ct[leftIdx] if constArg, ok := t.Right.(tree.Datum); ok { @@ -1669,20 +1653,20 @@ func planSelectionOperators( cmpOp, leftOp, ct, leftIdx, constArg, evalCtx, t, ) } - return op, resultIdx, ct, internalMemUsedLeft, err + return op, resultIdx, ct, err } - rightOp, rightIdx, ct, internalMemUsedRight, err := planProjectionOperators( + rightOp, rightIdx, ct, err := planProjectionOperators( ctx, evalCtx, t.TypedRight(), ct, leftOp, acc, factory, ) if err != nil { - return nil, resultIdx, ct, internalMemUsed, err + return nil, resultIdx, ct, err } op, err = colexec.GetSelectionOperator( cmpOp, rightOp, ct, leftIdx, rightIdx, evalCtx, t, ) - return op, resultIdx, ct, internalMemUsedLeft + internalMemUsedRight, err + return op, resultIdx, ct, err default: - return nil, resultIdx, nil, internalMemUsed, errors.Errorf("unhandled selection expression type: %s", reflect.TypeOf(t)) + return nil, resultIdx, nil, errors.Errorf("unhandled selection expression type: %s", reflect.TypeOf(t)) } } @@ -1717,7 +1701,7 @@ func planProjectionOperators( input colexecbase.Operator, acc *mon.BoundAccount, factory coldata.ColumnFactory, -) (op colexecbase.Operator, resultIdx int, typs []*types.T, internalMemUsed int, err error) { +) (op colexecbase.Operator, resultIdx int, typs []*types.T, err error) { // projectDatum is a helper function that adds a new constant projection // operator for the given datum. typs are updated accordingly. 
projectDatum := func(datum tree.Datum) (colexecbase.Operator, error) { @@ -1735,7 +1719,7 @@ func planProjectionOperators( resultIdx = -1 switch t := expr.(type) { case *tree.IndexedVar: - return input, t.Idx, columnTypes, internalMemUsed, nil + return input, t.Idx, columnTypes, nil case *tree.ComparisonExpr: return planProjectionExpr( ctx, evalCtx, t.Operator, t.ResolvedType(), t.TypedLeft(), t.TypedRight(), @@ -1743,7 +1727,7 @@ func planProjectionOperators( ) case *tree.BinaryExpr: if err = checkSupportedBinaryExpr(t.TypedLeft(), t.TypedRight(), t.ResolvedType()); err != nil { - return op, resultIdx, typs, internalMemUsed, err + return op, resultIdx, typs, err } return planProjectionExpr( ctx, evalCtx, t.Operator, t.ResolvedType(), t.TypedLeft(), t.TypedRight(), @@ -1755,19 +1739,16 @@ func planProjectionOperators( return planIsNullProjectionOp(ctx, evalCtx, t.ResolvedType(), t.TypedInnerExpr(), columnTypes, input, acc, true /* negate */, factory) case *tree.CastExpr: expr := t.Expr.(tree.TypedExpr) - op, resultIdx, typs, internalMemUsed, err = planProjectionOperators( + op, resultIdx, typs, err = planProjectionOperators( ctx, evalCtx, expr, columnTypes, input, acc, factory, ) if err != nil { - return nil, 0, nil, internalMemUsed, err + return nil, 0, nil, err } op, resultIdx, typs, err = planCastOperator(ctx, acc, typs, op, resultIdx, expr.ResolvedType(), t.ResolvedType(), factory) - return op, resultIdx, typs, internalMemUsed, err + return op, resultIdx, typs, err case *tree.FuncExpr: - var ( - inputCols []int - projectionInternalMem int - ) + var inputCols []int typs = make([]*types.T, len(columnTypes)) copy(typs, columnTypes) op = input @@ -1776,24 +1757,23 @@ func planProjectionOperators( // TODO(rohany): This could be done better, especially in the case of // constant arguments, because the vectorized engine right now // creates a new column full of the constant value. - op, resultIdx, typs, projectionInternalMem, err = planProjectionOperators( + op, resultIdx, typs, err = planProjectionOperators( ctx, evalCtx, e.(tree.TypedExpr), typs, op, acc, factory, ) if err != nil { - return nil, resultIdx, nil, internalMemUsed, err + return nil, resultIdx, nil, err } inputCols = append(inputCols, resultIdx) - internalMemUsed += projectionInternalMem } resultIdx = len(typs) op, err = colexec.NewBuiltinFunctionOperator( colmem.NewAllocator(ctx, acc, factory), evalCtx, t, typs, inputCols, resultIdx, op, ) typs = appendOneType(typs, t.ResolvedType()) - return op, resultIdx, typs, internalMemUsed, err + return op, resultIdx, typs, err case tree.Datum: op, err = projectDatum(t) - return op, resultIdx, typs, internalMemUsed, err + return op, resultIdx, typs, err case *tree.Tuple: isConstTuple := true for _, expr := range t.Exprs { @@ -1807,23 +1787,21 @@ func planProjectionOperators( // project the resulting datum. 
tuple, err := t.Eval(evalCtx) if err != nil { - return nil, resultIdx, typs, internalMemUsed, err + return nil, resultIdx, typs, err } op, err = projectDatum(tuple) - return op, resultIdx, typs, internalMemUsed, err + return op, resultIdx, typs, err } outputType := t.ResolvedType() typs = make([]*types.T, len(columnTypes)) copy(typs, columnTypes) tupleContentsIdxs := make([]int, len(t.Exprs)) for i, expr := range t.Exprs { - var memUsed int - input, tupleContentsIdxs[i], typs, memUsed, err = planProjectionOperators( + input, tupleContentsIdxs[i], typs, err = planProjectionOperators( ctx, evalCtx, expr.(tree.TypedExpr), typs, input, acc, factory, ) - internalMemUsed += memUsed if err != nil { - return nil, resultIdx, typs, internalMemUsed, err + return nil, resultIdx, typs, err } } resultIdx = len(typs) @@ -1831,10 +1809,10 @@ func planProjectionOperators( colmem.NewAllocator(ctx, acc, factory), typs, tupleContentsIdxs, outputType, input, resultIdx, ) typs = appendOneType(typs, outputType) - return op, resultIdx, typs, internalMemUsed, err + return op, resultIdx, typs, err case *tree.CaseExpr: if t.Expr != nil { - return nil, resultIdx, typs, internalMemUsed, errors.New("CASE WHEN expressions unsupported") + return nil, resultIdx, typs, errors.New("CASE WHEN expressions unsupported") } allocator := colmem.NewAllocator(ctx, acc, factory) @@ -1844,7 +1822,7 @@ func planProjectionOperators( // works (which populates its output in arbitrary order) and the flat // bytes implementation of Bytes type (which prohibits sets in arbitrary // order), so we reject such scenario to fall back to row-by-row engine. - return nil, resultIdx, typs, internalMemUsed, errors.Newf( + return nil, resultIdx, typs, errors.Newf( "unsupported type %s in CASE operator", caseOutputType) } caseOutputIdx := len(columnTypes) @@ -1872,26 +1850,24 @@ func planProjectionOperators( // results of the WHEN into a single output vector, assembling the final // result of the case projection. whenTyped := when.Cond.(tree.TypedExpr) - var whenInternalMemUsed, thenInternalMemUsed int - caseOps[i], resultIdx, typs, whenInternalMemUsed, err = planProjectionOperators( + caseOps[i], resultIdx, typs, err = planProjectionOperators( ctx, evalCtx, whenTyped, typs, buffer, acc, factory, ) if err != nil { - return nil, resultIdx, typs, internalMemUsed, err + return nil, resultIdx, typs, err } caseOps[i], err = colexec.BoolOrUnknownToSelOp(caseOps[i], typs, resultIdx) if err != nil { - return nil, resultIdx, typs, internalMemUsed, err + return nil, resultIdx, typs, err } // Run the "then" clause on those tuples that were selected. - caseOps[i], thenIdxs[i], typs, thenInternalMemUsed, err = planProjectionOperators( + caseOps[i], thenIdxs[i], typs, err = planProjectionOperators( ctx, evalCtx, when.Val.(tree.TypedExpr), typs, caseOps[i], acc, factory, ) if err != nil { - return nil, resultIdx, typs, internalMemUsed, err + return nil, resultIdx, typs, err } - internalMemUsed += whenInternalMemUsed + thenInternalMemUsed if !typs[thenIdxs[i]].Identical(typs[caseOutputIdx]) { // It is possible that the projection of this THEN arm has different // column type (for example, we expect INT2, but INT8 is given). 
In @@ -1901,24 +1877,22 @@ func planProjectionOperators( ctx, acc, typs, caseOps[i], thenIdxs[i], fromType, toType, factory, ) if err != nil { - return nil, resultIdx, typs, internalMemUsed, err + return nil, resultIdx, typs, err } } } - var elseInternalMemUsed int var elseOp colexecbase.Operator elseExpr := t.Else if elseExpr == nil { // If there's no ELSE arm, we write NULLs. elseExpr = tree.DNull } - elseOp, thenIdxs[len(t.Whens)], typs, elseInternalMemUsed, err = planProjectionOperators( + elseOp, thenIdxs[len(t.Whens)], typs, err = planProjectionOperators( ctx, evalCtx, elseExpr.(tree.TypedExpr), typs, buffer, acc, factory, ) if err != nil { - return nil, resultIdx, typs, internalMemUsed, err + return nil, resultIdx, typs, err } - internalMemUsed += elseInternalMemUsed if !typs[thenIdxs[len(t.Whens)]].Identical(typs[caseOutputIdx]) { // It is possible that the projection of the ELSE arm has different // column type (for example, we expect INT2, but INT8 is given). In @@ -1929,18 +1903,17 @@ func planProjectionOperators( ctx, acc, typs, elseOp, elseIdx, fromType, toType, factory, ) if err != nil { - return nil, resultIdx, typs, internalMemUsed, err + return nil, resultIdx, typs, err } } schemaEnforcer.SetTypes(typs) op := colexec.NewCaseOp(allocator, buffer, caseOps, elseOp, thenIdxs, caseOutputIdx, caseOutputType) - internalMemUsed += op.(colexec.InternalMemoryOperator).InternalMemoryUsage() - return op, caseOutputIdx, typs, internalMemUsed, err + return op, caseOutputIdx, typs, err case *tree.AndExpr, *tree.OrExpr: return planLogicalProjectionOp(ctx, evalCtx, expr, columnTypes, input, acc, factory) default: - return nil, resultIdx, nil, internalMemUsed, errors.Errorf("unhandled projection expression type: %s", reflect.TypeOf(t)) + return nil, resultIdx, nil, errors.Errorf("unhandled projection expression type: %s", reflect.TypeOf(t)) } } @@ -1984,9 +1957,9 @@ func planProjectionExpr( factory coldata.ColumnFactory, binFn tree.TwoArgFn, cmpExpr *tree.ComparisonExpr, -) (op colexecbase.Operator, resultIdx int, typs []*types.T, internalMemUsed int, err error) { +) (op colexecbase.Operator, resultIdx int, typs []*types.T, err error) { if err := checkSupportedProjectionExpr(left, right); err != nil { - return nil, resultIdx, typs, internalMemUsed, err + return nil, resultIdx, typs, err } allocator := colmem.NewAllocator(ctx, acc, factory) resultIdx = -1 @@ -1998,11 +1971,11 @@ func planProjectionExpr( // argument is on the right side. This doesn't happen for non-commutative // operators such as - and /, though, so we still need this case. 
var rightIdx int - input, rightIdx, typs, internalMemUsed, err = planProjectionOperators( + input, rightIdx, typs, err = planProjectionOperators( ctx, evalCtx, right, columnTypes, input, acc, factory, ) if err != nil { - return nil, resultIdx, typs, internalMemUsed, err + return nil, resultIdx, typs, err } resultIdx = len(typs) // The projection result will be outputted to a new column which is appended @@ -2012,17 +1985,13 @@ func planProjectionExpr( rightIdx, lConstArg, resultIdx, evalCtx, binFn, cmpExpr, ) } else { - var ( - leftIdx int - internalMemUsedLeft int - ) - input, leftIdx, typs, internalMemUsedLeft, err = planProjectionOperators( + var leftIdx int + input, leftIdx, typs, err = planProjectionOperators( ctx, evalCtx, left, columnTypes, input, acc, factory, ) if err != nil { - return nil, resultIdx, typs, internalMemUsed, err + return nil, resultIdx, typs, err } - internalMemUsed += internalMemUsedLeft // Note that this check exists only for testing purposes. On the // regular workloads, we expect that tree.Tuples have already been // pre-evaluated. We also don't need fully-fledged planning as we have @@ -2031,7 +2000,7 @@ func planProjectionExpr( if tuple, ok := right.(*tree.Tuple); ok { tupleDatum, err := tuple.Eval(evalCtx) if err != nil { - return nil, resultIdx, typs, internalMemUsed, err + return nil, resultIdx, typs, err } right = tupleDatum } @@ -2084,17 +2053,13 @@ func planProjectionExpr( } } else { // Case 3: neither are constant. - var ( - rightIdx int - internalMemUsedRight int - ) - input, rightIdx, typs, internalMemUsedRight, err = planProjectionOperators( + var rightIdx int + input, rightIdx, typs, err = planProjectionOperators( ctx, evalCtx, right, typs, input, acc, factory, ) if err != nil { - return nil, resultIdx, nil, internalMemUsed, err + return nil, resultIdx, nil, err } - internalMemUsed += internalMemUsedRight resultIdx = len(typs) op, err = colexec.GetProjectionOperator( allocator, typs, outputType, projOp, input, leftIdx, rightIdx, @@ -2103,13 +2068,10 @@ func planProjectionExpr( } } if err != nil { - return op, resultIdx, typs, internalMemUsed, err - } - if sMem, ok := op.(colexec.InternalMemoryOperator); ok { - internalMemUsed += sMem.InternalMemoryUsage() + return op, resultIdx, typs, err } typs = appendOneType(typs, outputType) - return op, resultIdx, typs, internalMemUsed, err + return op, resultIdx, typs, err } // planLogicalProjectionOp plans all the needed operators for a projection of @@ -2122,15 +2084,14 @@ func planLogicalProjectionOp( input colexecbase.Operator, acc *mon.BoundAccount, factory coldata.ColumnFactory, -) (op colexecbase.Operator, resultIdx int, typs []*types.T, internalMemUsed int, err error) { +) (op colexecbase.Operator, resultIdx int, typs []*types.T, err error) { // Add a new boolean column that will store the result of the projection. 
 	resultIdx = len(columnTypes)
 	typs = appendOneType(columnTypes, types.Bool)
 	var (
-		typedLeft, typedRight                     tree.TypedExpr
-		leftProjOpChain, rightProjOpChain         colexecbase.Operator
-		leftIdx, rightIdx                         int
-		internalMemUsedLeft, internalMemUsedRight int
+		typedLeft, typedRight             tree.TypedExpr
+		leftProjOpChain, rightProjOpChain colexecbase.Operator
+		leftIdx, rightIdx                 int
 	)
 	leftFeedOp := colexec.NewFeedOperator()
 	rightFeedOp := colexec.NewFeedOperator()
@@ -2144,17 +2105,17 @@ func planLogicalProjectionOp(
 	default:
 		colexecerror.InternalError(errors.AssertionFailedf("unexpected logical expression type %s", t.String()))
 	}
-	leftProjOpChain, leftIdx, typs, internalMemUsedLeft, err = planProjectionOperators(
+	leftProjOpChain, leftIdx, typs, err = planProjectionOperators(
 		ctx, evalCtx, typedLeft, typs, leftFeedOp, acc, factory,
 	)
 	if err != nil {
-		return nil, resultIdx, typs, internalMemUsed, err
+		return nil, resultIdx, typs, err
 	}
-	rightProjOpChain, rightIdx, typs, internalMemUsedRight, err = planProjectionOperators(
+	rightProjOpChain, rightIdx, typs, err = planProjectionOperators(
 		ctx, evalCtx, typedRight, typs, rightFeedOp, acc, factory,
 	)
 	if err != nil {
-		return nil, resultIdx, typs, internalMemUsed, err
+		return nil, resultIdx, typs, err
 	}
 	allocator := colmem.NewAllocator(ctx, acc, factory)
 	input = colexec.NewBatchSchemaSubsetEnforcer(allocator, input, typs, resultIdx, len(typs))
@@ -2176,7 +2137,7 @@ func planLogicalProjectionOp(
 			leftIdx, rightIdx, resultIdx,
 		)
 	}
-	return op, resultIdx, typs, internalMemUsedLeft + internalMemUsedRight, err
+	return op, resultIdx, typs, err
 }
 
 // planIsNullProjectionOp plans the operator for IS NULL and IS NOT NULL
@@ -2191,12 +2152,12 @@ func planIsNullProjectionOp(
 	acc *mon.BoundAccount,
 	negate bool,
 	factory coldata.ColumnFactory,
-) (op colexecbase.Operator, resultIdx int, typs []*types.T, internalMemUsed int, err error) {
-	op, resultIdx, typs, internalMemUsed, err = planProjectionOperators(
+) (op colexecbase.Operator, resultIdx int, typs []*types.T, err error) {
+	op, resultIdx, typs, err = planProjectionOperators(
 		ctx, evalCtx, expr, columnTypes, input, acc, factory,
 	)
 	if err != nil {
-		return op, resultIdx, typs, internalMemUsed, err
+		return op, resultIdx, typs, err
 	}
 	outputIdx := len(typs)
 	isTupleNull := typs[resultIdx].Family() == types.TupleFamily
@@ -2204,7 +2165,7 @@ func planIsNullProjectionOp(
 		colmem.NewAllocator(ctx, acc, factory), op, resultIdx, outputIdx, negate, isTupleNull,
 	)
 	typs = appendOneType(typs, outputType)
-	return op, outputIdx, typs, internalMemUsed, nil
+	return op, outputIdx, typs, nil
 }
 
 // appendOneType appends a *types.T to the end of a []*types.T. The size of the
diff --git a/pkg/sql/colexec/mergejoiner.go b/pkg/sql/colexec/mergejoiner.go
index bf135d492581..adf96210f6a5 100644
--- a/pkg/sql/colexec/mergejoiner.go
+++ b/pkg/sql/colexec/mergejoiner.go
@@ -530,11 +530,6 @@ func (o *mergeJoinBase) reset(ctx context.Context) {
 	o.resetBuilderCrossProductState()
 }
 
-func (o *mergeJoinBase) InternalMemoryUsage() int {
-	const sizeOfGroup = int(unsafe.Sizeof(group{}))
-	return 8 * coldata.BatchSize() * sizeOfGroup // o.groups
-}
-
 func (o *mergeJoinBase) Init() {
 	if o.joinType.ShouldIncludeLeftColsInOutput() {
 		o.outputTypes = append(o.outputTypes, o.left.sourceTypes...)
@@ -562,6 +557,8 @@ func (o *mergeJoinBase) Init() { o.builderState.lGroups = make([]group, 1) o.builderState.rGroups = make([]group, 1) + const sizeOfGroup = int(unsafe.Sizeof(group{})) + o.unlimitedAllocator.AdjustMemoryUsage(int64(8 * coldata.BatchSize() * sizeOfGroup)) o.groups = makeGroupsBuffer(coldata.BatchSize()) o.resetBuilderCrossProductState() } diff --git a/pkg/sql/colexec/mergejoiner_exceptall.eg.go b/pkg/sql/colexec/mergejoiner_exceptall.eg.go index 6c370ce8fe09..cd01c9d6570a 100644 --- a/pkg/sql/colexec/mergejoiner_exceptall.eg.go +++ b/pkg/sql/colexec/mergejoiner_exceptall.eg.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coldataext" "github.com/cockroachdb/cockroach/pkg/col/typeconv" + "github.com/cockroachdb/cockroach/pkg/sql/colexecbase" "github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -41,7 +42,7 @@ type mergeJoinExceptAllOp struct { *mergeJoinBase } -var _ InternalMemoryOperator = &mergeJoinExceptAllOp{} +var _ colexecbase.Operator = &mergeJoinExceptAllOp{} func (o *mergeJoinExceptAllOp) probeBodyLSeltrueRSeltrue(ctx context.Context) { lSel := o.proberState.lBatch.Selection() diff --git a/pkg/sql/colexec/mergejoiner_fullouter.eg.go b/pkg/sql/colexec/mergejoiner_fullouter.eg.go index a49a2f0a7829..e2b33bf29355 100644 --- a/pkg/sql/colexec/mergejoiner_fullouter.eg.go +++ b/pkg/sql/colexec/mergejoiner_fullouter.eg.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coldataext" "github.com/cockroachdb/cockroach/pkg/col/typeconv" + "github.com/cockroachdb/cockroach/pkg/sql/colexecbase" "github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -41,7 +42,7 @@ type mergeJoinFullOuterOp struct { *mergeJoinBase } -var _ InternalMemoryOperator = &mergeJoinFullOuterOp{} +var _ colexecbase.Operator = &mergeJoinFullOuterOp{} func (o *mergeJoinFullOuterOp) probeBodyLSeltrueRSeltrue(ctx context.Context) { lSel := o.proberState.lBatch.Selection() diff --git a/pkg/sql/colexec/mergejoiner_inner.eg.go b/pkg/sql/colexec/mergejoiner_inner.eg.go index 27ec6de29547..c53a6131152a 100644 --- a/pkg/sql/colexec/mergejoiner_inner.eg.go +++ b/pkg/sql/colexec/mergejoiner_inner.eg.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coldataext" "github.com/cockroachdb/cockroach/pkg/col/typeconv" + "github.com/cockroachdb/cockroach/pkg/sql/colexecbase" "github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -41,7 +42,7 @@ type mergeJoinInnerOp struct { *mergeJoinBase } -var _ InternalMemoryOperator = &mergeJoinInnerOp{} +var _ colexecbase.Operator = &mergeJoinInnerOp{} func (o *mergeJoinInnerOp) probeBodyLSeltrueRSeltrue(ctx context.Context) { lSel := o.proberState.lBatch.Selection() diff --git a/pkg/sql/colexec/mergejoiner_intersectall.eg.go b/pkg/sql/colexec/mergejoiner_intersectall.eg.go index 728fc8f09cfb..395553a4c46d 100644 --- a/pkg/sql/colexec/mergejoiner_intersectall.eg.go +++ b/pkg/sql/colexec/mergejoiner_intersectall.eg.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" 
"github.com/cockroachdb/cockroach/pkg/col/coldataext" "github.com/cockroachdb/cockroach/pkg/col/typeconv" + "github.com/cockroachdb/cockroach/pkg/sql/colexecbase" "github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -41,7 +42,7 @@ type mergeJoinIntersectAllOp struct { *mergeJoinBase } -var _ InternalMemoryOperator = &mergeJoinIntersectAllOp{} +var _ colexecbase.Operator = &mergeJoinIntersectAllOp{} func (o *mergeJoinIntersectAllOp) probeBodyLSeltrueRSeltrue(ctx context.Context) { lSel := o.proberState.lBatch.Selection() diff --git a/pkg/sql/colexec/mergejoiner_leftanti.eg.go b/pkg/sql/colexec/mergejoiner_leftanti.eg.go index 148460a7c20d..8e7159c60e03 100644 --- a/pkg/sql/colexec/mergejoiner_leftanti.eg.go +++ b/pkg/sql/colexec/mergejoiner_leftanti.eg.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coldataext" "github.com/cockroachdb/cockroach/pkg/col/typeconv" + "github.com/cockroachdb/cockroach/pkg/sql/colexecbase" "github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -41,7 +42,7 @@ type mergeJoinLeftAntiOp struct { *mergeJoinBase } -var _ InternalMemoryOperator = &mergeJoinLeftAntiOp{} +var _ colexecbase.Operator = &mergeJoinLeftAntiOp{} func (o *mergeJoinLeftAntiOp) probeBodyLSeltrueRSeltrue(ctx context.Context) { lSel := o.proberState.lBatch.Selection() diff --git a/pkg/sql/colexec/mergejoiner_leftouter.eg.go b/pkg/sql/colexec/mergejoiner_leftouter.eg.go index f44f899404c4..d955a510b0a4 100644 --- a/pkg/sql/colexec/mergejoiner_leftouter.eg.go +++ b/pkg/sql/colexec/mergejoiner_leftouter.eg.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coldataext" "github.com/cockroachdb/cockroach/pkg/col/typeconv" + "github.com/cockroachdb/cockroach/pkg/sql/colexecbase" "github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -41,7 +42,7 @@ type mergeJoinLeftOuterOp struct { *mergeJoinBase } -var _ InternalMemoryOperator = &mergeJoinLeftOuterOp{} +var _ colexecbase.Operator = &mergeJoinLeftOuterOp{} func (o *mergeJoinLeftOuterOp) probeBodyLSeltrueRSeltrue(ctx context.Context) { lSel := o.proberState.lBatch.Selection() diff --git a/pkg/sql/colexec/mergejoiner_leftsemi.eg.go b/pkg/sql/colexec/mergejoiner_leftsemi.eg.go index aef8352cb484..21538f1cb89d 100644 --- a/pkg/sql/colexec/mergejoiner_leftsemi.eg.go +++ b/pkg/sql/colexec/mergejoiner_leftsemi.eg.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coldataext" "github.com/cockroachdb/cockroach/pkg/col/typeconv" + "github.com/cockroachdb/cockroach/pkg/sql/colexecbase" "github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -41,7 +42,7 @@ type mergeJoinLeftSemiOp struct { *mergeJoinBase } -var _ InternalMemoryOperator = &mergeJoinLeftSemiOp{} +var _ colexecbase.Operator = &mergeJoinLeftSemiOp{} func (o *mergeJoinLeftSemiOp) probeBodyLSeltrueRSeltrue(ctx context.Context) { lSel := o.proberState.lBatch.Selection() diff --git 
a/pkg/sql/colexec/mergejoiner_rightanti.eg.go b/pkg/sql/colexec/mergejoiner_rightanti.eg.go index 669fd37ac4ae..c7308fec5015 100644 --- a/pkg/sql/colexec/mergejoiner_rightanti.eg.go +++ b/pkg/sql/colexec/mergejoiner_rightanti.eg.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coldataext" "github.com/cockroachdb/cockroach/pkg/col/typeconv" + "github.com/cockroachdb/cockroach/pkg/sql/colexecbase" "github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -41,7 +42,7 @@ type mergeJoinRightAntiOp struct { *mergeJoinBase } -var _ InternalMemoryOperator = &mergeJoinRightAntiOp{} +var _ colexecbase.Operator = &mergeJoinRightAntiOp{} func (o *mergeJoinRightAntiOp) probeBodyLSeltrueRSeltrue(ctx context.Context) { lSel := o.proberState.lBatch.Selection() diff --git a/pkg/sql/colexec/mergejoiner_rightouter.eg.go b/pkg/sql/colexec/mergejoiner_rightouter.eg.go index 2216b620803d..da0cdcdedb5d 100644 --- a/pkg/sql/colexec/mergejoiner_rightouter.eg.go +++ b/pkg/sql/colexec/mergejoiner_rightouter.eg.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coldataext" "github.com/cockroachdb/cockroach/pkg/col/typeconv" + "github.com/cockroachdb/cockroach/pkg/sql/colexecbase" "github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -41,7 +42,7 @@ type mergeJoinRightOuterOp struct { *mergeJoinBase } -var _ InternalMemoryOperator = &mergeJoinRightOuterOp{} +var _ colexecbase.Operator = &mergeJoinRightOuterOp{} func (o *mergeJoinRightOuterOp) probeBodyLSeltrueRSeltrue(ctx context.Context) { lSel := o.proberState.lBatch.Selection() diff --git a/pkg/sql/colexec/mergejoiner_rightsemi.eg.go b/pkg/sql/colexec/mergejoiner_rightsemi.eg.go index c9e2cddef4f8..f1f7cde68389 100644 --- a/pkg/sql/colexec/mergejoiner_rightsemi.eg.go +++ b/pkg/sql/colexec/mergejoiner_rightsemi.eg.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coldataext" "github.com/cockroachdb/cockroach/pkg/col/typeconv" + "github.com/cockroachdb/cockroach/pkg/sql/colexecbase" "github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -41,7 +42,7 @@ type mergeJoinRightSemiOp struct { *mergeJoinBase } -var _ InternalMemoryOperator = &mergeJoinRightSemiOp{} +var _ colexecbase.Operator = &mergeJoinRightSemiOp{} func (o *mergeJoinRightSemiOp) probeBodyLSeltrueRSeltrue(ctx context.Context) { lSel := o.proberState.lBatch.Selection() diff --git a/pkg/sql/colexec/mergejoiner_tmpl.go b/pkg/sql/colexec/mergejoiner_tmpl.go index 503e3519d641..76359d7a51b0 100644 --- a/pkg/sql/colexec/mergejoiner_tmpl.go +++ b/pkg/sql/colexec/mergejoiner_tmpl.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldataext" "github.com/cockroachdb/cockroach/pkg/col/typeconv" "github.com/cockroachdb/cockroach/pkg/sql/colexec/execgen" + "github.com/cockroachdb/cockroach/pkg/sql/colexecbase" "github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -97,7 +98,7 @@ type 
mergeJoin_JOIN_TYPE_STRINGOp struct {
 	*mergeJoinBase
 }
 
-var _ InternalMemoryOperator = &mergeJoin_JOIN_TYPE_STRINGOp{}
+var _ colexecbase.Operator = &mergeJoin_JOIN_TYPE_STRINGOp{}
 
 // {{/*
 // This code snippet is the "meat" of the probing phase.
diff --git a/pkg/sql/colexec/op_creation.go b/pkg/sql/colexec/op_creation.go
index d4f969f1b4b1..2510318f81a7 100644
--- a/pkg/sql/colexec/op_creation.go
+++ b/pkg/sql/colexec/op_creation.go
@@ -77,11 +77,10 @@ type NewColOperatorArgs struct {
 // NewColOperatorResult is a helper struct that encompasses all of the return
 // values of NewColOperator call.
 type NewColOperatorResult struct {
-	Op               colexecbase.Operator
-	IOReader         execinfra.IOReader
-	ColumnTypes      []*types.T
-	InternalMemUsage int
-	MetadataSources  []execinfrapb.MetadataSource
+	Op              colexecbase.Operator
+	IOReader        execinfra.IOReader
+	ColumnTypes     []*types.T
+	MetadataSources []execinfrapb.MetadataSource
 	// ToClose is a slice of components that need to be Closed.
 	ToClose    []colexecbase.Closer
 	OpMonitors []*mon.BytesMonitor
diff --git a/pkg/sql/colexec/operator.go b/pkg/sql/colexec/operator.go
index 5a4c477311c9..701b2dab5c07 100644
--- a/pkg/sql/colexec/operator.go
+++ b/pkg/sql/colexec/operator.go
@@ -97,20 +97,6 @@ func (n *twoInputNode) Child(nth int, verbose bool) execinfra.OpNode {
 	return nil
 }
 
-// TODO(yuzefovich): audit all Operators to make sure that all internal memory
-// is accounted for.
-
-// InternalMemoryOperator is an interface that operators which use internal
-// memory need to implement. "Internal memory" is defined as memory that is
-// "private" to the operator and is not exposed to the outside; notably, it
-// does *not* include any coldata.Batch'es and coldata.Vec's.
-type InternalMemoryOperator interface {
-	colexecbase.Operator
-	// InternalMemoryUsage reports the internal memory usage (in bytes) of an
-	// operator.
-	InternalMemoryUsage() int
-}
-
 // resetter is an interface that operators can implement if they can be reset
 // either for reusing (to keep the already allocated memory) or during tests.
 type resetter interface {
diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go
index d00a5a6f582e..f870aa7bf78a 100644
--- a/pkg/sql/colflow/vectorized_flow.go
+++ b/pkg/sql/colflow/vectorized_flow.go
@@ -468,14 +468,11 @@ type vectorizedFlowCreator struct {
 	leaves []execinfra.OpNode
 	// operatorConcurrency is set if any operators are executed in parallel.
 	operatorConcurrency bool
-	// streamingMemAccounts contains all memory accounts of the non-buffering
-	// components in the vectorized flow.
-	streamingMemAccounts []*mon.BoundAccount
 	// monitors contains all monitors (for both memory and disk usage) of the
-	// buffering components in the vectorized flow.
+	// components in the vectorized flow.
 	monitors []*mon.BytesMonitor
 	// accounts contains all accounts (for both memory and disk usage) of the
-	// buffering components in the vectorized flow.
+	// components in the vectorized flow.
 	accounts []*mon.BoundAccount
 	// releasables contains all components that should be released back to their
 	// pools during the flow cleanup.
@@ -527,7 +524,6 @@ func newVectorizedFlowCreator( exprHelper: creator.exprHelper, typeResolver: typeResolver, leaves: creator.leaves, - streamingMemAccounts: creator.streamingMemAccounts, monitors: creator.monitors, accounts: creator.accounts, releasables: creator.releasables, @@ -538,9 +534,6 @@ func newVectorizedFlowCreator( } func (s *vectorizedFlowCreator) cleanup(ctx context.Context) { - for _, acc := range s.streamingMemAccounts { - acc.Close(ctx) - } for _, acc := range s.accounts { acc.Close(ctx) } @@ -562,7 +555,6 @@ func (s *vectorizedFlowCreator) Release() { vectorizedStatsCollectorsQueue: s.vectorizedStatsCollectorsQueue[:0], exprHelper: s.exprHelper, leaves: s.leaves[:0], - streamingMemAccounts: s.streamingMemAccounts[:0], monitors: s.monitors[:0], accounts: s.accounts[:0], releasables: s.releasables[:0], @@ -611,7 +603,7 @@ func (s *vectorizedFlowCreator) newStreamingMemAccount( flowCtx *execinfra.FlowCtx, ) *mon.BoundAccount { streamingMemAccount := flowCtx.EvalCtx.Mon.MakeBoundAccount() - s.streamingMemAccounts = append(s.streamingMemAccounts, &streamingMemAccount) + s.accounts = append(s.accounts, &streamingMemAccount) return &streamingMemAccount } @@ -1099,13 +1091,6 @@ func (s *vectorizedFlowCreator) setupFlow( if flowCtx.EvalCtx.SessionData.TestingVectorizeInjectPanics { result.Op = colexec.NewPanicInjector(result.Op) } - // We created a streaming memory account when calling NewColOperator above, - // so there is definitely at least one memory account, and it doesn't - // matter which one we grow. - if err = s.streamingMemAccounts[0].Grow(ctx, int64(result.InternalMemUsage)); err != nil { - err = errors.Wrapf(err, "not enough memory to setup vectorized plan") - return - } metadataSourcesQueue = append(metadataSourcesQueue, result.MetadataSources...) 
if flowCtx.Cfg != nil && flowCtx.Cfg.TestingKnobs.CheckVectorizedFlowIsClosedCorrectly { for _, closer := range result.ToClose { diff --git a/pkg/sql/colflow/vectorized_flow_space_test.go b/pkg/sql/colflow/vectorized_flow_space_test.go index bd295b96277c..d8dc48d2fb8e 100644 --- a/pkg/sql/colflow/vectorized_flow_space_test.go +++ b/pkg/sql/colflow/vectorized_flow_space_test.go @@ -49,10 +49,6 @@ func TestVectorizeInternalMemorySpaceError(t *testing.T) { oneInput := []execinfrapb.InputSyncSpec{ {ColumnTypes: []*types.T{types.Int}}, } - twoInputs := []execinfrapb.InputSyncSpec{ - {ColumnTypes: []*types.T{types.Int}}, - {ColumnTypes: []*types.T{types.Int}}, - } testCases := []struct { desc string @@ -71,16 +67,6 @@ func TestVectorizeInternalMemorySpaceError(t *testing.T) { ResultTypes: rowenc.OneIntCol, }, }, - { - desc: "MERGE JOIN", - spec: &execinfrapb.ProcessorSpec{ - Input: twoInputs, - Core: execinfrapb.ProcessorCoreUnion{ - MergeJoiner: &execinfrapb.MergeJoinerSpec{}, - }, - ResultTypes: append(twoInputs[0].ColumnTypes, twoInputs[1].ColumnTypes...), - }, - }, } for _, tc := range testCases { @@ -105,11 +91,13 @@ func TestVectorizeInternalMemorySpaceError(t *testing.T) { StreamingMemAccount: &acc, } args.TestingKnobs.UseStreamingMemAccountForBuffering = true - result, err := colbuilder.NewColOperator(ctx, flowCtx, args) - if err != nil { - t.Fatal(err) + var setupErr error + err := colexecerror.CatchVectorizedRuntimeError(func() { + _, setupErr = colbuilder.NewColOperator(ctx, flowCtx, args) + }) + if setupErr != nil { + t.Fatal(setupErr) } - err = acc.Grow(ctx, int64(result.InternalMemUsage)) if success { require.NoError(t, err, "expected success, found: ", err) } else { diff --git a/pkg/sql/colflow/vectorized_flow_test.go b/pkg/sql/colflow/vectorized_flow_test.go index 81e985565815..07844c269c9f 100644 --- a/pkg/sql/colflow/vectorized_flow_test.go +++ b/pkg/sql/colflow/vectorized_flow_test.go @@ -232,11 +232,7 @@ func TestDrainOnlyInputDAG(t *testing.T) { ) _, err := vfc.setupFlow(ctx, &f.FlowCtx, procs, flowinfra.FuseNormally) - defer func() { - for _, memAcc := range vfc.streamingMemAccounts { - memAcc.Close(ctx) - } - }() + defer vfc.cleanup(ctx) require.NoError(t, err) // Verify that an outbox was actually created. diff --git a/pkg/sql/colmem/allocator.go b/pkg/sql/colmem/allocator.go index 46c30b893f81..7c2261856d82 100644 --- a/pkg/sql/colmem/allocator.go +++ b/pkg/sql/colmem/allocator.go @@ -26,6 +26,9 @@ import ( "github.com/cockroachdb/errors" ) +// TODO(yuzefovich): audit all Operators to make sure that all static +// (internal) memory is accounted for. + // Allocator is a memory management tool for vectorized components. It provides // new batches (and appends to existing ones) within a fixed memory budget. If // the budget is exceeded, it will panic with an error.