diff --git a/pkg/col/coldata/batch.go b/pkg/col/coldata/batch.go index 3d57827e72a1..fd8581f8d391 100644 --- a/pkg/col/coldata/batch.go +++ b/pkg/col/coldata/batch.go @@ -63,7 +63,7 @@ type Batch interface { // // NOTE: Reset can allocate a new Batch, so when calling from the vectorized // engine consider either allocating a new Batch explicitly via - // colexec.Allocator or calling ResetInternalBatch. + // colmem.Allocator or calling ResetInternalBatch. Reset(typs []*types.T, length int, factory ColumnFactory) // ResetInternalBatch resets a batch and its underlying Vecs for reuse. It's // important for callers to call ResetInternalBatch if they own internal diff --git a/pkg/col/colserde/main_test.go b/pkg/col/colserde/main_test.go index e501b144ad06..ad355d0493b8 100644 --- a/pkg/col/colserde/main_test.go +++ b/pkg/col/colserde/main_test.go @@ -25,7 +25,7 @@ import ( ) var ( - // testAllocator is a colexec.Allocator with an unlimited budget for use in + // testAllocator is a colmem.Allocator with an unlimited budget for use in // tests. testAllocator *colmem.Allocator testColumnFactory coldata.ColumnFactory diff --git a/pkg/sql/colcontainer/main_test.go b/pkg/sql/colcontainer/main_test.go index b4d00c916154..1763b46fdc57 100644 --- a/pkg/sql/colcontainer/main_test.go +++ b/pkg/sql/colcontainer/main_test.go @@ -25,7 +25,7 @@ import ( ) var ( - // testAllocator is a colexec.Allocator with an unlimited budget for use in + // testAllocator is a colmem.Allocator with an unlimited budget for use in // tests. testAllocator *colmem.Allocator testColumnFactory coldata.ColumnFactory diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index 5ce1f9ea2686..0d5d70a8afdd 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -362,12 +362,16 @@ func (r opResult) createDiskBackedSort( // There is a limit specified, so we know exactly how many rows the // sorter should output. Use a top K sorter, which uses a heap to avoid // storing more rows than necessary. + opName := opNamePrefix + "topk-sort" var topKSorterMemAccount *mon.BoundAccount topKSorterMemAccount, sorterMemMonitorName = args.MonitorRegistry.CreateMemAccountForSpillStrategyWithLimit( - ctx, flowCtx, spoolMemLimit, opNamePrefix+"topk-sort", processorID, + ctx, flowCtx, spoolMemLimit, opName, processorID, + ) + unlimitedMemAcc := args.MonitorRegistry.CreateUnlimitedMemAccount( + ctx, flowCtx, opName, processorID, ) inMemorySorter = colexec.NewTopKSorter( - colmem.NewAllocator(ctx, topKSorterMemAccount, factory), input, + colmem.NewLimitedAllocator(ctx, topKSorterMemAccount, unlimitedMemAcc, factory), input, inputTypes, ordering.Columns, int(matchLen), uint64(limit), maxOutputBatchMemSize, ) } else if matchLen > 0 { @@ -383,19 +387,27 @@ func (r opResult) createDiskBackedSort( sortChunksMemAccount, sorterMemMonitorName = args.MonitorRegistry.CreateMemAccountForSpillStrategyWithLimit( ctx, flowCtx, spoolMemLimit, opName, processorID, ) + unlimitedMemAcc := args.MonitorRegistry.CreateUnlimitedMemAccount( + ctx, flowCtx, opName, processorID, + ) inMemorySorter = colexec.NewSortChunks( - deselectorUnlimitedAllocator, colmem.NewAllocator(ctx, sortChunksMemAccount, factory), + deselectorUnlimitedAllocator, + colmem.NewLimitedAllocator(ctx, sortChunksMemAccount, unlimitedMemAcc, factory), input, inputTypes, ordering.Columns, int(matchLen), maxOutputBatchMemSize, ) } else { // No optimizations possible. Default to the standard sort operator. var sorterMemAccount *mon.BoundAccount + opName := opNamePrefix + "sort-all" sorterMemAccount, sorterMemMonitorName = args.MonitorRegistry.CreateMemAccountForSpillStrategyWithLimit( - ctx, flowCtx, spoolMemLimit, opNamePrefix+"sort-all", processorID, + ctx, flowCtx, spoolMemLimit, opName, processorID, + ) + unlimitedMemAcc := args.MonitorRegistry.CreateUnlimitedMemAccount( + ctx, flowCtx, opName, processorID, ) inMemorySorter = colexec.NewSorter( - colmem.NewAllocator(ctx, sorterMemAccount, factory), input, - inputTypes, ordering.Columns, maxOutputBatchMemSize, + colmem.NewLimitedAllocator(ctx, sorterMemAccount, unlimitedMemAcc, factory), + input, inputTypes, ordering.Columns, maxOutputBatchMemSize, ) } if args.TestingKnobs.DiskSpillingDisabled { @@ -930,9 +942,15 @@ func NewColOperator( spillingQueueMemAccount := args.MonitorRegistry.CreateUnlimitedMemAccount( ctx, flowCtx, spillingQueueMemMonitorName, spec.ProcessorID, ) - newAggArgs.Allocator = colmem.NewAllocator(ctx, hashAggregatorMemAccount, factory) + hashAggUnlimitedAcc := args.MonitorRegistry.CreateUnlimitedMemAccount( + ctx, flowCtx, opName, spec.ProcessorID, + ) + newAggArgs.Allocator = colmem.NewLimitedAllocator(ctx, hashAggregatorMemAccount, hashAggUnlimitedAcc, factory) newAggArgs.MemAccount = hashAggregatorMemAccount - hashTableAllocator := colmem.NewAllocator(ctx, hashTableMemAccount, factory) + hashTableUnlimitedAcc := args.MonitorRegistry.CreateUnlimitedMemAccount( + ctx, flowCtx, opName, spec.ProcessorID, + ) + hashTableAllocator := colmem.NewLimitedAllocator(ctx, hashTableMemAccount, hashTableUnlimitedAcc, factory) inMemoryHashAggregator := colexec.NewHashAggregator( newAggArgs, &colexecutils.NewSpillingQueueArgs{ @@ -1024,7 +1042,10 @@ func NewColOperator( // ordered distinct, and we should plan it when we have // non-empty ordered columns and we think that the probability // of distinct tuples in the input is about 0.01 or less. - allocator := colmem.NewAllocator(ctx, distinctMemAccount, factory) + unlimitedAcc := args.MonitorRegistry.CreateUnlimitedMemAccount( + ctx, flowCtx, "distinct" /* opName */, spec.ProcessorID, + ) + allocator := colmem.NewLimitedAllocator(ctx, distinctMemAccount, unlimitedAcc, factory) inMemoryUnorderedDistinct := colexec.NewUnorderedDistinct( allocator, inputs[0].Root, core.Distinct.DistinctColumns, result.ColumnTypes, core.Distinct.NullsAreDistinct, core.Distinct.ErrorOnDup, @@ -1111,8 +1132,11 @@ func NewColOperator( core.HashJoiner.RightEqColumnsAreKey, ) + hashJoinerUnlimitedAcc := args.MonitorRegistry.CreateUnlimitedMemAccount( + ctx, flowCtx, opName, spec.ProcessorID, + ) inMemoryHashJoiner := colexecjoin.NewHashJoiner( - colmem.NewAllocator(ctx, hashJoinerMemAccount, factory), + colmem.NewLimitedAllocator(ctx, hashJoinerMemAccount, hashJoinerUnlimitedAcc, factory), hashJoinerUnlimitedAllocator, hjSpec, inputs[0].Root, inputs[1].Root, colexecjoin.HashJoinerInitialNumBuckets, ) diff --git a/pkg/sql/colexec/colexecagg/any_not_null_agg_tmpl.go b/pkg/sql/colexec/colexecagg/any_not_null_agg_tmpl.go index b5590c446d4a..974b141a2f49 100644 --- a/pkg/sql/colexec/colexecagg/any_not_null_agg_tmpl.go +++ b/pkg/sql/colexec/colexecagg/any_not_null_agg_tmpl.go @@ -162,7 +162,7 @@ func (a *anyNotNull_TYPE_AGGKINDAgg) Compute( ) execgen.SETVARIABLESIZE(newCurAggSize, a.curAgg) if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } diff --git a/pkg/sql/colexec/colexecagg/avg_agg_tmpl.go b/pkg/sql/colexec/colexecagg/avg_agg_tmpl.go index 501b08a9ec09..32b89566524c 100644 --- a/pkg/sql/colexec/colexecagg/avg_agg_tmpl.go +++ b/pkg/sql/colexec/colexecagg/avg_agg_tmpl.go @@ -181,7 +181,7 @@ func (a *avg_TYPE_AGGKINDAgg) Compute( // {{end}} execgen.SETVARIABLESIZE(newCurSumSize, a.curSum) if newCurSumSize != oldCurSumSize { - a.allocator.AdjustMemoryUsage(int64(newCurSumSize - oldCurSumSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurSumSize - oldCurSumSize)) } } diff --git a/pkg/sql/colexec/colexecagg/bool_and_or_agg_tmpl.go b/pkg/sql/colexec/colexecagg/bool_and_or_agg_tmpl.go index d091be32bdc6..b834ef740057 100644 --- a/pkg/sql/colexec/colexecagg/bool_and_or_agg_tmpl.go +++ b/pkg/sql/colexec/colexecagg/bool_and_or_agg_tmpl.go @@ -142,7 +142,7 @@ func (a *bool_OP_TYPE_AGGKINDAgg) Compute( // {{end}} execgen.SETVARIABLESIZE(newCurAggSize, a.curAgg) if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } diff --git a/pkg/sql/colexec/colexecagg/concat_agg_tmpl.go b/pkg/sql/colexec/colexecagg/concat_agg_tmpl.go index c6dcad35b6a4..f92d7ddd44b2 100644 --- a/pkg/sql/colexec/colexecagg/concat_agg_tmpl.go +++ b/pkg/sql/colexec/colexecagg/concat_agg_tmpl.go @@ -119,7 +119,7 @@ func (a *concat_AGGKINDAgg) Compute( // {{end}} execgen.SETVARIABLESIZE(newCurAggSize, a.curAgg) if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } diff --git a/pkg/sql/colexec/colexecagg/default_agg_tmpl.go b/pkg/sql/colexec/colexecagg/default_agg_tmpl.go index ce8827365a91..8900a7ba285a 100644 --- a/pkg/sql/colexec/colexecagg/default_agg_tmpl.go +++ b/pkg/sql/colexec/colexecagg/default_agg_tmpl.go @@ -196,7 +196,7 @@ func (a *default_AGGKINDAggAlloc) newAggFunc() AggregateFunc { } f.allocator = a.allocator f.scratch.otherArgs = a.otherArgsScratch - a.allocator.AdjustMemoryUsage(f.fn.Size()) + a.allocator.AdjustMemoryUsageAfterAllocation(f.fn.Size()) a.aggFuncs = a.aggFuncs[1:] a.returnedFns = append(a.returnedFns, f) return f diff --git a/pkg/sql/colexec/colexecagg/hash_any_not_null_agg.eg.go b/pkg/sql/colexec/colexecagg/hash_any_not_null_agg.eg.go index c476f0641392..919e8d716c9b 100644 --- a/pkg/sql/colexec/colexecagg/hash_any_not_null_agg.eg.go +++ b/pkg/sql/colexec/colexecagg/hash_any_not_null_agg.eg.go @@ -170,7 +170,7 @@ func (a *anyNotNullBoolHashAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -278,7 +278,7 @@ func (a *anyNotNullBytesHashAgg) Compute( ) newCurAggSize := len(a.curAgg) if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -390,7 +390,7 @@ func (a *anyNotNullDecimalHashAgg) Compute( ) newCurAggSize := a.curAgg.Size() if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -498,7 +498,7 @@ func (a *anyNotNullInt16HashAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -606,7 +606,7 @@ func (a *anyNotNullInt32HashAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -714,7 +714,7 @@ func (a *anyNotNullInt64HashAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -822,7 +822,7 @@ func (a *anyNotNullFloat64HashAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -930,7 +930,7 @@ func (a *anyNotNullTimestampHashAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -1038,7 +1038,7 @@ func (a *anyNotNullIntervalHashAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -1174,7 +1174,7 @@ func (a *anyNotNullJSONHashAgg) Compute( newCurAggSize = a.curAgg.Size() } if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -1296,7 +1296,7 @@ func (a *anyNotNullDatumHashAgg) Compute( newCurAggSize = a.curAgg.(tree.Datum).Size() } if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } diff --git a/pkg/sql/colexec/colexecagg/hash_avg_agg.eg.go b/pkg/sql/colexec/colexecagg/hash_avg_agg.eg.go index 7bfbb4343a1f..bd563641289b 100644 --- a/pkg/sql/colexec/colexecagg/hash_avg_agg.eg.go +++ b/pkg/sql/colexec/colexecagg/hash_avg_agg.eg.go @@ -135,7 +135,7 @@ func (a *avgInt16HashAgg) Compute( ) newCurSumSize := a.curSum.Size() if newCurSumSize != oldCurSumSize { - a.allocator.AdjustMemoryUsage(int64(newCurSumSize - oldCurSumSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurSumSize - oldCurSumSize)) } } @@ -249,7 +249,7 @@ func (a *avgInt32HashAgg) Compute( ) newCurSumSize := a.curSum.Size() if newCurSumSize != oldCurSumSize { - a.allocator.AdjustMemoryUsage(int64(newCurSumSize - oldCurSumSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurSumSize - oldCurSumSize)) } } @@ -363,7 +363,7 @@ func (a *avgInt64HashAgg) Compute( ) newCurSumSize := a.curSum.Size() if newCurSumSize != oldCurSumSize { - a.allocator.AdjustMemoryUsage(int64(newCurSumSize - oldCurSumSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurSumSize - oldCurSumSize)) } } @@ -475,7 +475,7 @@ func (a *avgDecimalHashAgg) Compute( ) newCurSumSize := a.curSum.Size() if newCurSumSize != oldCurSumSize { - a.allocator.AdjustMemoryUsage(int64(newCurSumSize - oldCurSumSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurSumSize - oldCurSumSize)) } } @@ -581,7 +581,7 @@ func (a *avgFloat64HashAgg) Compute( ) var newCurSumSize uintptr if newCurSumSize != oldCurSumSize { - a.allocator.AdjustMemoryUsage(int64(newCurSumSize - oldCurSumSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurSumSize - oldCurSumSize)) } } @@ -673,7 +673,7 @@ func (a *avgIntervalHashAgg) Compute( ) var newCurSumSize uintptr if newCurSumSize != oldCurSumSize { - a.allocator.AdjustMemoryUsage(int64(newCurSumSize - oldCurSumSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurSumSize - oldCurSumSize)) } } diff --git a/pkg/sql/colexec/colexecagg/hash_bool_and_or_agg.eg.go b/pkg/sql/colexec/colexecagg/hash_bool_and_or_agg.eg.go index 70a4b1d057f5..f488e0e85113 100644 --- a/pkg/sql/colexec/colexecagg/hash_bool_and_or_agg.eg.go +++ b/pkg/sql/colexec/colexecagg/hash_bool_and_or_agg.eg.go @@ -76,7 +76,7 @@ func (a *boolAndHashAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -172,7 +172,7 @@ func (a *boolOrHashAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } diff --git a/pkg/sql/colexec/colexecagg/hash_concat_agg.eg.go b/pkg/sql/colexec/colexecagg/hash_concat_agg.eg.go index e292c227e76f..3623f01ecea0 100644 --- a/pkg/sql/colexec/colexecagg/hash_concat_agg.eg.go +++ b/pkg/sql/colexec/colexecagg/hash_concat_agg.eg.go @@ -67,7 +67,7 @@ func (a *concatHashAgg) Compute( ) newCurAggSize := len(a.curAgg) if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } diff --git a/pkg/sql/colexec/colexecagg/hash_default_agg.eg.go b/pkg/sql/colexec/colexecagg/hash_default_agg.eg.go index 5b46de655da8..8f03e9be4cdb 100644 --- a/pkg/sql/colexec/colexecagg/hash_default_agg.eg.go +++ b/pkg/sql/colexec/colexecagg/hash_default_agg.eg.go @@ -167,7 +167,7 @@ func (a *defaultHashAggAlloc) newAggFunc() AggregateFunc { } f.allocator = a.allocator f.scratch.otherArgs = a.otherArgsScratch - a.allocator.AdjustMemoryUsage(f.fn.Size()) + a.allocator.AdjustMemoryUsageAfterAllocation(f.fn.Size()) a.aggFuncs = a.aggFuncs[1:] a.returnedFns = append(a.returnedFns, f) return f diff --git a/pkg/sql/colexec/colexecagg/hash_min_max_agg.eg.go b/pkg/sql/colexec/colexecagg/hash_min_max_agg.eg.go index 5fa52970bffe..a8b0b2ad2968 100644 --- a/pkg/sql/colexec/colexecagg/hash_min_max_agg.eg.go +++ b/pkg/sql/colexec/colexecagg/hash_min_max_agg.eg.go @@ -206,7 +206,7 @@ func (a *minBoolHashAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -327,7 +327,7 @@ func (a *minBytesHashAgg) Compute( ) newCurAggSize := len(a.curAgg) if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -457,7 +457,7 @@ func (a *minDecimalHashAgg) Compute( ) newCurAggSize := a.curAgg.Size() if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -600,7 +600,7 @@ func (a *minInt16HashAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -743,7 +743,7 @@ func (a *minInt32HashAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -886,7 +886,7 @@ func (a *minInt64HashAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -1045,7 +1045,7 @@ func (a *minFloat64HashAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -1180,7 +1180,7 @@ func (a *minTimestampHashAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -1301,7 +1301,7 @@ func (a *minIntervalHashAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -1484,7 +1484,7 @@ func (a *minJSONHashAgg) Compute( newCurAggSize = a.curAgg.Size() } if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -1632,7 +1632,7 @@ func (a *minDatumHashAgg) Compute( newCurAggSize = a.curAgg.(tree.Datum).Size() } if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -1855,7 +1855,7 @@ func (a *maxBoolHashAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -1976,7 +1976,7 @@ func (a *maxBytesHashAgg) Compute( ) newCurAggSize := len(a.curAgg) if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -2106,7 +2106,7 @@ func (a *maxDecimalHashAgg) Compute( ) newCurAggSize := a.curAgg.Size() if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -2249,7 +2249,7 @@ func (a *maxInt16HashAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -2392,7 +2392,7 @@ func (a *maxInt32HashAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -2535,7 +2535,7 @@ func (a *maxInt64HashAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -2694,7 +2694,7 @@ func (a *maxFloat64HashAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -2829,7 +2829,7 @@ func (a *maxTimestampHashAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -2950,7 +2950,7 @@ func (a *maxIntervalHashAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -3133,7 +3133,7 @@ func (a *maxJSONHashAgg) Compute( newCurAggSize = a.curAgg.Size() } if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -3281,7 +3281,7 @@ func (a *maxDatumHashAgg) Compute( newCurAggSize = a.curAgg.(tree.Datum).Size() } if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } diff --git a/pkg/sql/colexec/colexecagg/hash_sum_agg.eg.go b/pkg/sql/colexec/colexecagg/hash_sum_agg.eg.go index 3e355c6e914e..968719b54593 100644 --- a/pkg/sql/colexec/colexecagg/hash_sum_agg.eg.go +++ b/pkg/sql/colexec/colexecagg/hash_sum_agg.eg.go @@ -134,7 +134,7 @@ func (a *sumInt16HashAgg) Compute( ) newCurAggSize := a.curAgg.Size() if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -243,7 +243,7 @@ func (a *sumInt32HashAgg) Compute( ) newCurAggSize := a.curAgg.Size() if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -352,7 +352,7 @@ func (a *sumInt64HashAgg) Compute( ) newCurAggSize := a.curAgg.Size() if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -459,7 +459,7 @@ func (a *sumDecimalHashAgg) Compute( ) newCurAggSize := a.curAgg.Size() if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -560,7 +560,7 @@ func (a *sumFloat64HashAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -651,7 +651,7 @@ func (a *sumIntervalHashAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } diff --git a/pkg/sql/colexec/colexecagg/hash_sum_int_agg.eg.go b/pkg/sql/colexec/colexecagg/hash_sum_int_agg.eg.go index c20a89e63842..3648e3b0617f 100644 --- a/pkg/sql/colexec/colexecagg/hash_sum_int_agg.eg.go +++ b/pkg/sql/colexec/colexecagg/hash_sum_int_agg.eg.go @@ -114,7 +114,7 @@ func (a *sumIntInt16HashAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -221,7 +221,7 @@ func (a *sumIntInt32HashAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -328,7 +328,7 @@ func (a *sumIntInt64HashAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } diff --git a/pkg/sql/colexec/colexecagg/min_max_agg_tmpl.go b/pkg/sql/colexec/colexecagg/min_max_agg_tmpl.go index 4cf955fc5c57..9bb33fcc7e9f 100644 --- a/pkg/sql/colexec/colexecagg/min_max_agg_tmpl.go +++ b/pkg/sql/colexec/colexecagg/min_max_agg_tmpl.go @@ -179,7 +179,7 @@ func (a *_AGG_TYPE_AGGKINDAgg) Compute( // {{end}} execgen.SETVARIABLESIZE(newCurAggSize, a.curAgg) if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } diff --git a/pkg/sql/colexec/colexecagg/ordered_any_not_null_agg.eg.go b/pkg/sql/colexec/colexecagg/ordered_any_not_null_agg.eg.go index ab587b5f479f..7f2f55617410 100644 --- a/pkg/sql/colexec/colexecagg/ordered_any_not_null_agg.eg.go +++ b/pkg/sql/colexec/colexecagg/ordered_any_not_null_agg.eg.go @@ -256,7 +256,7 @@ func (a *anyNotNullBoolOrderedAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -455,7 +455,7 @@ func (a *anyNotNullBytesOrderedAgg) Compute( ) newCurAggSize := len(a.curAgg) if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -658,7 +658,7 @@ func (a *anyNotNullDecimalOrderedAgg) Compute( ) newCurAggSize := a.curAgg.Size() if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -857,7 +857,7 @@ func (a *anyNotNullInt16OrderedAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -1056,7 +1056,7 @@ func (a *anyNotNullInt32OrderedAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -1255,7 +1255,7 @@ func (a *anyNotNullInt64OrderedAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -1454,7 +1454,7 @@ func (a *anyNotNullFloat64OrderedAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -1653,7 +1653,7 @@ func (a *anyNotNullTimestampOrderedAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -1852,7 +1852,7 @@ func (a *anyNotNullIntervalOrderedAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -2101,7 +2101,7 @@ func (a *anyNotNullJSONOrderedAgg) Compute( newCurAggSize = a.curAgg.Size() } if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -2314,7 +2314,7 @@ func (a *anyNotNullDatumOrderedAgg) Compute( newCurAggSize = a.curAgg.(tree.Datum).Size() } if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } diff --git a/pkg/sql/colexec/colexecagg/ordered_avg_agg.eg.go b/pkg/sql/colexec/colexecagg/ordered_avg_agg.eg.go index 872075b9b42c..f56b0680b872 100644 --- a/pkg/sql/colexec/colexecagg/ordered_avg_agg.eg.go +++ b/pkg/sql/colexec/colexecagg/ordered_avg_agg.eg.go @@ -274,7 +274,7 @@ func (a *avgInt16OrderedAgg) Compute( ) newCurSumSize := a.curSum.Size() if newCurSumSize != oldCurSumSize { - a.allocator.AdjustMemoryUsage(int64(newCurSumSize - oldCurSumSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurSumSize - oldCurSumSize)) } } @@ -532,7 +532,7 @@ func (a *avgInt32OrderedAgg) Compute( ) newCurSumSize := a.curSum.Size() if newCurSumSize != oldCurSumSize { - a.allocator.AdjustMemoryUsage(int64(newCurSumSize - oldCurSumSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurSumSize - oldCurSumSize)) } } @@ -790,7 +790,7 @@ func (a *avgInt64OrderedAgg) Compute( ) newCurSumSize := a.curSum.Size() if newCurSumSize != oldCurSumSize { - a.allocator.AdjustMemoryUsage(int64(newCurSumSize - oldCurSumSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurSumSize - oldCurSumSize)) } } @@ -1044,7 +1044,7 @@ func (a *avgDecimalOrderedAgg) Compute( ) newCurSumSize := a.curSum.Size() if newCurSumSize != oldCurSumSize { - a.allocator.AdjustMemoryUsage(int64(newCurSumSize - oldCurSumSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurSumSize - oldCurSumSize)) } } @@ -1270,7 +1270,7 @@ func (a *avgFloat64OrderedAgg) Compute( ) var newCurSumSize uintptr if newCurSumSize != oldCurSumSize { - a.allocator.AdjustMemoryUsage(int64(newCurSumSize - oldCurSumSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurSumSize - oldCurSumSize)) } } @@ -1472,7 +1472,7 @@ func (a *avgIntervalOrderedAgg) Compute( ) var newCurSumSize uintptr if newCurSumSize != oldCurSumSize { - a.allocator.AdjustMemoryUsage(int64(newCurSumSize - oldCurSumSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurSumSize - oldCurSumSize)) } } diff --git a/pkg/sql/colexec/colexecagg/ordered_bool_and_or_agg.eg.go b/pkg/sql/colexec/colexecagg/ordered_bool_and_or_agg.eg.go index 1cc88d561f63..7d00dda2dba5 100644 --- a/pkg/sql/colexec/colexecagg/ordered_bool_and_or_agg.eg.go +++ b/pkg/sql/colexec/colexecagg/ordered_bool_and_or_agg.eg.go @@ -168,7 +168,7 @@ func (a *boolAndOrderedAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -361,7 +361,7 @@ func (a *boolOrOrderedAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } diff --git a/pkg/sql/colexec/colexecagg/ordered_concat_agg.eg.go b/pkg/sql/colexec/colexecagg/ordered_concat_agg.eg.go index 493bd4b60247..4ccbf2508c3f 100644 --- a/pkg/sql/colexec/colexecagg/ordered_concat_agg.eg.go +++ b/pkg/sql/colexec/colexecagg/ordered_concat_agg.eg.go @@ -168,7 +168,7 @@ func (a *concatOrderedAgg) Compute( ) newCurAggSize := len(a.curAgg) if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } diff --git a/pkg/sql/colexec/colexecagg/ordered_default_agg.eg.go b/pkg/sql/colexec/colexecagg/ordered_default_agg.eg.go index 3f7d41fd887f..f3d9914ac9e3 100644 --- a/pkg/sql/colexec/colexecagg/ordered_default_agg.eg.go +++ b/pkg/sql/colexec/colexecagg/ordered_default_agg.eg.go @@ -232,7 +232,7 @@ func (a *defaultOrderedAggAlloc) newAggFunc() AggregateFunc { } f.allocator = a.allocator f.scratch.otherArgs = a.otherArgsScratch - a.allocator.AdjustMemoryUsage(f.fn.Size()) + a.allocator.AdjustMemoryUsageAfterAllocation(f.fn.Size()) a.aggFuncs = a.aggFuncs[1:] a.returnedFns = append(a.returnedFns, f) return f diff --git a/pkg/sql/colexec/colexecagg/ordered_min_max_agg.eg.go b/pkg/sql/colexec/colexecagg/ordered_min_max_agg.eg.go index 8049701a4b68..29085091dde9 100644 --- a/pkg/sql/colexec/colexecagg/ordered_min_max_agg.eg.go +++ b/pkg/sql/colexec/colexecagg/ordered_min_max_agg.eg.go @@ -351,7 +351,7 @@ func (a *minBoolOrderedAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -606,7 +606,7 @@ func (a *minBytesOrderedAgg) Compute( ) newCurAggSize := len(a.curAgg) if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -870,7 +870,7 @@ func (a *minDecimalOrderedAgg) Compute( ) newCurAggSize := a.curAgg.Size() if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -1169,7 +1169,7 @@ func (a *minInt16OrderedAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -1468,7 +1468,7 @@ func (a *minInt32OrderedAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -1767,7 +1767,7 @@ func (a *minInt64OrderedAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -2098,7 +2098,7 @@ func (a *minFloat64OrderedAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -2381,7 +2381,7 @@ func (a *minTimestampOrderedAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -2636,7 +2636,7 @@ func (a *minIntervalOrderedAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -3009,7 +3009,7 @@ func (a *minJSONOrderedAgg) Compute( newCurAggSize = a.curAgg.Size() } if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -3295,7 +3295,7 @@ func (a *minDatumOrderedAgg) Compute( newCurAggSize = a.curAgg.(tree.Datum).Size() } if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -3668,7 +3668,7 @@ func (a *maxBoolOrderedAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -3923,7 +3923,7 @@ func (a *maxBytesOrderedAgg) Compute( ) newCurAggSize := len(a.curAgg) if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -4187,7 +4187,7 @@ func (a *maxDecimalOrderedAgg) Compute( ) newCurAggSize := a.curAgg.Size() if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -4486,7 +4486,7 @@ func (a *maxInt16OrderedAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -4785,7 +4785,7 @@ func (a *maxInt32OrderedAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -5084,7 +5084,7 @@ func (a *maxInt64OrderedAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -5415,7 +5415,7 @@ func (a *maxFloat64OrderedAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -5698,7 +5698,7 @@ func (a *maxTimestampOrderedAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -5953,7 +5953,7 @@ func (a *maxIntervalOrderedAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -6326,7 +6326,7 @@ func (a *maxJSONOrderedAgg) Compute( newCurAggSize = a.curAgg.Size() } if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -6612,7 +6612,7 @@ func (a *maxDatumOrderedAgg) Compute( newCurAggSize = a.curAgg.(tree.Datum).Size() } if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } diff --git a/pkg/sql/colexec/colexecagg/ordered_sum_agg.eg.go b/pkg/sql/colexec/colexecagg/ordered_sum_agg.eg.go index 61f1285e7ff3..b5b09e71fe01 100644 --- a/pkg/sql/colexec/colexecagg/ordered_sum_agg.eg.go +++ b/pkg/sql/colexec/colexecagg/ordered_sum_agg.eg.go @@ -259,7 +259,7 @@ func (a *sumInt16OrderedAgg) Compute( ) newCurAggSize := a.curAgg.Size() if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -498,7 +498,7 @@ func (a *sumInt32OrderedAgg) Compute( ) newCurAggSize := a.curAgg.Size() if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -737,7 +737,7 @@ func (a *sumInt64OrderedAgg) Compute( ) newCurAggSize := a.curAgg.Size() if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -972,7 +972,7 @@ func (a *sumDecimalOrderedAgg) Compute( ) newCurAggSize := a.curAgg.Size() if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -1195,7 +1195,7 @@ func (a *sumFloat64OrderedAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -1398,7 +1398,7 @@ func (a *sumIntervalOrderedAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } diff --git a/pkg/sql/colexec/colexecagg/ordered_sum_int_agg.eg.go b/pkg/sql/colexec/colexecagg/ordered_sum_int_agg.eg.go index 477f83ffe5c2..0f7900e61295 100644 --- a/pkg/sql/colexec/colexecagg/ordered_sum_int_agg.eg.go +++ b/pkg/sql/colexec/colexecagg/ordered_sum_int_agg.eg.go @@ -237,7 +237,7 @@ func (a *sumIntInt16OrderedAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -472,7 +472,7 @@ func (a *sumIntInt32OrderedAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -707,7 +707,7 @@ func (a *sumIntInt64OrderedAgg) Compute( ) var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } diff --git a/pkg/sql/colexec/colexecagg/sum_agg_tmpl.go b/pkg/sql/colexec/colexecagg/sum_agg_tmpl.go index 1347a93f7b10..07765959048a 100644 --- a/pkg/sql/colexec/colexecagg/sum_agg_tmpl.go +++ b/pkg/sql/colexec/colexecagg/sum_agg_tmpl.go @@ -173,7 +173,7 @@ func (a *sum_SUMKIND_TYPE_AGGKINDAgg) Compute( // {{end}} execgen.SETVARIABLESIZE(newCurAggSize, a.curAgg) if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } diff --git a/pkg/sql/colexec/colexecagg/window_avg_agg.eg.go b/pkg/sql/colexec/colexecagg/window_avg_agg.eg.go index a1bf6255f001..20ee30d631c2 100644 --- a/pkg/sql/colexec/colexecagg/window_avg_agg.eg.go +++ b/pkg/sql/colexec/colexecagg/window_avg_agg.eg.go @@ -135,7 +135,7 @@ func (a *avgInt16WindowAgg) Compute( } newCurSumSize := a.curSum.Size() if newCurSumSize != oldCurSumSize { - a.allocator.AdjustMemoryUsage(int64(newCurSumSize - oldCurSumSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurSumSize - oldCurSumSize)) } } @@ -305,7 +305,7 @@ func (a *avgInt32WindowAgg) Compute( } newCurSumSize := a.curSum.Size() if newCurSumSize != oldCurSumSize { - a.allocator.AdjustMemoryUsage(int64(newCurSumSize - oldCurSumSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurSumSize - oldCurSumSize)) } } @@ -475,7 +475,7 @@ func (a *avgInt64WindowAgg) Compute( } newCurSumSize := a.curSum.Size() if newCurSumSize != oldCurSumSize { - a.allocator.AdjustMemoryUsage(int64(newCurSumSize - oldCurSumSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurSumSize - oldCurSumSize)) } } @@ -643,7 +643,7 @@ func (a *avgDecimalWindowAgg) Compute( } newCurSumSize := a.curSum.Size() if newCurSumSize != oldCurSumSize { - a.allocator.AdjustMemoryUsage(int64(newCurSumSize - oldCurSumSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurSumSize - oldCurSumSize)) } } @@ -803,7 +803,7 @@ func (a *avgFloat64WindowAgg) Compute( } var newCurSumSize uintptr if newCurSumSize != oldCurSumSize { - a.allocator.AdjustMemoryUsage(int64(newCurSumSize - oldCurSumSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurSumSize - oldCurSumSize)) } } @@ -943,7 +943,7 @@ func (a *avgIntervalWindowAgg) Compute( } var newCurSumSize uintptr if newCurSumSize != oldCurSumSize { - a.allocator.AdjustMemoryUsage(int64(newCurSumSize - oldCurSumSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurSumSize - oldCurSumSize)) } } diff --git a/pkg/sql/colexec/colexecagg/window_bool_and_or_agg.eg.go b/pkg/sql/colexec/colexecagg/window_bool_and_or_agg.eg.go index 17b58f8181ba..23a4e1cbaabc 100644 --- a/pkg/sql/colexec/colexecagg/window_bool_and_or_agg.eg.go +++ b/pkg/sql/colexec/colexecagg/window_bool_and_or_agg.eg.go @@ -76,7 +76,7 @@ func (a *boolAndWindowAgg) Compute( } var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -172,7 +172,7 @@ func (a *boolOrWindowAgg) Compute( } var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } diff --git a/pkg/sql/colexec/colexecagg/window_concat_agg.eg.go b/pkg/sql/colexec/colexecagg/window_concat_agg.eg.go index 6db36d54498f..0095cba377cc 100644 --- a/pkg/sql/colexec/colexecagg/window_concat_agg.eg.go +++ b/pkg/sql/colexec/colexecagg/window_concat_agg.eg.go @@ -64,7 +64,7 @@ func (a *concatWindowAgg) Compute( } newCurAggSize := len(a.curAgg) if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } diff --git a/pkg/sql/colexec/colexecagg/window_min_max_agg.eg.go b/pkg/sql/colexec/colexecagg/window_min_max_agg.eg.go index b561e5a9bc97..8ee77d0b0baf 100644 --- a/pkg/sql/colexec/colexecagg/window_min_max_agg.eg.go +++ b/pkg/sql/colexec/colexecagg/window_min_max_agg.eg.go @@ -204,7 +204,7 @@ func (a *minBoolWindowAgg) Compute( } var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -331,7 +331,7 @@ func (a *minBytesWindowAgg) Compute( } newCurAggSize := len(a.curAgg) if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -462,7 +462,7 @@ func (a *minDecimalWindowAgg) Compute( } newCurAggSize := a.curAgg.Size() if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -611,7 +611,7 @@ func (a *minInt16WindowAgg) Compute( } var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -760,7 +760,7 @@ func (a *minInt32WindowAgg) Compute( } var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -909,7 +909,7 @@ func (a *minInt64WindowAgg) Compute( } var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -1074,7 +1074,7 @@ func (a *minFloat64WindowAgg) Compute( } var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -1215,7 +1215,7 @@ func (a *minTimestampWindowAgg) Compute( } var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -1342,7 +1342,7 @@ func (a *minIntervalWindowAgg) Compute( } var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -1531,7 +1531,7 @@ func (a *minJSONWindowAgg) Compute( newCurAggSize = a.curAgg.Size() } if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -1677,7 +1677,7 @@ func (a *minDatumWindowAgg) Compute( newCurAggSize = a.curAgg.(tree.Datum).Size() } if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -1897,7 +1897,7 @@ func (a *maxBoolWindowAgg) Compute( } var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -2024,7 +2024,7 @@ func (a *maxBytesWindowAgg) Compute( } newCurAggSize := len(a.curAgg) if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -2155,7 +2155,7 @@ func (a *maxDecimalWindowAgg) Compute( } newCurAggSize := a.curAgg.Size() if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -2304,7 +2304,7 @@ func (a *maxInt16WindowAgg) Compute( } var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -2453,7 +2453,7 @@ func (a *maxInt32WindowAgg) Compute( } var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -2602,7 +2602,7 @@ func (a *maxInt64WindowAgg) Compute( } var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -2767,7 +2767,7 @@ func (a *maxFloat64WindowAgg) Compute( } var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -2908,7 +2908,7 @@ func (a *maxTimestampWindowAgg) Compute( } var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -3035,7 +3035,7 @@ func (a *maxIntervalWindowAgg) Compute( } var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -3224,7 +3224,7 @@ func (a *maxJSONWindowAgg) Compute( newCurAggSize = a.curAgg.Size() } if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -3370,7 +3370,7 @@ func (a *maxDatumWindowAgg) Compute( newCurAggSize = a.curAgg.(tree.Datum).Size() } if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } diff --git a/pkg/sql/colexec/colexecagg/window_sum_agg.eg.go b/pkg/sql/colexec/colexecagg/window_sum_agg.eg.go index fb52a3530d63..4ec16291e53e 100644 --- a/pkg/sql/colexec/colexecagg/window_sum_agg.eg.go +++ b/pkg/sql/colexec/colexecagg/window_sum_agg.eg.go @@ -134,7 +134,7 @@ func (a *sumInt16WindowAgg) Compute( } newCurAggSize := a.curAgg.Size() if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -301,7 +301,7 @@ func (a *sumInt32WindowAgg) Compute( } newCurAggSize := a.curAgg.Size() if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -468,7 +468,7 @@ func (a *sumInt64WindowAgg) Compute( } newCurAggSize := a.curAgg.Size() if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -633,7 +633,7 @@ func (a *sumDecimalWindowAgg) Compute( } newCurAggSize := a.curAgg.Size() if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -790,7 +790,7 @@ func (a *sumFloat64WindowAgg) Compute( } var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -931,7 +931,7 @@ func (a *sumIntervalWindowAgg) Compute( } var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } diff --git a/pkg/sql/colexec/colexecagg/window_sum_int_agg.eg.go b/pkg/sql/colexec/colexecagg/window_sum_int_agg.eg.go index 5fb7edf555fe..9bde92947fba 100644 --- a/pkg/sql/colexec/colexecagg/window_sum_int_agg.eg.go +++ b/pkg/sql/colexec/colexecagg/window_sum_int_agg.eg.go @@ -114,7 +114,7 @@ func (a *sumIntInt16WindowAgg) Compute( } var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -277,7 +277,7 @@ func (a *sumIntInt32WindowAgg) Compute( } var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } @@ -440,7 +440,7 @@ func (a *sumIntInt64WindowAgg) Compute( } var newCurAggSize uintptr if newCurAggSize != oldCurAggSize { - a.allocator.AdjustMemoryUsage(int64(newCurAggSize - oldCurAggSize)) + a.allocator.AdjustMemoryUsageAfterAllocation(int64(newCurAggSize - oldCurAggSize)) } } diff --git a/pkg/sql/colexec/colexechash/hashtable.go b/pkg/sql/colexec/colexechash/hashtable.go index c67a2f76561d..17f388a36b17 100644 --- a/pkg/sql/colexec/colexechash/hashtable.go +++ b/pkg/sql/colexec/colexechash/hashtable.go @@ -417,11 +417,6 @@ func (p *hashTableProbeBuffer) accountForLimitedSlices(allocator *colmem.Allocat p.limitedSlicesAreAccountedFor = true } -func (ht *HashTable) accountForUnlimitedSlices(newUint64Count int64) { - ht.allocator.AdjustMemoryUsage(memsize.Uint64 * (newUint64Count - ht.unlimitedSlicesNumUint64AccountedFor)) - ht.unlimitedSlicesNumUint64AccountedFor = newUint64Count -} - func (ht *HashTable) unlimitedSlicesCapacity() int64 { // Note that if ht.ProbeScratch.First is nil, it'll have zero capacity. return int64(cap(ht.BuildScratch.First) + cap(ht.ProbeScratch.First) + cap(ht.BuildScratch.Next)) @@ -448,13 +443,15 @@ func (ht *HashTable) buildFromBufferedTuples() { // perform the memory accounting for the anticipated memory usage. Note // that it might not be precise, so we'll reconcile after the // allocations below. - ht.accountForUnlimitedSlices(needCapacity) + ht.allocator.AdjustMemoryUsage(memsize.Uint64 * (needCapacity - ht.unlimitedSlicesNumUint64AccountedFor)) + ht.unlimitedSlicesNumUint64AccountedFor = needCapacity } // Perform the actual build. ht.buildFromBufferedTuplesNoAccounting() // Now ensure that the accounting is precise (cap's of the slices might // exceed len's that we've accounted for). - ht.accountForUnlimitedSlices(ht.unlimitedSlicesCapacity()) + ht.allocator.AdjustMemoryUsageAfterAllocation(memsize.Uint64 * (ht.unlimitedSlicesCapacity() - ht.unlimitedSlicesNumUint64AccountedFor)) + ht.unlimitedSlicesNumUint64AccountedFor = ht.unlimitedSlicesCapacity() } // buildFromBufferedTuples builds the hash table from already buffered tuples in diff --git a/pkg/sql/colexec/colexecjoin/hashjoiner.go b/pkg/sql/colexec/colexecjoin/hashjoiner.go index 2a8da3e5e5e2..e7cd552f96dc 100644 --- a/pkg/sql/colexec/colexecjoin/hashjoiner.go +++ b/pkg/sql/colexec/colexecjoin/hashjoiner.go @@ -339,7 +339,7 @@ func (hj *hashJoiner) build() { // to fallback to disk, thus, we use the unlimited allocator. newAccountedFor := memsize.Uint64 * int64(cap(hj.ht.Same)) // hj.ht.Same will never shrink, so the delta is non-negative. - hj.outputUnlimitedAllocator.AdjustMemoryUsage(newAccountedFor - hj.accountedFor.hashtableSame) + hj.outputUnlimitedAllocator.AdjustMemoryUsageAfterAllocation(newAccountedFor - hj.accountedFor.hashtableSame) hj.accountedFor.hashtableSame = newAccountedFor } if !hj.spec.rightDistinct || hj.spec.JoinType.IsSetOpJoin() { @@ -351,7 +351,7 @@ func (hj *hashJoiner) build() { // to fallback to disk, thus, we use the unlimited allocator. newAccountedFor := memsize.Bool * int64(cap(hj.ht.Visited)) // hj.ht.Visited will never shrink, so the delta is non-negative. - hj.outputUnlimitedAllocator.AdjustMemoryUsage(newAccountedFor - hj.accountedFor.hashtableVisited) + hj.outputUnlimitedAllocator.AdjustMemoryUsageAfterAllocation(newAccountedFor - hj.accountedFor.hashtableVisited) hj.accountedFor.hashtableVisited = newAccountedFor // Since keyID = 0 is reserved for end of list, it can be marked as visited // at the beginning. diff --git a/pkg/sql/colexec/colexecspan/span_assembler.go b/pkg/sql/colexec/colexecspan/span_assembler.go index b6da7ee51e48..796c26407d51 100644 --- a/pkg/sql/colexec/colexecspan/span_assembler.go +++ b/pkg/sql/colexec/colexecspan/span_assembler.go @@ -70,7 +70,7 @@ func NewColSpanAssembler( // Account for the memory currently in use. sa.spansBytes = int64(cap(sa.spans)) * spanSize - sa.allocator.AdjustMemoryUsage(sa.spansBytes) + sa.allocator.AdjustMemoryUsageAfterAllocation(sa.spansBytes) return sa } @@ -239,7 +239,7 @@ func (sa *spanAssembler) ConsumeBatch(batch coldata.Batch, startIdx, endIdx int) // Account for the memory allocated for the span slice and keys. keyBytesMem := int64(sa.keyBytes - oldKeyBytes) sa.spansBytes = int64(cap(sa.spans)) * spanSize - sa.allocator.AdjustMemoryUsage((sa.spansBytes - oldSpansBytes) + keyBytesMem) + sa.allocator.AdjustMemoryUsageAfterAllocation((sa.spansBytes - oldSpansBytes) + keyBytesMem) } const spanSize = int64(unsafe.Sizeof(roachpb.Span{})) @@ -264,7 +264,7 @@ func (sa *spanAssembler) AccountForSpans() { )) } sa.spansBytes = int64(cap(sa.spans)) * spanSize - sa.allocator.AdjustMemoryUsage(sa.spansBytes) + sa.allocator.AdjustMemoryUsageAfterAllocation(sa.spansBytes) } // Close implements the ColSpanAssembler interface. diff --git a/pkg/sql/colexec/colexecspan/span_encoder.eg.go b/pkg/sql/colexec/colexecspan/span_encoder.eg.go index a3419f4f94bf..71fa7383a06c 100644 --- a/pkg/sql/colexec/colexecspan/span_encoder.eg.go +++ b/pkg/sql/colexec/colexecspan/span_encoder.eg.go @@ -285,7 +285,7 @@ func (op *spanEncoderBoolAsc) next(batch coldata.Batch, startIdx, endIdx int) *c } } - op.allocator.AdjustMemoryUsage(op.outputBytes.Size() - oldBytesSize) + op.allocator.AdjustMemoryUsageAfterAllocation(op.outputBytes.Size() - oldBytesSize) return op.outputBytes } @@ -364,7 +364,7 @@ func (op *spanEncoderBytesAsc) next(batch coldata.Batch, startIdx, endIdx int) * } } - op.allocator.AdjustMemoryUsage(op.outputBytes.Size() - oldBytesSize) + op.allocator.AdjustMemoryUsageAfterAllocation(op.outputBytes.Size() - oldBytesSize) return op.outputBytes } @@ -445,7 +445,7 @@ func (op *spanEncoderDecimalAsc) next(batch coldata.Batch, startIdx, endIdx int) } } - op.allocator.AdjustMemoryUsage(op.outputBytes.Size() - oldBytesSize) + op.allocator.AdjustMemoryUsageAfterAllocation(op.outputBytes.Size() - oldBytesSize) return op.outputBytes } @@ -526,7 +526,7 @@ func (op *spanEncoderInt16Asc) next(batch coldata.Batch, startIdx, endIdx int) * } } - op.allocator.AdjustMemoryUsage(op.outputBytes.Size() - oldBytesSize) + op.allocator.AdjustMemoryUsageAfterAllocation(op.outputBytes.Size() - oldBytesSize) return op.outputBytes } @@ -607,7 +607,7 @@ func (op *spanEncoderInt32Asc) next(batch coldata.Batch, startIdx, endIdx int) * } } - op.allocator.AdjustMemoryUsage(op.outputBytes.Size() - oldBytesSize) + op.allocator.AdjustMemoryUsageAfterAllocation(op.outputBytes.Size() - oldBytesSize) return op.outputBytes } @@ -688,7 +688,7 @@ func (op *spanEncoderInt64Asc) next(batch coldata.Batch, startIdx, endIdx int) * } } - op.allocator.AdjustMemoryUsage(op.outputBytes.Size() - oldBytesSize) + op.allocator.AdjustMemoryUsageAfterAllocation(op.outputBytes.Size() - oldBytesSize) return op.outputBytes } @@ -769,7 +769,7 @@ func (op *spanEncoderFloat64Asc) next(batch coldata.Batch, startIdx, endIdx int) } } - op.allocator.AdjustMemoryUsage(op.outputBytes.Size() - oldBytesSize) + op.allocator.AdjustMemoryUsageAfterAllocation(op.outputBytes.Size() - oldBytesSize) return op.outputBytes } @@ -850,7 +850,7 @@ func (op *spanEncoderTimestampAsc) next(batch coldata.Batch, startIdx, endIdx in } } - op.allocator.AdjustMemoryUsage(op.outputBytes.Size() - oldBytesSize) + op.allocator.AdjustMemoryUsageAfterAllocation(op.outputBytes.Size() - oldBytesSize) return op.outputBytes } @@ -955,7 +955,7 @@ func (op *spanEncoderIntervalAsc) next(batch coldata.Batch, startIdx, endIdx int } } - op.allocator.AdjustMemoryUsage(op.outputBytes.Size() - oldBytesSize) + op.allocator.AdjustMemoryUsageAfterAllocation(op.outputBytes.Size() - oldBytesSize) return op.outputBytes } @@ -1058,7 +1058,7 @@ func (op *spanEncoderDatumAsc) next(batch coldata.Batch, startIdx, endIdx int) * } } - op.allocator.AdjustMemoryUsage(op.outputBytes.Size() - oldBytesSize) + op.allocator.AdjustMemoryUsageAfterAllocation(op.outputBytes.Size() - oldBytesSize) return op.outputBytes } @@ -1167,7 +1167,7 @@ func (op *spanEncoderBoolDesc) next(batch coldata.Batch, startIdx, endIdx int) * } } - op.allocator.AdjustMemoryUsage(op.outputBytes.Size() - oldBytesSize) + op.allocator.AdjustMemoryUsageAfterAllocation(op.outputBytes.Size() - oldBytesSize) return op.outputBytes } @@ -1246,7 +1246,7 @@ func (op *spanEncoderBytesDesc) next(batch coldata.Batch, startIdx, endIdx int) } } - op.allocator.AdjustMemoryUsage(op.outputBytes.Size() - oldBytesSize) + op.allocator.AdjustMemoryUsageAfterAllocation(op.outputBytes.Size() - oldBytesSize) return op.outputBytes } @@ -1327,7 +1327,7 @@ func (op *spanEncoderDecimalDesc) next(batch coldata.Batch, startIdx, endIdx int } } - op.allocator.AdjustMemoryUsage(op.outputBytes.Size() - oldBytesSize) + op.allocator.AdjustMemoryUsageAfterAllocation(op.outputBytes.Size() - oldBytesSize) return op.outputBytes } @@ -1408,7 +1408,7 @@ func (op *spanEncoderInt16Desc) next(batch coldata.Batch, startIdx, endIdx int) } } - op.allocator.AdjustMemoryUsage(op.outputBytes.Size() - oldBytesSize) + op.allocator.AdjustMemoryUsageAfterAllocation(op.outputBytes.Size() - oldBytesSize) return op.outputBytes } @@ -1489,7 +1489,7 @@ func (op *spanEncoderInt32Desc) next(batch coldata.Batch, startIdx, endIdx int) } } - op.allocator.AdjustMemoryUsage(op.outputBytes.Size() - oldBytesSize) + op.allocator.AdjustMemoryUsageAfterAllocation(op.outputBytes.Size() - oldBytesSize) return op.outputBytes } @@ -1570,7 +1570,7 @@ func (op *spanEncoderInt64Desc) next(batch coldata.Batch, startIdx, endIdx int) } } - op.allocator.AdjustMemoryUsage(op.outputBytes.Size() - oldBytesSize) + op.allocator.AdjustMemoryUsageAfterAllocation(op.outputBytes.Size() - oldBytesSize) return op.outputBytes } @@ -1651,7 +1651,7 @@ func (op *spanEncoderFloat64Desc) next(batch coldata.Batch, startIdx, endIdx int } } - op.allocator.AdjustMemoryUsage(op.outputBytes.Size() - oldBytesSize) + op.allocator.AdjustMemoryUsageAfterAllocation(op.outputBytes.Size() - oldBytesSize) return op.outputBytes } @@ -1732,7 +1732,7 @@ func (op *spanEncoderTimestampDesc) next(batch coldata.Batch, startIdx, endIdx i } } - op.allocator.AdjustMemoryUsage(op.outputBytes.Size() - oldBytesSize) + op.allocator.AdjustMemoryUsageAfterAllocation(op.outputBytes.Size() - oldBytesSize) return op.outputBytes } @@ -1837,7 +1837,7 @@ func (op *spanEncoderIntervalDesc) next(batch coldata.Batch, startIdx, endIdx in } } - op.allocator.AdjustMemoryUsage(op.outputBytes.Size() - oldBytesSize) + op.allocator.AdjustMemoryUsageAfterAllocation(op.outputBytes.Size() - oldBytesSize) return op.outputBytes } @@ -1940,7 +1940,7 @@ func (op *spanEncoderDatumDesc) next(batch coldata.Batch, startIdx, endIdx int) } } - op.allocator.AdjustMemoryUsage(op.outputBytes.Size() - oldBytesSize) + op.allocator.AdjustMemoryUsageAfterAllocation(op.outputBytes.Size() - oldBytesSize) return op.outputBytes } diff --git a/pkg/sql/colexec/colexecspan/span_encoder_tmpl.go b/pkg/sql/colexec/colexecspan/span_encoder_tmpl.go index 565ee0fc6021..2610ecc2300e 100644 --- a/pkg/sql/colexec/colexecspan/span_encoder_tmpl.go +++ b/pkg/sql/colexec/colexecspan/span_encoder_tmpl.go @@ -168,7 +168,7 @@ func (op *_OP_STRING) next(batch coldata.Batch, startIdx, endIdx int) *coldata.B } } - op.allocator.AdjustMemoryUsage(op.outputBytes.Size() - oldBytesSize) + op.allocator.AdjustMemoryUsageAfterAllocation(op.outputBytes.Size() - oldBytesSize) return op.outputBytes } diff --git a/pkg/sql/colexec/colexecutils/spilling_buffer.go b/pkg/sql/colexec/colexecutils/spilling_buffer.go index 64d6f54327aa..f1a00d442e51 100644 --- a/pkg/sql/colexec/colexecutils/spilling_buffer.go +++ b/pkg/sql/colexec/colexecutils/spilling_buffer.go @@ -275,7 +275,7 @@ func (b *SpillingBuffer) GetVecWithTuple( // it, then account for the current one. b.unlimitedAllocator.ReleaseMemory(b.lastDequeuedBatchMemUsage) b.lastDequeuedBatchMemUsage = colmem.GetBatchMemSize(b.dequeueScratch) - b.unlimitedAllocator.AdjustMemoryUsage(b.lastDequeuedBatchMemUsage) + b.unlimitedAllocator.AdjustMemoryUsageAfterAllocation(b.lastDequeuedBatchMemUsage) return b.dequeueScratch.ColVec(colIdx), rowIdx, b.dequeueScratch.Length() } // The requested tuple must be located further into the disk queue. diff --git a/pkg/sql/colexec/colexecutils/spilling_queue.go b/pkg/sql/colexec/colexecutils/spilling_queue.go index 78eea34d6d74..031a48898d2d 100644 --- a/pkg/sql/colexec/colexecutils/spilling_queue.go +++ b/pkg/sql/colexec/colexecutils/spilling_queue.go @@ -376,7 +376,7 @@ func (q *SpillingQueue) Dequeue(ctx context.Context) (coldata.Batch, error) { // batch to Dequeue() from disk into it. q.unlimitedAllocator.ReleaseMemory(q.lastDequeuedBatchMemUsage) q.lastDequeuedBatchMemUsage = colmem.GetBatchMemSize(q.dequeueScratch) - q.unlimitedAllocator.AdjustMemoryUsage(q.lastDequeuedBatchMemUsage) + q.unlimitedAllocator.AdjustMemoryUsageAfterAllocation(q.lastDequeuedBatchMemUsage) if q.rewindable { q.rewindableState.numItemsDequeued++ } else { diff --git a/pkg/sql/colexec/colexecutils/utils.go b/pkg/sql/colexec/colexecutils/utils.go index 71c6ac945684..39aaba6d6da8 100644 --- a/pkg/sql/colexec/colexecutils/utils.go +++ b/pkg/sql/colexec/colexecutils/utils.go @@ -331,7 +331,7 @@ func AccountForMetadata(allocator *colmem.Allocator, meta []execinfrapb.Producer // since it might be of non-trivial size. if ltfs := meta[i].LeafTxnFinalState; ltfs != nil { memUsage := roachpb.Spans(ltfs.RefreshSpans).MemUsage() - allocator.AdjustMemoryUsage(memUsage) + allocator.AdjustMemoryUsageAfterAllocation(memUsage) } } } diff --git a/pkg/sql/colexec/hashjoiner_test.go b/pkg/sql/colexec/hashjoiner_test.go index 12781b020dd1..dc5bf14e826f 100644 --- a/pkg/sql/colexec/hashjoiner_test.go +++ b/pkg/sql/colexec/hashjoiner_test.go @@ -1027,9 +1027,10 @@ func TestHashJoiner(t *testing.T) { runHashJoinTestCase(t, tc, rng, func(sources []colexecop.Operator) (colexecop.Operator, error) { spec := createSpecForHashJoiner(tc) args := &colexecargs.NewColOperatorArgs{ - Spec: spec, - Inputs: colexectestutils.MakeInputs(sources), - MonitorRegistry: &monitorRegistry, + Spec: spec, + StreamingMemAccount: monitorRegistry.NewStreamingMemAccount(flowCtx), + Inputs: colexectestutils.MakeInputs(sources), + MonitorRegistry: &monitorRegistry, } args.TestingKnobs.DiskSpillingDisabled = true result, err := colexecargs.TestNewColOperator(ctx, flowCtx, args) diff --git a/pkg/sql/colexec/sort.go b/pkg/sql/colexec/sort.go index 6cf4105ee8f5..85b5c77e8d6e 100644 --- a/pkg/sql/colexec/sort.go +++ b/pkg/sql/colexec/sort.go @@ -389,7 +389,7 @@ func (p *sortOp) sort() { p.scratch.partitionsCol = colexecutils.MaybeAllocateBoolArray(p.scratch.partitionsCol, spooledTuples) partitionsCol = p.scratch.partitionsCol sizeAfter := memsize.Bool * int64(cap(p.scratch.partitionsCol)) - p.allocator.AdjustMemoryUsage(sizeAfter - sizeBefore) + p.allocator.AdjustMemoryUsageAfterAllocation(sizeAfter - sizeBefore) } else { // There are at least two partitions already, so the first column needs the // same special treatment as all others. The general sequence is as @@ -441,7 +441,7 @@ func (p *sortOp) sort() { sizeBefore := memsize.Int * int64(cap(p.scratch.partitions)) p.scratch.partitions = boolVecToSel(partitionsCol, p.scratch.partitions[:0]) sizeAfter := memsize.Int * int64(cap(p.scratch.partitions)) - p.allocator.AdjustMemoryUsage(sizeAfter - sizeBefore) + p.allocator.AdjustMemoryUsageAfterAllocation(sizeAfter - sizeBefore) // For each partition (set of tuples that are identical in all of the sort // columns we've seen so far), sort based on the new column. sorter.sortPartitions(p.scratch.partitions) diff --git a/pkg/sql/colflow/colrpc/inbox.go b/pkg/sql/colflow/colrpc/inbox.go index effa3d743664..3ada2151cfa1 100644 --- a/pkg/sql/colflow/colrpc/inbox.go +++ b/pkg/sql/colflow/colrpc/inbox.go @@ -411,7 +411,7 @@ func (i *Inbox) Next() coldata.Batch { atomic.AddInt64(&i.statsAtomics.bytesRead, numSerializedBytes) // Update the allocator since we're holding onto the serialized bytes // for now. - i.allocator.AdjustMemoryUsage(numSerializedBytes) + i.allocator.AdjustMemoryUsageAfterAllocation(numSerializedBytes) // Do admission control after memory accounting for the serialized bytes // and before deserialization. if i.admissionQ != nil { diff --git a/pkg/sql/colflow/colrpc/outbox.go b/pkg/sql/colflow/colrpc/outbox.go index 55682104bf6a..e62450e2ec4a 100644 --- a/pkg/sql/colflow/colrpc/outbox.go +++ b/pkg/sql/colflow/colrpc/outbox.go @@ -312,7 +312,7 @@ func (o *Outbox) sendBatches( // Note that because we never truncate the buffer, we are only // adjusting the memory usage whenever the buffer's capacity // increases (if it didn't increase, this call becomes a noop). - o.unlimitedAllocator.AdjustMemoryUsage(int64(o.scratch.buf.Cap() - oldBufCap)) + o.unlimitedAllocator.AdjustMemoryUsageAfterAllocation(int64(o.scratch.buf.Cap() - oldBufCap)) o.scratch.msg.Data.RawBytes = o.scratch.buf.Bytes() // o.scratch.msg can be reused as soon as Send returns since it returns as diff --git a/pkg/sql/colmem/BUILD.bazel b/pkg/sql/colmem/BUILD.bazel index 3f7b746cd173..5475e0b2d998 100644 --- a/pkg/sql/colmem/BUILD.bazel +++ b/pkg/sql/colmem/BUILD.bazel @@ -14,6 +14,7 @@ go_library( "//pkg/sql/sem/tree", "//pkg/sql/types", "//pkg/util", + "//pkg/util/buildutil", "//pkg/util/mon", "@com_github_cockroachdb_errors//:errors", ], @@ -23,6 +24,7 @@ go_test( name = "colmem_test", size = "small", srcs = [ + "adjust_memory_usage_test.go", "allocator_test.go", "reset_maybe_reallocate_test.go", ], @@ -45,6 +47,7 @@ go_test( "//pkg/util/mon", "//pkg/util/randutil", "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_redact//:redact", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/sql/colmem/adjust_memory_usage_test.go b/pkg/sql/colmem/adjust_memory_usage_test.go new file mode 100644 index 000000000000..64e28922fb22 --- /dev/null +++ b/pkg/sql/colmem/adjust_memory_usage_test.go @@ -0,0 +1,89 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package colmem + +import ( + "context" + "math" + "strings" + "testing" + + "github.com/cockroachdb/cockroach/pkg/col/coldataext" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/colexecerror" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/redact" + "github.com/stretchr/testify/require" +) + +func TestAdjustMemoryUsage(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + unlimitedMemMonitor := execinfra.NewTestMemMonitor(ctx, st) + defer unlimitedMemMonitor.Stop(ctx) + unlimitedMemAcc := unlimitedMemMonitor.MakeBoundAccount() + defer unlimitedMemAcc.Close(ctx) + + limitedMemMonitorName := "test-limited" + limit := int64(100000) + limitedMemMonitor := mon.NewMonitorInheritWithLimit( + redact.RedactableString(limitedMemMonitorName), limit, unlimitedMemMonitor, + ) + limitedMemMonitor.StartNoReserved(ctx, unlimitedMemMonitor) + defer limitedMemMonitor.Stop(ctx) + limitedMemAcc := limitedMemMonitor.MakeBoundAccount() + defer limitedMemAcc.Close(ctx) + + evalCtx := eval.MakeTestingEvalContext(st) + testColumnFactory := coldataext.NewExtendedColumnFactory(&evalCtx) + allocator := NewLimitedAllocator(ctx, &limitedMemAcc, &unlimitedMemAcc, testColumnFactory) + + // Check that no error occurs if the limit is not exceeded. + require.NotPanics(t, func() { allocator.AdjustMemoryUsage(limit / 2) }) + require.Equal(t, limit/2, limitedMemAcc.Used()) + require.Zero(t, unlimitedMemAcc.Used()) + + // Exceed the limit "before" making an allocation and ensure that the + // unlimited account has not been grown. + err := colexecerror.CatchVectorizedRuntimeError(func() { allocator.AdjustMemoryUsage(limit) }) + require.NotNil(t, err) + require.True(t, strings.Contains(err.Error(), limitedMemMonitorName)) + require.Equal(t, limit/2, limitedMemAcc.Used()) + require.Zero(t, unlimitedMemAcc.Used()) + + // Now exceed the limit "after" making an allocation and ensure that the + // unlimited account has been grown. + err = colexecerror.CatchVectorizedRuntimeError(func() { allocator.AdjustMemoryUsageAfterAllocation(limit) }) + require.NotNil(t, err) + require.True(t, strings.Contains(err.Error(), limitedMemMonitorName)) + require.Equal(t, limit/2, limitedMemAcc.Used()) + require.Equal(t, limit, unlimitedMemAcc.Used()) + + // Ensure that the error from the unlimited memory account is returned when + // it cannot be grown. + err = colexecerror.CatchVectorizedRuntimeError(func() { allocator.AdjustMemoryUsageAfterAllocation(math.MaxInt64) }) + require.NotNil(t, err) + require.False(t, strings.Contains(err.Error(), limitedMemMonitorName)) + require.Equal(t, limit/2, limitedMemAcc.Used()) + require.Equal(t, limit, unlimitedMemAcc.Used()) + + // Verify that both accounts are cleared on ReleaseAll. + allocator.ReleaseAll() + require.Zero(t, limitedMemAcc.Used()) + require.Zero(t, unlimitedMemAcc.Used()) +} diff --git a/pkg/sql/colmem/allocator.go b/pkg/sql/colmem/allocator.go index b9fdf1cbd1ac..523d086d5c52 100644 --- a/pkg/sql/colmem/allocator.go +++ b/pkg/sql/colmem/allocator.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/errors" ) @@ -34,9 +35,12 @@ import ( // // In the future this can also be used to pool coldata.Vec allocations. type Allocator struct { - ctx context.Context - acc *mon.BoundAccount - factory coldata.ColumnFactory + ctx context.Context + acc *mon.BoundAccount + // unlimitedAcc might be nil and is only used in some cases when the + // allocation is denied by acc. + unlimitedAcc *mon.BoundAccount + factory coldata.ColumnFactory } // SelVectorSize returns the memory usage of the selection vector of the given @@ -111,17 +115,44 @@ func GetProportionalBatchMemSize(b coldata.Batch, length int64) int64 { return proportionalBatchMemSize } -// NewAllocator constructs a new Allocator instance. +// NewAllocator constructs a new Allocator instance with an unlimited memory +// account. func NewAllocator( - ctx context.Context, acc *mon.BoundAccount, factory coldata.ColumnFactory, + ctx context.Context, unlimitedAcc *mon.BoundAccount, factory coldata.ColumnFactory, ) *Allocator { + if buildutil.CrdbTestBuild { + if unlimitedAcc != nil { + if l := unlimitedAcc.Monitor().Limit(); l != math.MaxInt64 { + colexecerror.InternalError(errors.AssertionFailedf( + "unexpectedly NewAllocator is called with an account with limit of %d bytes", l, + )) + } + } + } return &Allocator{ ctx: ctx, - acc: acc, + acc: unlimitedAcc, factory: factory, } } +// NewLimitedAllocator constructs a new Allocator instance which works with a +// limited memory account. The unlimited memory account is optional, and it'll +// be used only for the allocations that are denied by the limited memory +// account when using Allocator.PerformAppend, Allocator.PerformOperation, and +// SetAccountingHelper.AccountForSet as well as +// Allocator.AdjustMemoryUsageAfterAllocation. +func NewLimitedAllocator( + ctx context.Context, limitedAcc, unlimitedAcc *mon.BoundAccount, factory coldata.ColumnFactory, +) *Allocator { + return &Allocator{ + ctx: ctx, + acc: limitedAcc, + unlimitedAcc: unlimitedAcc, + factory: factory, + } +} + // NewMemBatchWithFixedCapacity allocates a new in-memory coldata.Batch with the // given vector capacity. // Note: consider whether you want the dynamic batch size behavior (in which @@ -349,7 +380,7 @@ func (a *Allocator) PerformOperation(destVecs []coldata.Vec, operation func()) { operation() after := getVecsMemoryFootprint(destVecs) - a.AdjustMemoryUsage(after - before) + a.AdjustMemoryUsageAfterAllocation(after - before) } // PerformAppend is used to account for memory usage during calls to @@ -383,7 +414,7 @@ func (a *Allocator) PerformAppend(batch coldata.Batch, operation func()) { after += getVecMemoryFootprint(dest) } } - a.AdjustMemoryUsage(after - before) + a.AdjustMemoryUsageAfterAllocation(after - before) } // Used returns the number of bytes currently allocated through this allocator. @@ -391,11 +422,29 @@ func (a *Allocator) Used() int64 { return a.acc.Used() } -// AdjustMemoryUsage adjusts the number of bytes currently allocated through +// adjustMemoryUsage adjusts the number of bytes currently allocated through // this allocator by delta bytes (which can be both positive or negative). -func (a *Allocator) AdjustMemoryUsage(delta int64) { +// +// If: +// - afterAllocation is true, +// - the allocator was created via NewLimitedAllocator with a non-nil unlimited +// memory account, +// - the positive delta allocation is denied by the limited memory account, +// then the unlimited account is grown by delta. The memory error is still +// thrown. +func (a *Allocator) adjustMemoryUsage(delta int64, afterAllocation bool) { if delta > 0 { if err := a.acc.Grow(a.ctx, delta); err != nil { + // If we were given a separate unlimited account and the adjustment + // is performed after the allocation has already occurred, then grow + // the unlimited account. + if a.unlimitedAcc != nil && afterAllocation { + if newErr := a.unlimitedAcc.Grow(a.ctx, delta); newErr != nil { + // Prefer the error from the unlimited account since it + // indicates that --max-sql-memory pool has been used up. + colexecerror.InternalError(newErr) + } + } colexecerror.InternalError(err) } } else if delta < 0 { @@ -403,6 +452,22 @@ func (a *Allocator) AdjustMemoryUsage(delta int64) { } } +// AdjustMemoryUsage adjusts the number of bytes currently allocated through +// this allocator by delta bytes (which can be both positive or negative). +func (a *Allocator) AdjustMemoryUsage(delta int64) { + a.adjustMemoryUsage(delta, false /* afterAllocation */) +} + +// AdjustMemoryUsageAfterAllocation is similar to AdjustMemoryUsage with a +// difference that if 1) the allocator was created via NewLimitedAllocator, and +// 2) the allocation is denied by the limited memory account, then the unlimited +// account will be grown. The memory error is still thrown. It should be used +// whenever the caller has already incurred an allocation of delta bytes, and it +// is desirable to account for that allocation against some budget. +func (a *Allocator) AdjustMemoryUsageAfterAllocation(delta int64) { + a.adjustMemoryUsage(delta, true /* afterAllocation */) +} + // ReleaseMemory reduces the number of bytes currently allocated through this // allocator by (at most) size bytes. size must be non-negative. func (a *Allocator) ReleaseMemory(size int64) { @@ -422,6 +487,9 @@ func (a *Allocator) ReleaseMemory(size int64) { // with any other component. func (a *Allocator) ReleaseAll() { a.ReleaseMemory(a.Used()) + if a.unlimitedAcc != nil { + a.unlimitedAcc.Shrink(a.ctx, a.unlimitedAcc.Used()) + } } // sizeOfDecimals returns the size of the given decimals slice. It only accounts @@ -815,7 +883,7 @@ func (h *SetAccountingHelper) AccountForSet(rowIdx int) (batchDone bool) { if len(h.bytesLikeVectors) > 0 { newBytesLikeTotalSize := h.getBytesLikeTotalSize() - h.helper.allocator.AdjustMemoryUsage(newBytesLikeTotalSize - h.prevBytesLikeTotalSize) + h.helper.allocator.AdjustMemoryUsageAfterAllocation(newBytesLikeTotalSize - h.prevBytesLikeTotalSize) h.prevBytesLikeTotalSize = newBytesLikeTotalSize } @@ -825,7 +893,7 @@ func (h *SetAccountingHelper) AccountForSet(rowIdx int) (batchDone bool) { d := decimalVec.Get(rowIdx) newDecimalSizes += int64(d.Size()) } - h.helper.allocator.AdjustMemoryUsage(newDecimalSizes - h.decimalSizes[rowIdx]) + h.helper.allocator.AdjustMemoryUsageAfterAllocation(newDecimalSizes - h.decimalSizes[rowIdx]) h.decimalSizes[rowIdx] = newDecimalSizes } @@ -837,7 +905,7 @@ func (h *SetAccountingHelper) AccountForSet(rowIdx int) (batchDone bool) { // was already included in EstimateBatchSizeBytes. newVarLengthDatumSize += int64(datumSize) } - h.helper.allocator.AdjustMemoryUsage(newVarLengthDatumSize) + h.helper.allocator.AdjustMemoryUsageAfterAllocation(newVarLengthDatumSize) } // The allocator is not shared with any other components, so we can just use diff --git a/pkg/sql/sem/eval/eval_test/eval_test.go b/pkg/sql/sem/eval/eval_test/eval_test.go index 1f1be25f4fd6..8d7ed4db1158 100644 --- a/pkg/sql/sem/eval/eval_test/eval_test.go +++ b/pkg/sql/sem/eval/eval_test/eval_test.go @@ -189,8 +189,8 @@ func TestEval(t *testing.T) { batchesReturned++ return batch }}, - }, - }, + }}, + StreamingMemAccount: &acc, // Unsupported post processing specs are wrapped and run through the // row execution engine. ProcessorConstructor: rowexec.NewProcessor, diff --git a/pkg/util/mon/bytes_usage.go b/pkg/util/mon/bytes_usage.go index 4d2f9246eb37..d7c752a4d579 100644 --- a/pkg/util/mon/bytes_usage.go +++ b/pkg/util/mon/bytes_usage.go @@ -426,6 +426,11 @@ func (mm *BytesMonitor) Name() string { return string(mm.name) } +// Limit returns the memory limit of the monitor. +func (mm *BytesMonitor) Limit() int64 { + return mm.limit +} + const bytesMaxUsageLoggingThreshold = 100 * 1024 func (mm *BytesMonitor) doStop(ctx context.Context, check bool) {