diff --git a/pkg/sql/colexec/hash_aggregator.eg.go b/pkg/sql/colexec/hash_aggregator.eg.go index 40dc0d6fca90..c4beee40a04b 100644 --- a/pkg/sql/colexec/hash_aggregator.eg.go +++ b/pkg/sql/colexec/hash_aggregator.eg.go @@ -456,26 +456,21 @@ func getNext_true(op *hashAggregator) coldata.Batch { case hashAggregatorOutputting: // Note that ResetMaybeReallocate truncates the requested capacity // at coldata.BatchSize(), so we can just try asking for - // len(op.buckets) capacity. + // len(op.buckets)-op.curOutputBucketIdx (the number of remaining + // output tuples) capacity. op.output, _ = op.accountingHelper.ResetMaybeReallocate( - op.outputTypes, op.output, len(op.buckets), op.maxOutputBatchMemSize, - true, /* desiredCapacitySufficient */ + op.outputTypes, op.output, len(op.buckets)-op.curOutputBucketIdx, true, /* desiredCapacitySufficient */ ) curOutputIdx := 0 - for curOutputIdx < op.output.Capacity() && - op.curOutputBucketIdx < len(op.buckets) && - (op.maxCapacity == 0 || curOutputIdx < op.maxCapacity) { + for batchDone := false; op.curOutputBucketIdx < len(op.buckets) && !batchDone; { bucket := op.buckets[op.curOutputBucketIdx] for fnIdx, fn := range bucket.fns { fn.SetOutput(op.output.ColVec(fnIdx)) fn.Flush(curOutputIdx) } - op.accountingHelper.AccountForSet(curOutputIdx) + batchDone = op.accountingHelper.AccountForSet(curOutputIdx) curOutputIdx++ op.curOutputBucketIdx++ - if op.maxCapacity == 0 && op.accountingHelper.Allocator.Used() >= op.maxOutputBatchMemSize { - op.maxCapacity = curOutputIdx - } } if op.curOutputBucketIdx >= len(op.buckets) { if l := op.bufferingState.pendingBatch.Length(); l > 0 { @@ -604,26 +599,21 @@ func getNext_false(op *hashAggregator) coldata.Batch { case hashAggregatorOutputting: // Note that ResetMaybeReallocate truncates the requested capacity // at coldata.BatchSize(), so we can just try asking for - // len(op.buckets) capacity. + // len(op.buckets)-op.curOutputBucketIdx (the number of remaining + // output tuples) capacity. op.output, _ = op.accountingHelper.ResetMaybeReallocate( - op.outputTypes, op.output, len(op.buckets), op.maxOutputBatchMemSize, - true, /* desiredCapacitySufficient */ + op.outputTypes, op.output, len(op.buckets)-op.curOutputBucketIdx, true, /* desiredCapacitySufficient */ ) curOutputIdx := 0 - for curOutputIdx < op.output.Capacity() && - op.curOutputBucketIdx < len(op.buckets) && - (op.maxCapacity == 0 || curOutputIdx < op.maxCapacity) { + for batchDone := false; op.curOutputBucketIdx < len(op.buckets) && !batchDone; { bucket := op.buckets[op.curOutputBucketIdx] for fnIdx, fn := range bucket.fns { fn.SetOutput(op.output.ColVec(fnIdx)) fn.Flush(curOutputIdx) } - op.accountingHelper.AccountForSet(curOutputIdx) + batchDone = op.accountingHelper.AccountForSet(curOutputIdx) curOutputIdx++ op.curOutputBucketIdx++ - if op.maxCapacity == 0 && op.accountingHelper.Allocator.Used() >= op.maxOutputBatchMemSize { - op.maxCapacity = curOutputIdx - } } if op.curOutputBucketIdx >= len(op.buckets) { op.state = hashAggregatorDone diff --git a/pkg/sql/colexec/hash_aggregator.go b/pkg/sql/colexec/hash_aggregator.go index ade5d03f908d..c13432c0448c 100644 --- a/pkg/sql/colexec/hash_aggregator.go +++ b/pkg/sql/colexec/hash_aggregator.go @@ -141,12 +141,7 @@ type hashAggregator struct { // populating the output. curOutputBucketIdx int - maxOutputBatchMemSize int64 - // maxCapacity if non-zero indicates the target capacity of the output - // batch. It is set when, after setting a row, we realize that the output - // batch has exceeded the memory limit. - maxCapacity int - output coldata.Batch + output coldata.Batch aggFnsAlloc *colexecagg.AggregateFuncsAlloc hashAlloc aggBucketAlloc @@ -212,19 +207,18 @@ func NewHashAggregator( colexecerror.InternalError(err) } hashAgg := &hashAggregator{ - OneInputNode: colexecop.NewOneInputNode(args.Input), - hashTableAllocator: hashTableAllocator, - spec: args.Spec, - state: hashAggregatorBuffering, - inputTypes: args.InputTypes, - outputTypes: args.OutputTypes, - inputArgsConverter: inputArgsConverter, - toClose: toClose, - maxOutputBatchMemSize: maxOutputBatchMemSize, - aggFnsAlloc: aggFnsAlloc, - hashAlloc: aggBucketAlloc{allocator: args.Allocator}, + OneInputNode: colexecop.NewOneInputNode(args.Input), + hashTableAllocator: hashTableAllocator, + spec: args.Spec, + state: hashAggregatorBuffering, + inputTypes: args.InputTypes, + outputTypes: args.OutputTypes, + inputArgsConverter: inputArgsConverter, + toClose: toClose, + aggFnsAlloc: aggFnsAlloc, + hashAlloc: aggBucketAlloc{allocator: args.Allocator}, } - hashAgg.accountingHelper.Init(outputUnlimitedAllocator, args.OutputTypes) + hashAgg.accountingHelper.Init(outputUnlimitedAllocator, maxOutputBatchMemSize, args.OutputTypes) hashAgg.bufferingState.tuples = colexecutils.NewAppendOnlyBufferedBatch(args.Allocator, args.InputTypes, nil /* colsToStore */) hashAgg.datumAlloc.AllocSize = hashAggregatorAllocSize hashAgg.aggHelper = newAggregatorHelper(args, &hashAgg.datumAlloc, true /* isHashAgg */, hashAggregatorMaxBuffered) diff --git a/pkg/sql/colexec/hash_aggregator_tmpl.go b/pkg/sql/colexec/hash_aggregator_tmpl.go index 02b44dab0976..a9f93b9d3da9 100644 --- a/pkg/sql/colexec/hash_aggregator_tmpl.go +++ b/pkg/sql/colexec/hash_aggregator_tmpl.go @@ -342,26 +342,21 @@ func getNext(op *hashAggregator, partialOrder bool) coldata.Batch { case hashAggregatorOutputting: // Note that ResetMaybeReallocate truncates the requested capacity // at coldata.BatchSize(), so we can just try asking for - // len(op.buckets) capacity. + // len(op.buckets)-op.curOutputBucketIdx (the number of remaining + // output tuples) capacity. op.output, _ = op.accountingHelper.ResetMaybeReallocate( - op.outputTypes, op.output, len(op.buckets), op.maxOutputBatchMemSize, - true, /* desiredCapacitySufficient */ + op.outputTypes, op.output, len(op.buckets)-op.curOutputBucketIdx, true, /* desiredCapacitySufficient */ ) curOutputIdx := 0 - for curOutputIdx < op.output.Capacity() && - op.curOutputBucketIdx < len(op.buckets) && - (op.maxCapacity == 0 || curOutputIdx < op.maxCapacity) { + for batchDone := false; op.curOutputBucketIdx < len(op.buckets) && !batchDone; { bucket := op.buckets[op.curOutputBucketIdx] for fnIdx, fn := range bucket.fns { fn.SetOutput(op.output.ColVec(fnIdx)) fn.Flush(curOutputIdx) } - op.accountingHelper.AccountForSet(curOutputIdx) + batchDone = op.accountingHelper.AccountForSet(curOutputIdx) curOutputIdx++ op.curOutputBucketIdx++ - if op.maxCapacity == 0 && op.accountingHelper.Allocator.Used() >= op.maxOutputBatchMemSize { - op.maxCapacity = curOutputIdx - } } if op.curOutputBucketIdx >= len(op.buckets) { if partialOrder { diff --git a/pkg/sql/colexec/ordered_synchronizer.eg.go b/pkg/sql/colexec/ordered_synchronizer.eg.go index 041c58dd1c12..8d142c395eab 100644 --- a/pkg/sql/colexec/ordered_synchronizer.eg.go +++ b/pkg/sql/colexec/ordered_synchronizer.eg.go @@ -37,7 +37,6 @@ type OrderedSynchronizer struct { span *tracing.Span accountingHelper colmem.SetAccountingHelper - memoryLimit int64 inputs []colexecargs.OpWithMetaInfo ordering colinfo.ColumnOrdering typs []*types.T @@ -58,10 +57,6 @@ type OrderedSynchronizer struct { heap []int // comparators stores one comparator per ordering column. comparators []vecComparator - // maxCapacity if non-zero indicates the target capacity of the output - // batch. It is set when, after setting a row, we realize that the output - // batch has exceeded the memory limit. - maxCapacity int output coldata.Batch outVecs coldata.TypedVecs } @@ -91,13 +86,12 @@ func NewOrderedSynchronizer( ordering colinfo.ColumnOrdering, ) *OrderedSynchronizer { os := &OrderedSynchronizer{ - memoryLimit: memoryLimit, inputs: inputs, ordering: ordering, typs: typs, canonicalTypeFamilies: typeconv.ToCanonicalTypeFamilies(typs), } - os.accountingHelper.Init(allocator, typs) + os.accountingHelper.Init(allocator, memoryLimit, typs) return os } @@ -117,7 +111,7 @@ func (o *OrderedSynchronizer) Next() coldata.Batch { } o.resetOutput() outputIdx := 0 - for outputIdx < o.output.Capacity() && (o.maxCapacity == 0 || outputIdx < o.maxCapacity) { + for batchDone := false; !batchDone; { if o.advanceMinBatch { // Advance the minimum input batch, fetching a new batch if // necessary. @@ -255,11 +249,8 @@ func (o *OrderedSynchronizer) Next() coldata.Batch { o.advanceMinBatch = true // Account for the memory of the row we have just set. - o.accountingHelper.AccountForSet(outputIdx) + batchDone = o.accountingHelper.AccountForSet(outputIdx) outputIdx++ - if o.maxCapacity == 0 && o.accountingHelper.Allocator.Used() >= o.memoryLimit { - o.maxCapacity = outputIdx - } } o.output.SetLength(outputIdx) @@ -269,8 +260,7 @@ func (o *OrderedSynchronizer) Next() coldata.Batch { func (o *OrderedSynchronizer) resetOutput() { var reallocated bool o.output, reallocated = o.accountingHelper.ResetMaybeReallocate( - o.typs, o.output, 1 /* minDesiredCapacity */, o.memoryLimit, - false, /* desiredCapacitySufficient */ + o.typs, o.output, 1 /* minDesiredCapacity */, false, /* desiredCapacitySufficient */ ) if reallocated { o.outVecs.SetBatch(o.output) diff --git a/pkg/sql/colexec/ordered_synchronizer_tmpl.go b/pkg/sql/colexec/ordered_synchronizer_tmpl.go index c11e831eef24..5df6585b210d 100644 --- a/pkg/sql/colexec/ordered_synchronizer_tmpl.go +++ b/pkg/sql/colexec/ordered_synchronizer_tmpl.go @@ -60,7 +60,6 @@ type OrderedSynchronizer struct { span *tracing.Span accountingHelper colmem.SetAccountingHelper - memoryLimit int64 inputs []colexecargs.OpWithMetaInfo ordering colinfo.ColumnOrdering typs []*types.T @@ -81,10 +80,6 @@ type OrderedSynchronizer struct { heap []int // comparators stores one comparator per ordering column. comparators []vecComparator - // maxCapacity if non-zero indicates the target capacity of the output - // batch. It is set when, after setting a row, we realize that the output - // batch has exceeded the memory limit. - maxCapacity int output coldata.Batch outVecs coldata.TypedVecs } @@ -114,13 +109,12 @@ func NewOrderedSynchronizer( ordering colinfo.ColumnOrdering, ) *OrderedSynchronizer { os := &OrderedSynchronizer{ - memoryLimit: memoryLimit, inputs: inputs, ordering: ordering, typs: typs, canonicalTypeFamilies: typeconv.ToCanonicalTypeFamilies(typs), } - os.accountingHelper.Init(allocator, typs) + os.accountingHelper.Init(allocator, memoryLimit, typs) return os } @@ -140,7 +134,7 @@ func (o *OrderedSynchronizer) Next() coldata.Batch { } o.resetOutput() outputIdx := 0 - for outputIdx < o.output.Capacity() && (o.maxCapacity == 0 || outputIdx < o.maxCapacity) { + for batchDone := false; !batchDone; { if o.advanceMinBatch { // Advance the minimum input batch, fetching a new batch if // necessary. @@ -205,11 +199,8 @@ func (o *OrderedSynchronizer) Next() coldata.Batch { o.advanceMinBatch = true // Account for the memory of the row we have just set. - o.accountingHelper.AccountForSet(outputIdx) + batchDone = o.accountingHelper.AccountForSet(outputIdx) outputIdx++ - if o.maxCapacity == 0 && o.accountingHelper.Allocator.Used() >= o.memoryLimit { - o.maxCapacity = outputIdx - } } o.output.SetLength(outputIdx) @@ -219,8 +210,7 @@ func (o *OrderedSynchronizer) Next() coldata.Batch { func (o *OrderedSynchronizer) resetOutput() { var reallocated bool o.output, reallocated = o.accountingHelper.ResetMaybeReallocate( - o.typs, o.output, 1 /* minDesiredCapacity */, o.memoryLimit, - false, /* desiredCapacitySufficient */ + o.typs, o.output, 1 /* minDesiredCapacity */, false, /* desiredCapacitySufficient */ ) if reallocated { o.outVecs.SetBatch(o.output) diff --git a/pkg/sql/colfetcher/cfetcher.go b/pkg/sql/colfetcher/cfetcher.go index ead52a4b7688..8c60a44cb6c7 100644 --- a/pkg/sql/colfetcher/cfetcher.go +++ b/pkg/sql/colfetcher/cfetcher.go @@ -273,21 +273,12 @@ type cFetcher struct { scratch []byte accountingHelper colmem.SetAccountingHelper - - // maxCapacity if non-zero indicates the target capacity of the output - // batch. It is set when at the row finalization we realize that the output - // batch has exceeded the memory limit. - maxCapacity int } func (cf *cFetcher) resetBatch() { var reallocated bool var minDesiredCapacity int - if cf.maxCapacity > 0 { - // If we have already exceeded the memory limit for the output batch, we - // will only be using the same batch from now on. - minDesiredCapacity = cf.maxCapacity - } else if cf.machine.limitHint > 0 && (cf.estimatedRowCount == 0 || uint64(cf.machine.limitHint) < cf.estimatedRowCount) { + if cf.machine.limitHint > 0 && (cf.estimatedRowCount == 0 || uint64(cf.machine.limitHint) < cf.estimatedRowCount) { // If we have a limit hint, and either // 1) we don't have an estimate, or // 2) we have a soft limit, @@ -309,8 +300,7 @@ func (cf *cFetcher) resetBatch() { } } cf.machine.batch, reallocated = cf.accountingHelper.ResetMaybeReallocate( - cf.table.typs, cf.machine.batch, minDesiredCapacity, cf.memoryLimit, - false, /* desiredCapacitySufficient */ + cf.table.typs, cf.machine.batch, minDesiredCapacity, false, /* desiredCapacitySufficient */ ) if reallocated { cf.machine.colvecs.SetBatch(cf.machine.batch) @@ -465,7 +455,7 @@ func (cf *cFetcher) Init( cf.table = table cf.fetcher = kvFetcher - cf.accountingHelper.Init(allocator, cf.table.typs) + cf.accountingHelper.Init(allocator, cf.memoryLimit, cf.table.typs) return nil } @@ -854,23 +844,14 @@ func (cf *cFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) { // column is requested) yet, but it is ok for the purposes of the // memory accounting - oids are fixed length values and, thus, have // already been accounted for when the batch was allocated. - cf.accountingHelper.AccountForSet(cf.machine.rowIdx) + emitBatch := cf.accountingHelper.AccountForSet(cf.machine.rowIdx) cf.machine.rowIdx++ cf.shiftState() - var emitBatch bool - if cf.maxCapacity == 0 && cf.accountingHelper.Allocator.Used() >= cf.memoryLimit { - cf.maxCapacity = cf.machine.rowIdx - } - if cf.machine.rowIdx >= cf.machine.batch.Capacity() || - (cf.maxCapacity > 0 && cf.machine.rowIdx >= cf.maxCapacity) || - (cf.machine.limitHint > 0 && cf.machine.rowIdx >= cf.machine.limitHint) { - // We either - // 1. have no more room in our batch, so output it immediately - // or - // 2. we made it to our limit hint, so output our batch early - // to make sure that we don't bother filling in extra data - // if we don't need to. + if cf.machine.limitHint > 0 && cf.machine.rowIdx >= cf.machine.limitHint { + // If we made it to our limit hint, so output our batch early to + // make sure that we don't bother filling in extra data if we + // don't need to. emitBatch = true // Update the limit hint to track the expected remaining rows to // be fetched. diff --git a/pkg/sql/colmem/allocator.go b/pkg/sql/colmem/allocator.go index 47738ef0c9c1..28e830ab7965 100644 --- a/pkg/sql/colmem/allocator.go +++ b/pkg/sql/colmem/allocator.go @@ -533,7 +533,16 @@ func GetFixedSizeTypeSize(t *types.T) (size int64) { // NOTE: it works under the assumption that only a single coldata.Batch is being // used. type SetAccountingHelper struct { - Allocator *Allocator + allocator *Allocator + + // curCapacity is the capacity of the last batch returned by + // ResetMaybeReallocate. + curCapacity int + // maxCapacity if non-zero indicates the target capacity of the batch. It is + // set once the batch exceeds the memory limit. + maxCapacity int + // memoryLimit determines the maximum memory footprint of the batch. + memoryLimit int64 // allFixedLength indicates that we're working with the type schema of only // fixed-length elements. @@ -574,9 +583,11 @@ type SetAccountingHelper struct { varLenDatumVecs []coldata.DatumVec } -// Init initializes the helper. -func (h *SetAccountingHelper) Init(allocator *Allocator, typs []*types.T) { - h.Allocator = allocator +// Init initializes the helper. The allocator must **not** be shared with any +// other component. +func (h *SetAccountingHelper) Init(allocator *Allocator, memoryLimit int64, typs []*types.T) { + h.allocator = allocator + h.memoryLimit = memoryLimit for vecIdx, typ := range typs { switch typeconv.TypeFamilyToCanonicalTypeFamily(typ.Family()) { @@ -607,15 +618,12 @@ func (h *SetAccountingHelper) getBytesLikeTotalSize() int64 { // Allocator.ResetMaybeReallocate (and thus has the same contract) with an // additional logic for memory tracking purposes. func (h *SetAccountingHelper) ResetMaybeReallocate( - typs []*types.T, - oldBatch coldata.Batch, - minCapacity int, - maxBatchMemSize int64, - desiredCapacitySufficient bool, + typs []*types.T, oldBatch coldata.Batch, minCapacity int, desiredCapacitySufficient bool, ) (newBatch coldata.Batch, reallocated bool) { - newBatch, reallocated = h.Allocator.ResetMaybeReallocate( - typs, oldBatch, minCapacity, maxBatchMemSize, desiredCapacitySufficient, + newBatch, reallocated = h.allocator.ResetMaybeReallocate( + typs, oldBatch, minCapacity, h.memoryLimit, desiredCapacitySufficient, ) + h.curCapacity = newBatch.Capacity() if reallocated && !h.allFixedLength { // Allocator.ResetMaybeReallocate has released the precise memory // footprint of the old batch and has accounted for the estimated @@ -665,17 +673,22 @@ func (h *SetAccountingHelper) ResetMaybeReallocate( // AccountForSet updates the Allocator according to the new variable length // values in the row rowIdx in the batch that was returned by the last call to -// ResetMaybeReallocate. -func (h *SetAccountingHelper) AccountForSet(rowIdx int) { +// ResetMaybeReallocate. It returns a boolean indicating whether the batch is +// done (i.e. no more rows should be set on it before it is reset). +func (h *SetAccountingHelper) AccountForSet(rowIdx int) (batchDone bool) { + // The batch is done if we've just set the last row that the batch has the + // capacity for. + batchDone = h.curCapacity == rowIdx+1 if h.allFixedLength { // All vectors are of fixed-length and are already correctly accounted - // for. - return + // for. We also utilize the whole capacity since setting extra rows + // incurs no additional memory usage. + return batchDone } if len(h.bytesLikeVectors) > 0 { newBytesLikeTotalSize := h.getBytesLikeTotalSize() - h.Allocator.AdjustMemoryUsage(newBytesLikeTotalSize - h.prevBytesLikeTotalSize) + h.allocator.AdjustMemoryUsage(newBytesLikeTotalSize - h.prevBytesLikeTotalSize) h.prevBytesLikeTotalSize = newBytesLikeTotalSize } @@ -685,7 +698,7 @@ func (h *SetAccountingHelper) AccountForSet(rowIdx int) { d := decimalVec.Get(rowIdx) newDecimalSizes += int64(d.Size()) } - h.Allocator.AdjustMemoryUsage(newDecimalSizes - h.decimalSizes[rowIdx]) + h.allocator.AdjustMemoryUsage(newDecimalSizes - h.decimalSizes[rowIdx]) h.decimalSizes[rowIdx] = newDecimalSizes } @@ -697,8 +710,26 @@ func (h *SetAccountingHelper) AccountForSet(rowIdx int) { // was already included in EstimateBatchSizeBytes. newVarLengthDatumSize += int64(datumSize) } - h.Allocator.AdjustMemoryUsage(newVarLengthDatumSize) + h.allocator.AdjustMemoryUsage(newVarLengthDatumSize) } + + if h.maxCapacity == 0 && h.allocator.Used() >= h.memoryLimit { + // This is the first time we exceeded the memory limit, so we memorize + // the capacity. + h.maxCapacity = rowIdx + 1 + } + if h.maxCapacity > 0 && h.maxCapacity == rowIdx+1 { + // The batch is also done if we've exceeded the memory limit, and we've + // just set the last row according to the memorized capacity. + batchDone = true + } + return batchDone +} + +// TestingUpdateMemoryLimit sets the new memory limit. It should only be used in +// tests. +func (h *SetAccountingHelper) TestingUpdateMemoryLimit(memoryLimit int64) { + h.memoryLimit = memoryLimit } // Release releases all of the resources so that they can be garbage collected. diff --git a/pkg/sql/colmem/allocator_test.go b/pkg/sql/colmem/allocator_test.go index ed819efd2c6c..ed2577fde417 100644 --- a/pkg/sql/colmem/allocator_test.go +++ b/pkg/sql/colmem/allocator_test.go @@ -288,7 +288,7 @@ func TestSetAccountingHelper(t *testing.T) { } var helper colmem.SetAccountingHelper - helper.Init(testAllocator, typs) + helper.Init(testAllocator, math.MaxInt64, typs) numIterations := rng.Intn(10) + 1 numRows := rng.Intn(coldata.BatchSize()) + 1 @@ -306,7 +306,8 @@ func TestSetAccountingHelper(t *testing.T) { // new batch with larger capacity might be allocated. maxBatchMemSize = largeMemSize } - batch, _ = helper.ResetMaybeReallocate(typs, batch, numRows, maxBatchMemSize, false /* desiredCapacitySufficient */) + helper.TestingUpdateMemoryLimit(maxBatchMemSize) + batch, _ = helper.ResetMaybeReallocate(typs, batch, numRows, false /* desiredCapacitySufficient */) for rowIdx := 0; rowIdx < batch.Capacity(); rowIdx++ { for vecIdx, typ := range typs { @@ -322,7 +323,10 @@ func TestSetAccountingHelper(t *testing.T) { coldata.SetValueAt(batch.ColVec(vecIdx), converter(datum), rowIdx) } } - helper.AccountForSet(rowIdx) + // The purpose of this test is ensuring that memory accounting is + // up-to-date, so we ignore the recommendation of the helper whether + // the batch is done. + _ = helper.AccountForSet(rowIdx) } // At this point, we have set all rows in the batch and performed the