From 338670d53c4f5d1b8bf9b4207ed57c5905981adf Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Thu, 10 Feb 2022 21:33:35 -0800 Subject: [PATCH 1/3] colexec: deeply reset datum-backed vectors in ResetInternalBatch Previously, we would keep references to the old datums set in the datum-backed vectors until the datums are overwritten, thus, extending the time period when the no-longer-used datums are live unnecessarily. We don't have to do that for any other types because for others we can actually reuse the old space for new elements, so we want to keep them around (e.g. decimals can reuse the non-inlined coefficient). This commit makes it so that we deeply unset the datums in `ResetInternalBatch`. Care had to be taken to keep the memory accounting up-to-date. In particular, after deeply resetting the datum-backed vector, we want to release the memory allocation that was taken up by the actual datums while keeping the overhead of `tree.Datum` interface in (since `[]tree.Datum` slice is still fully live). Release note: None --- pkg/col/coldata/batch.go | 24 +++++- pkg/col/coldata/datum_vec.go | 5 +- pkg/col/coldataext/datum_vec.go | 51 ++++++----- pkg/sql/colexec/colexecutils/utils.go | 4 +- pkg/sql/colmem/allocator.go | 120 +++++++++++++------------- pkg/sql/colmem/allocator_test.go | 45 ++++++++++ 6 files changed, 160 insertions(+), 89 deletions(-) diff --git a/pkg/col/coldata/batch.go b/pkg/col/coldata/batch.go index 01df90b9329f..dd517fbe62c2 100644 --- a/pkg/col/coldata/batch.go +++ b/pkg/col/coldata/batch.go @@ -16,6 +16,7 @@ import ( "strings" "sync/atomic" + "github.com/cockroachdb/cockroach/pkg/col/typeconv" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/errors" @@ -68,8 +69,9 @@ type Batch interface { // important for callers to call ResetInternalBatch if they own internal // batches that they reuse as not doing this could result in correctness // or memory blowup issues. It unsets the selection and sets the length to - // 0. - ResetInternalBatch() + // 0. Notably, it deeply resets the datum-backed vectors and returns the + // number of bytes released as a result of the reset. + ResetInternalBatch() int64 // String returns a pretty representation of this batch. String() string } @@ -126,6 +128,8 @@ func NewMemBatchWithCapacity(typs []*types.T, capacity int, factory ColumnFactor b.b[i] = col if col.IsBytesLike() { b.bytesVecIdxs.Add(i) + } else if col.CanonicalTypeFamily() == typeconv.DatumVecCanonicalTypeFamily { + b.datumVecIdxs.Add(i) } } return b @@ -202,6 +206,8 @@ type MemBatch struct { // vectors and checking whether they are of Bytes type we store this slice // separately. bytesVecIdxs util.FastIntSet + // datumVecIdxs stores the indices of all datum-backed vectors in b. + datumVecIdxs util.FastIntSet useSel bool // sel is - if useSel is true - a selection vector from upstream. A // selection vector is a list of selected tuple indices in this memBatch's @@ -271,6 +277,8 @@ func (m *MemBatch) SetLength(length int) { func (m *MemBatch) AppendCol(col Vec) { if col.IsBytesLike() { m.bytesVecIdxs.Add(len(m.b)) + } else if col.CanonicalTypeFamily() == typeconv.DatumVecCanonicalTypeFamily { + m.datumVecIdxs.Add(len(m.b)) } m.b = append(m.b, col) } @@ -315,12 +323,17 @@ func (m *MemBatch) Reset(typs []*types.T, length int, factory ColumnFactory) { m.bytesVecIdxs.Remove(i) } } + for i, ok := m.datumVecIdxs.Next(0); ok; i, ok = m.datumVecIdxs.Next(i + 1) { + if i >= len(typs) { + m.datumVecIdxs.Remove(i) + } + } m.ResetInternalBatch() m.SetLength(length) } // ResetInternalBatch implements the Batch interface. -func (m *MemBatch) ResetInternalBatch() { +func (m *MemBatch) ResetInternalBatch() int64 { m.SetLength(0 /* length */) m.SetSelection(false) for _, v := range m.b { @@ -331,6 +344,11 @@ func (m *MemBatch) ResetInternalBatch() { for i, ok := m.bytesVecIdxs.Next(0); ok; i, ok = m.bytesVecIdxs.Next(i + 1) { Reset(m.b[i]) } + var released int64 + for i, ok := m.datumVecIdxs.Next(0); ok; i, ok = m.datumVecIdxs.Next(i + 1) { + released += m.b[i].Datum().Reset() + } + return released } // String returns a pretty representation of this batch. diff --git a/pkg/col/coldata/datum_vec.go b/pkg/col/coldata/datum_vec.go index 639b26656ed4..718c7d41806a 100644 --- a/pkg/col/coldata/datum_vec.go +++ b/pkg/col/coldata/datum_vec.go @@ -36,8 +36,6 @@ type DatumVec interface { AppendSlice(src DatumVec, destIdx, srcStartIdx, srcEndIdx int) // AppendVal appends the given tree.Datum value to the end of the vector. AppendVal(v Datum) - // SetLength sets the length of the vector. - SetLength(l int) // Len returns the length of the vector. Len() int // Cap returns the underlying capacity of the vector. @@ -55,4 +53,7 @@ type DatumVec interface { // be used when elements before startIdx are guaranteed not to have been // modified. Size(startIdx int) int64 + // Reset resets the vector for reuse. It returns the number of bytes + // released. + Reset() int64 } diff --git a/pkg/col/coldataext/datum_vec.go b/pkg/col/coldataext/datum_vec.go index aaca8a057236..6bfdf6271384 100644 --- a/pkg/col/coldataext/datum_vec.go +++ b/pkg/col/coldataext/datum_vec.go @@ -115,11 +115,6 @@ func (dv *datumVec) AppendVal(v coldata.Datum) { dv.data = append(dv.data, datum) } -// SetLength implements coldata.DatumVec interface. -func (dv *datumVec) SetLength(l int) { - dv.data = dv.data[:l] -} - // Len implements coldata.DatumVec interface. func (dv *datumVec) Len() int { return len(dv.data) @@ -145,6 +140,24 @@ func (dv *datumVec) UnmarshalTo(i int, b []byte) error { return err } +// valuesSize returns the footprint of actual datums (in bytes) with ordinals in +// [startIdx:] range, ignoring the overhead of tree.Datum wrapper. +func (dv *datumVec) valuesSize(startIdx int) int64 { + var size int64 + // Only the elements up to the length are expected to be non-nil. Note that + // we cannot take a short-cut with fixed-length values here because they + // might not be set, so we could over-account if we did something like + // size += (len-startIdx) * fixedSize. + if startIdx < dv.Len() { + for _, d := range dv.data[startIdx:dv.Len()] { + if d != nil { + size += int64(d.Size()) + } + } + } + return size +} + // Size implements coldata.DatumVec interface. func (dv *datumVec) Size(startIdx int) int64 { // Note that we don't account for the overhead of datumVec struct, and the @@ -156,24 +169,18 @@ func (dv *datumVec) Size(startIdx int) int64 { if startIdx < 0 { startIdx = 0 } - count := int64(dv.Cap() - startIdx) - size := memsize.DatumOverhead * count - if datumSize, variable := tree.DatumTypeSize(dv.t); variable { - // The elements in dv.data[max(startIdx,len):cap] range are accounted with - // the default datum size for the type. For those in the range - // [startIdx, len) we call Datum.Size(). - idx := startIdx - for ; idx < len(dv.data); idx++ { - if dv.data[idx] != nil { - size += int64(dv.data[idx].Size()) - } - } - // Pick up where the loop left off. - size += int64(dv.Cap()-idx) * int64(datumSize) - } else { - size += int64(datumSize) * count + // We have to account for the tree.Datum overhead for the whole capacity of + // the underlying slice. + return memsize.DatumOverhead*int64(dv.Cap()-startIdx) + dv.valuesSize(startIdx) +} + +// Reset implements coldata.DatumVec interface. +func (dv *datumVec) Reset() int64 { + released := dv.valuesSize(0 /* startIdx */) + for i := range dv.data { + dv.data[i] = nil } - return size + return released } // assertValidDatum asserts that the given datum is valid to be stored in this diff --git a/pkg/sql/colexec/colexecutils/utils.go b/pkg/sql/colexec/colexecutils/utils.go index d1415d2f348c..8abc03e66bef 100644 --- a/pkg/sql/colexec/colexecutils/utils.go +++ b/pkg/sql/colexec/colexecutils/utils.go @@ -183,9 +183,9 @@ func (b *AppendOnlyBufferedBatch) Reset([]*types.T, int, coldata.ColumnFactory) } // ResetInternalBatch implements the coldata.Batch interface. -func (b *AppendOnlyBufferedBatch) ResetInternalBatch() { +func (b *AppendOnlyBufferedBatch) ResetInternalBatch() int64 { b.SetLength(0 /* n */) - b.batch.ResetInternalBatch() + return b.batch.ResetInternalBatch() } // String implements the coldata.Batch interface. diff --git a/pkg/sql/colmem/allocator.go b/pkg/sql/colmem/allocator.go index 8af904f12ec3..567b9d6b0639 100644 --- a/pkg/sql/colmem/allocator.go +++ b/pkg/sql/colmem/allocator.go @@ -189,7 +189,7 @@ func (a *Allocator) ResetMaybeReallocate( } if useOldBatch { reallocated = false - oldBatch.ResetInternalBatch() + a.ReleaseMemory(oldBatch.ResetInternalBatch()) newBatch = oldBatch } else { a.ReleaseMemory(oldBatchMemSize) @@ -266,6 +266,8 @@ func (a *Allocator) MaybeAppendColumn(b coldata.Batch, t *types.T, colIdx int) { // Flat bytes vector needs to be reset before the vector can be // reused. coldata.Reset(presentVec) + } else if presentVec.CanonicalTypeFamily() == typeconv.DatumVecCanonicalTypeFamily { + a.ReleaseMemory(presentVec.Datum().Reset()) } return } @@ -428,15 +430,11 @@ func EstimateBatchSizeBytes(vecTypes []*types.T, batchLength int) int64 { case types.JsonFamily: numBytesVectors++ case typeconv.DatumVecCanonicalTypeFamily: - // In datum vec we need to account for memory underlying the struct - // that is the implementation of tree.Datum interface (for example, - // tree.DBoolFalse) as well as for the overhead of storing that - // implementation in the slice of tree.Datums. Note that if t is of - // variable size, the memory will be properly accounted in - // getVecMemoryFootprint. - // Note: keep the calculation here in line with datumVec.Size. - implementationSize, _ := tree.DatumTypeSize(t) - acc += int64(implementationSize) + memsize.DatumOverhead + // Initially, only []tree.Datum slice is allocated for the + // datum-backed vectors right away, so that's what we're including + // in the estimate. Later on, once the actual values are set, they + // will be accounted for properly. + acc += memsize.DatumOverhead case types.BoolFamily, types.IntFamily, @@ -523,57 +521,51 @@ type SetAccountingHelper struct { // that we have already accounted for. prevBytesLikeTotalSize int64 - // varSizeVecIdxs stores the indices of all vectors with variable sized - // values except for the bytes-like ones. - varSizeVecIdxs util.FastIntSet - // decimalVecs and datumVecs store all decimal and datum-backed vectors, - // respectively. They are updated every time a new batch is allocated. + // decimalVecIdxs stores the indices of all decimal vectors. + decimalVecIdxs util.FastIntSet + // decimalVecs stores all decimal vectors. They are updated every time a new + // batch is allocated. decimalVecs []coldata.Decimals - datumVecs []coldata.DatumVec - // varSizeDatumSizes stores the amount of space we have accounted for for - // the corresponding "row" of variable length values in the last batch that - // the helper has touched. This is necessary to track because when the batch - // is reset, the vectors still have references to the old datums, so we need - // to adjust the accounting only by the delta. Similarly, once a new batch - // is allocated, we need to track the estimate that we have already + // decimalSizes stores the amount of space we have accounted for for the + // corresponding decimal values in the corresponding row of the last batch + // that the helper has touched. This is necessary to track because when the + // batch is reset, the vectors still have references to the old decimals, so + // we need to adjust the accounting only by the delta. Similarly, once a new + // batch is allocated, we need to track the estimate that we have already // accounted for. // // Note that because ResetMaybeReallocate caps the capacity of the batch at // coldata.BatchSize(), this slice will never exceed coldata.BatchSize() in // size, and we choose to ignore it for the purposes of memory accounting. - varSizeDatumSizes []int64 - // varSizeEstimatePerRow is the total estimated size of single values from - // varSizeVecIdxs vectors which is accounted for by EstimateBatchSizeBytes. - // It serves as the initial value for varSizeDatumSizes values. - varSizeEstimatePerRow int64 + decimalSizes []int64 + + // varLenDatumVecIdxs stores the indices of all datum-backed vectors with + // variable-length values. + varLenDatumVecIdxs util.FastIntSet + // varLenDatumVecs stores all variable-sized datum-backed vectors. They are + // updated every time a new batch is allocated. + varLenDatumVecs []coldata.DatumVec } // Init initializes the helper. func (h *SetAccountingHelper) Init(allocator *Allocator, typs []*types.T) { h.Allocator = allocator - numDecimalVecs := 0 for vecIdx, typ := range typs { switch typeconv.TypeFamilyToCanonicalTypeFamily(typ.Family()) { case types.BytesFamily, types.JsonFamily: h.bytesLikeVecIdxs.Add(vecIdx) case types.DecimalFamily: - h.varSizeVecIdxs.Add(vecIdx) - h.varSizeEstimatePerRow += memsize.Decimal - numDecimalVecs++ + h.decimalVecIdxs.Add(vecIdx) case typeconv.DatumVecCanonicalTypeFamily: - estimate, isVarlen := tree.DatumTypeSize(typ) - if isVarlen { - h.varSizeVecIdxs.Add(vecIdx) - h.varSizeEstimatePerRow += int64(estimate) + memsize.DatumOverhead - } + h.varLenDatumVecIdxs.Add(vecIdx) } } - h.allFixedLength = h.bytesLikeVecIdxs.Empty() && h.varSizeVecIdxs.Empty() + h.allFixedLength = h.bytesLikeVecIdxs.Empty() && h.decimalVecIdxs.Empty() && h.varLenDatumVecIdxs.Empty() h.bytesLikeVectors = make([]*coldata.Bytes, h.bytesLikeVecIdxs.Len()) - h.decimalVecs = make([]coldata.Decimals, numDecimalVecs) - h.datumVecs = make([]coldata.DatumVec, h.varSizeVecIdxs.Len()-numDecimalVecs) + h.decimalVecs = make([]coldata.Decimals, h.decimalVecIdxs.Len()) + h.varLenDatumVecs = make([]coldata.DatumVec, h.varLenDatumVecIdxs.Len()) } func (h *SetAccountingHelper) getBytesLikeTotalSize() int64 { @@ -616,23 +608,24 @@ func (h *SetAccountingHelper) ResetMaybeReallocate( } h.prevBytesLikeTotalSize = h.getBytesLikeTotalSize() } - if !h.varSizeVecIdxs.Empty() { + if !h.decimalVecIdxs.Empty() { h.decimalVecs = h.decimalVecs[:0] - h.datumVecs = h.datumVecs[:0] - for vecIdx, ok := h.varSizeVecIdxs.Next(0); ok; vecIdx, ok = h.varSizeVecIdxs.Next(vecIdx + 1) { - if vecs[vecIdx].CanonicalTypeFamily() == types.DecimalFamily { - h.decimalVecs = append(h.decimalVecs, vecs[vecIdx].Decimal()) - } else { - h.datumVecs = append(h.datumVecs, vecs[vecIdx].Datum()) - } + for vecIdx, ok := h.decimalVecIdxs.Next(0); ok; vecIdx, ok = h.decimalVecIdxs.Next(vecIdx + 1) { + h.decimalVecs = append(h.decimalVecs, vecs[vecIdx].Decimal()) } - if cap(h.varSizeDatumSizes) < newBatch.Capacity() { - h.varSizeDatumSizes = make([]int64, newBatch.Capacity()) - } else { - h.varSizeDatumSizes = h.varSizeDatumSizes[:newBatch.Capacity()] + h.decimalSizes = make([]int64, newBatch.Capacity()) + for i := range h.decimalSizes { + // In EstimateBatchSizeBytes, memsize.Decimal has already been + // accounted for for each decimal value, so we multiple that by + // the number of decimal vectors to get already included + // footprint of all decimal values in a single row. + h.decimalSizes[i] = int64(len(h.decimalVecs)) * memsize.Decimal } - for i := range h.varSizeDatumSizes { - h.varSizeDatumSizes[i] = h.varSizeEstimatePerRow + } + if !h.varLenDatumVecIdxs.Empty() { + h.varLenDatumVecs = h.varLenDatumVecs[:0] + for vecIdx, ok := h.varLenDatumVecIdxs.Next(0); ok; vecIdx, ok = h.varLenDatumVecIdxs.Next(vecIdx + 1) { + h.varLenDatumVecs = append(h.varLenDatumVecs, vecs[vecIdx].Datum()) } } } @@ -655,18 +648,25 @@ func (h *SetAccountingHelper) AccountForSet(rowIdx int) { h.prevBytesLikeTotalSize = newBytesLikeTotalSize } - if !h.varSizeVecIdxs.Empty() { - var newVarLengthDatumSize int64 + if !h.decimalVecIdxs.Empty() { + var newDecimalSizes int64 for _, decimalVec := range h.decimalVecs { d := decimalVec.Get(rowIdx) - newVarLengthDatumSize += int64(d.Size()) + newDecimalSizes += int64(d.Size()) } - for _, datumVec := range h.datumVecs { + h.Allocator.AdjustMemoryUsage(newDecimalSizes - h.decimalSizes[rowIdx]) + h.decimalSizes[rowIdx] = newDecimalSizes + } + + if !h.varLenDatumVecIdxs.Empty() { + var newVarLengthDatumSize int64 + for _, datumVec := range h.varLenDatumVecs { datumSize := datumVec.Get(rowIdx).(tree.Datum).Size() - newVarLengthDatumSize += int64(datumSize) + memsize.DatumOverhead + // Note that we're ignoring the overhead of tree.Datum because it + // was already included in EstimateBatchSizeBytes. + newVarLengthDatumSize += int64(datumSize) } - h.Allocator.AdjustMemoryUsage(newVarLengthDatumSize - h.varSizeDatumSizes[rowIdx]) - h.varSizeDatumSizes[rowIdx] = newVarLengthDatumSize + h.Allocator.AdjustMemoryUsage(newVarLengthDatumSize) } } diff --git a/pkg/sql/colmem/allocator_test.go b/pkg/sql/colmem/allocator_test.go index 874e0ff3e44f..27ec3d57842b 100644 --- a/pkg/sql/colmem/allocator_test.go +++ b/pkg/sql/colmem/allocator_test.go @@ -35,6 +35,10 @@ import ( "github.com/stretchr/testify/require" ) +func init() { + randutil.SeedForTests() +} + func TestMaybeAppendColumn(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -330,3 +334,44 @@ func TestSetAccountingHelper(t *testing.T) { } } } + +// TestEstimateBatchSizeBytes verifies that EstimateBatchSizeBytes returns such +// an estimate that it equals the actual footprint of the newly-created batch +// with no values set. +func TestEstimateBatchSizeBytes(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + rng, _ := randutil.NewTestRand() + st := cluster.MakeTestingClusterSettings() + testMemMonitor := execinfra.NewTestMemMonitor(ctx, st) + defer testMemMonitor.Stop(ctx) + memAcc := testMemMonitor.MakeBoundAccount() + defer memAcc.Close(ctx) + evalCtx := tree.MakeTestingEvalContext(st) + testColumnFactory := coldataext.NewExtendedColumnFactory(&evalCtx) + testAllocator := colmem.NewAllocator(ctx, &memAcc, testColumnFactory) + + numCols := rng.Intn(10) + 1 + typs := make([]*types.T, numCols) + for i := range typs { + typs[i] = randgen.RandType(rng) + } + const numRuns = 10 + for run := 0; run < numRuns; run++ { + memAcc.Clear(ctx) + numRows := rng.Intn(coldata.BatchSize()) + 1 + batch := testAllocator.NewMemBatchWithFixedCapacity(typs, numRows) + expected := memAcc.Used() + actual := colmem.GetBatchMemSize(batch) + if expected != actual { + fmt.Printf("run = %d numRows = %d\n", run, numRows) + for i := range typs { + fmt.Printf("%s ", typs[i].SQLString()) + } + fmt.Println() + t.Fatal(errors.Newf("expected %d, actual %d", expected, actual)) + } + } +} From 4fe0096055f28b7dbb52db2b0c7424bab99835e7 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Thu, 10 Feb 2022 19:47:18 -0800 Subject: [PATCH 2/3] colexec: fix limiting the output batch in size in several operators In the cFetcher, the hash aggregator, and the ordered synchronizer we are performing careful memory accounting in order to limit the size of the output batches. However, we had an incorrect assumption that could lead to batches being larger than desired. Previously, the first time the batch exceeded the target size, we would remember its "capacity" (i.e. the number of rows that fit into that batch), and in the future we would always put up to that number of rows in the batch. That works well when each row is of roughly the same size throughout the lifetime of an operator; however, that is not always the case. Imagine we have 1000 small rows followed by 1000 large rows - previously, all 1000 large rows would be put into a single batch, significantly exceeding the target size. This commit makes the limiting much more sane - it removes the notion of "max capacity" and, instead, after each row is set in the batch we check whether the batch has exceeded the target size. Release note: None --- pkg/sql/colexec/hash_aggregator.eg.go | 16 ++++++---------- pkg/sql/colexec/hash_aggregator.go | 6 +----- pkg/sql/colexec/hash_aggregator_tmpl.go | 8 +++----- pkg/sql/colexec/ordered_synchronizer.eg.go | 10 +++------- pkg/sql/colexec/ordered_synchronizer_tmpl.go | 10 +++------- pkg/sql/colfetcher/cfetcher.go | 16 ++-------------- 6 files changed, 18 insertions(+), 48 deletions(-) diff --git a/pkg/sql/colexec/hash_aggregator.eg.go b/pkg/sql/colexec/hash_aggregator.eg.go index 97c8298afa1f..7dc4e4808e28 100644 --- a/pkg/sql/colexec/hash_aggregator.eg.go +++ b/pkg/sql/colexec/hash_aggregator.eg.go @@ -459,9 +459,7 @@ func getNext_true(op *hashAggregator) coldata.Batch { op.outputTypes, op.output, len(op.buckets), op.maxOutputBatchMemSize, ) curOutputIdx := 0 - for curOutputIdx < op.output.Capacity() && - op.curOutputBucketIdx < len(op.buckets) && - (op.maxCapacity == 0 || curOutputIdx < op.maxCapacity) { + for curOutputIdx < op.output.Capacity() && op.curOutputBucketIdx < len(op.buckets) { bucket := op.buckets[op.curOutputBucketIdx] for fnIdx, fn := range bucket.fns { fn.SetOutput(op.output.ColVec(fnIdx)) @@ -470,8 +468,8 @@ func getNext_true(op *hashAggregator) coldata.Batch { op.accountingHelper.AccountForSet(curOutputIdx) curOutputIdx++ op.curOutputBucketIdx++ - if op.maxCapacity == 0 && op.accountingHelper.Allocator.Used() >= op.maxOutputBatchMemSize { - op.maxCapacity = curOutputIdx + if op.accountingHelper.Allocator.Used() >= op.maxOutputBatchMemSize { + break } } if op.curOutputBucketIdx >= len(op.buckets) { @@ -606,9 +604,7 @@ func getNext_false(op *hashAggregator) coldata.Batch { op.outputTypes, op.output, len(op.buckets), op.maxOutputBatchMemSize, ) curOutputIdx := 0 - for curOutputIdx < op.output.Capacity() && - op.curOutputBucketIdx < len(op.buckets) && - (op.maxCapacity == 0 || curOutputIdx < op.maxCapacity) { + for curOutputIdx < op.output.Capacity() && op.curOutputBucketIdx < len(op.buckets) { bucket := op.buckets[op.curOutputBucketIdx] for fnIdx, fn := range bucket.fns { fn.SetOutput(op.output.ColVec(fnIdx)) @@ -617,8 +613,8 @@ func getNext_false(op *hashAggregator) coldata.Batch { op.accountingHelper.AccountForSet(curOutputIdx) curOutputIdx++ op.curOutputBucketIdx++ - if op.maxCapacity == 0 && op.accountingHelper.Allocator.Used() >= op.maxOutputBatchMemSize { - op.maxCapacity = curOutputIdx + if op.accountingHelper.Allocator.Used() >= op.maxOutputBatchMemSize { + break } } if op.curOutputBucketIdx >= len(op.buckets) { diff --git a/pkg/sql/colexec/hash_aggregator.go b/pkg/sql/colexec/hash_aggregator.go index 6ac8e6c1dc6c..0a9c58ade7d3 100644 --- a/pkg/sql/colexec/hash_aggregator.go +++ b/pkg/sql/colexec/hash_aggregator.go @@ -141,11 +141,7 @@ type hashAggregator struct { curOutputBucketIdx int maxOutputBatchMemSize int64 - // maxCapacity if non-zero indicates the target capacity of the output - // batch. It is set when, after setting a row, we realize that the output - // batch has exceeded the memory limit. - maxCapacity int - output coldata.Batch + output coldata.Batch aggFnsAlloc *colexecagg.AggregateFuncsAlloc hashAlloc aggBucketAlloc diff --git a/pkg/sql/colexec/hash_aggregator_tmpl.go b/pkg/sql/colexec/hash_aggregator_tmpl.go index 2661250ec0ab..c9b3a660aee5 100644 --- a/pkg/sql/colexec/hash_aggregator_tmpl.go +++ b/pkg/sql/colexec/hash_aggregator_tmpl.go @@ -345,9 +345,7 @@ func getNext(op *hashAggregator, partialOrder bool) coldata.Batch { op.outputTypes, op.output, len(op.buckets), op.maxOutputBatchMemSize, ) curOutputIdx := 0 - for curOutputIdx < op.output.Capacity() && - op.curOutputBucketIdx < len(op.buckets) && - (op.maxCapacity == 0 || curOutputIdx < op.maxCapacity) { + for curOutputIdx < op.output.Capacity() && op.curOutputBucketIdx < len(op.buckets) { bucket := op.buckets[op.curOutputBucketIdx] for fnIdx, fn := range bucket.fns { fn.SetOutput(op.output.ColVec(fnIdx)) @@ -356,8 +354,8 @@ func getNext(op *hashAggregator, partialOrder bool) coldata.Batch { op.accountingHelper.AccountForSet(curOutputIdx) curOutputIdx++ op.curOutputBucketIdx++ - if op.maxCapacity == 0 && op.accountingHelper.Allocator.Used() >= op.maxOutputBatchMemSize { - op.maxCapacity = curOutputIdx + if op.accountingHelper.Allocator.Used() >= op.maxOutputBatchMemSize { + break } } if op.curOutputBucketIdx >= len(op.buckets) { diff --git a/pkg/sql/colexec/ordered_synchronizer.eg.go b/pkg/sql/colexec/ordered_synchronizer.eg.go index 3a8f0f0ab65b..39271f192059 100644 --- a/pkg/sql/colexec/ordered_synchronizer.eg.go +++ b/pkg/sql/colexec/ordered_synchronizer.eg.go @@ -53,10 +53,6 @@ type OrderedSynchronizer struct { heap []int // comparators stores one comparator per ordering column. comparators []vecComparator - // maxCapacity if non-zero indicates the target capacity of the output - // batch. It is set when, after setting a row, we realize that the output - // batch has exceeded the memory limit. - maxCapacity int output coldata.Batch outVecs coldata.TypedVecs } @@ -112,7 +108,7 @@ func (o *OrderedSynchronizer) Next() coldata.Batch { } o.resetOutput() outputIdx := 0 - for outputIdx < o.output.Capacity() && (o.maxCapacity == 0 || outputIdx < o.maxCapacity) { + for outputIdx < o.output.Capacity() { if o.Len() == 0 { // All inputs exhausted. break @@ -245,8 +241,8 @@ func (o *OrderedSynchronizer) Next() coldata.Batch { // Account for the memory of the row we have just set. o.accountingHelper.AccountForSet(outputIdx) outputIdx++ - if o.maxCapacity == 0 && o.accountingHelper.Allocator.Used() >= o.memoryLimit { - o.maxCapacity = outputIdx + if o.accountingHelper.Allocator.Used() >= o.memoryLimit { + break } } diff --git a/pkg/sql/colexec/ordered_synchronizer_tmpl.go b/pkg/sql/colexec/ordered_synchronizer_tmpl.go index e1baf9bb4e3c..0e2f1165c868 100644 --- a/pkg/sql/colexec/ordered_synchronizer_tmpl.go +++ b/pkg/sql/colexec/ordered_synchronizer_tmpl.go @@ -76,10 +76,6 @@ type OrderedSynchronizer struct { heap []int // comparators stores one comparator per ordering column. comparators []vecComparator - // maxCapacity if non-zero indicates the target capacity of the output - // batch. It is set when, after setting a row, we realize that the output - // batch has exceeded the memory limit. - maxCapacity int output coldata.Batch outVecs coldata.TypedVecs } @@ -135,7 +131,7 @@ func (o *OrderedSynchronizer) Next() coldata.Batch { } o.resetOutput() outputIdx := 0 - for outputIdx < o.output.Capacity() && (o.maxCapacity == 0 || outputIdx < o.maxCapacity) { + for outputIdx < o.output.Capacity() { if o.Len() == 0 { // All inputs exhausted. break @@ -189,8 +185,8 @@ func (o *OrderedSynchronizer) Next() coldata.Batch { // Account for the memory of the row we have just set. o.accountingHelper.AccountForSet(outputIdx) outputIdx++ - if o.maxCapacity == 0 && o.accountingHelper.Allocator.Used() >= o.memoryLimit { - o.maxCapacity = outputIdx + if o.accountingHelper.Allocator.Used() >= o.memoryLimit { + break } } diff --git a/pkg/sql/colfetcher/cfetcher.go b/pkg/sql/colfetcher/cfetcher.go index e42dd1999ced..e05a423442e9 100644 --- a/pkg/sql/colfetcher/cfetcher.go +++ b/pkg/sql/colfetcher/cfetcher.go @@ -287,21 +287,12 @@ type cFetcher struct { // kvFetcherMemAcc is a memory account that will be used by the underlying // KV fetcher. kvFetcherMemAcc *mon.BoundAccount - - // maxCapacity if non-zero indicates the target capacity of the output - // batch. It is set when at the row finalization we realize that the output - // batch has exceeded the memory limit. - maxCapacity int } func (cf *cFetcher) resetBatch() { var reallocated bool var minDesiredCapacity int - if cf.maxCapacity > 0 { - // If we have already exceeded the memory limit for the output batch, we - // will only be using the same batch from now on. - minDesiredCapacity = cf.maxCapacity - } else if cf.machine.limitHint > 0 && (cf.estimatedRowCount == 0 || uint64(cf.machine.limitHint) < cf.estimatedRowCount) { + if cf.machine.limitHint > 0 && (cf.estimatedRowCount == 0 || uint64(cf.machine.limitHint) < cf.estimatedRowCount) { // If we have a limit hint, and either // 1) we don't have an estimate, or // 2) we have a soft limit, @@ -917,11 +908,8 @@ func (cf *cFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) { cf.shiftState() var emitBatch bool - if cf.maxCapacity == 0 && cf.accountingHelper.Allocator.Used() >= cf.memoryLimit { - cf.maxCapacity = cf.machine.rowIdx - } if cf.machine.rowIdx >= cf.machine.batch.Capacity() || - (cf.maxCapacity > 0 && cf.machine.rowIdx >= cf.maxCapacity) || + (cf.accountingHelper.Allocator.Used() >= cf.memoryLimit) || (cf.machine.limitHint > 0 && cf.machine.rowIdx >= cf.machine.limitHint) { // We either // 1. have no more room in our batch, so output it immediately From 950ba01aa99bbdd25351e9fe94b64204e9618c83 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Fri, 11 Feb 2022 19:01:18 -0800 Subject: [PATCH 3/3] WIP on fixing the previous commit The problem in the previous commit is that if bytes-like and/or decimal vectors are present, and once the footprint of the batch exceeds the target size because of bytes-like/decimal vectors, then after `ResetMaybeReallocate` call `AccountForSet` will only allow a single row. The problem is that we're holding onto old bytes/decimal values in order to reuse their space, but then our heuristic of no longer allowing more Sets once the reported usage exceeds the target size doesn't work. One idea is to not account for the bytes-like vectors in `ResetMaybeReallocate` and then use proportional size. The problem with this is that we're obviously not accounting for possibly large allocations (for some time). Another idea is to only apply the heuristic of making the batch full if the reported memory usage exceeds the target size IFF there was an increase in memory usage. This works well for datum-backed vectors. For bytes-like/decimals, however, we could have a pathological behavior where the sizes of the batches keep on growing until `coldata.BatchSize()` capacity is reached. --- pkg/sql/colexec/hash_aggregator.eg.go | 16 +-- pkg/sql/colexec/hash_aggregator_tmpl.go | 8 +- pkg/sql/colexec/ordered_synchronizer.eg.go | 7 +- pkg/sql/colexec/ordered_synchronizer_tmpl.go | 7 +- pkg/sql/colfetcher/cfetcher.go | 16 +-- pkg/sql/colmem/allocator.go | 45 +++++-- pkg/sql/colmem/allocator_test.go | 128 ++++++++++++++++++- 7 files changed, 173 insertions(+), 54 deletions(-) diff --git a/pkg/sql/colexec/hash_aggregator.eg.go b/pkg/sql/colexec/hash_aggregator.eg.go index 7dc4e4808e28..00692e0cd2ae 100644 --- a/pkg/sql/colexec/hash_aggregator.eg.go +++ b/pkg/sql/colexec/hash_aggregator.eg.go @@ -459,18 +459,16 @@ func getNext_true(op *hashAggregator) coldata.Batch { op.outputTypes, op.output, len(op.buckets), op.maxOutputBatchMemSize, ) curOutputIdx := 0 - for curOutputIdx < op.output.Capacity() && op.curOutputBucketIdx < len(op.buckets) { + var batchFull bool + for op.curOutputBucketIdx < len(op.buckets) && !batchFull { bucket := op.buckets[op.curOutputBucketIdx] for fnIdx, fn := range bucket.fns { fn.SetOutput(op.output.ColVec(fnIdx)) fn.Flush(curOutputIdx) } - op.accountingHelper.AccountForSet(curOutputIdx) + batchFull = op.accountingHelper.AccountForSet(curOutputIdx) curOutputIdx++ op.curOutputBucketIdx++ - if op.accountingHelper.Allocator.Used() >= op.maxOutputBatchMemSize { - break - } } if op.curOutputBucketIdx >= len(op.buckets) { if l := op.bufferingState.pendingBatch.Length(); l > 0 { @@ -604,18 +602,16 @@ func getNext_false(op *hashAggregator) coldata.Batch { op.outputTypes, op.output, len(op.buckets), op.maxOutputBatchMemSize, ) curOutputIdx := 0 - for curOutputIdx < op.output.Capacity() && op.curOutputBucketIdx < len(op.buckets) { + var batchFull bool + for op.curOutputBucketIdx < len(op.buckets) && !batchFull { bucket := op.buckets[op.curOutputBucketIdx] for fnIdx, fn := range bucket.fns { fn.SetOutput(op.output.ColVec(fnIdx)) fn.Flush(curOutputIdx) } - op.accountingHelper.AccountForSet(curOutputIdx) + batchFull = op.accountingHelper.AccountForSet(curOutputIdx) curOutputIdx++ op.curOutputBucketIdx++ - if op.accountingHelper.Allocator.Used() >= op.maxOutputBatchMemSize { - break - } } if op.curOutputBucketIdx >= len(op.buckets) { op.state = hashAggregatorDone diff --git a/pkg/sql/colexec/hash_aggregator_tmpl.go b/pkg/sql/colexec/hash_aggregator_tmpl.go index c9b3a660aee5..ab2eed2afeb0 100644 --- a/pkg/sql/colexec/hash_aggregator_tmpl.go +++ b/pkg/sql/colexec/hash_aggregator_tmpl.go @@ -345,18 +345,16 @@ func getNext(op *hashAggregator, partialOrder bool) coldata.Batch { op.outputTypes, op.output, len(op.buckets), op.maxOutputBatchMemSize, ) curOutputIdx := 0 - for curOutputIdx < op.output.Capacity() && op.curOutputBucketIdx < len(op.buckets) { + var batchFull bool + for op.curOutputBucketIdx < len(op.buckets) && !batchFull { bucket := op.buckets[op.curOutputBucketIdx] for fnIdx, fn := range bucket.fns { fn.SetOutput(op.output.ColVec(fnIdx)) fn.Flush(curOutputIdx) } - op.accountingHelper.AccountForSet(curOutputIdx) + batchFull = op.accountingHelper.AccountForSet(curOutputIdx) curOutputIdx++ op.curOutputBucketIdx++ - if op.accountingHelper.Allocator.Used() >= op.maxOutputBatchMemSize { - break - } } if op.curOutputBucketIdx >= len(op.buckets) { if partialOrder { diff --git a/pkg/sql/colexec/ordered_synchronizer.eg.go b/pkg/sql/colexec/ordered_synchronizer.eg.go index 39271f192059..09f887db42f3 100644 --- a/pkg/sql/colexec/ordered_synchronizer.eg.go +++ b/pkg/sql/colexec/ordered_synchronizer.eg.go @@ -108,7 +108,7 @@ func (o *OrderedSynchronizer) Next() coldata.Batch { } o.resetOutput() outputIdx := 0 - for outputIdx < o.output.Capacity() { + for batchFull := false; !batchFull; { if o.Len() == 0 { // All inputs exhausted. break @@ -239,11 +239,8 @@ func (o *OrderedSynchronizer) Next() coldata.Batch { } // Account for the memory of the row we have just set. - o.accountingHelper.AccountForSet(outputIdx) + batchFull = o.accountingHelper.AccountForSet(outputIdx) outputIdx++ - if o.accountingHelper.Allocator.Used() >= o.memoryLimit { - break - } } o.output.SetLength(outputIdx) diff --git a/pkg/sql/colexec/ordered_synchronizer_tmpl.go b/pkg/sql/colexec/ordered_synchronizer_tmpl.go index 0e2f1165c868..91f1444acca0 100644 --- a/pkg/sql/colexec/ordered_synchronizer_tmpl.go +++ b/pkg/sql/colexec/ordered_synchronizer_tmpl.go @@ -131,7 +131,7 @@ func (o *OrderedSynchronizer) Next() coldata.Batch { } o.resetOutput() outputIdx := 0 - for outputIdx < o.output.Capacity() { + for batchFull := false; !batchFull; { if o.Len() == 0 { // All inputs exhausted. break @@ -183,11 +183,8 @@ func (o *OrderedSynchronizer) Next() coldata.Batch { } // Account for the memory of the row we have just set. - o.accountingHelper.AccountForSet(outputIdx) + batchFull = o.accountingHelper.AccountForSet(outputIdx) outputIdx++ - if o.accountingHelper.Allocator.Used() >= o.memoryLimit { - break - } } o.output.SetLength(outputIdx) diff --git a/pkg/sql/colfetcher/cfetcher.go b/pkg/sql/colfetcher/cfetcher.go index e05a423442e9..f4ee62bb1e1a 100644 --- a/pkg/sql/colfetcher/cfetcher.go +++ b/pkg/sql/colfetcher/cfetcher.go @@ -903,20 +903,14 @@ func (cf *cFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) { // column is requested) yet, but it is ok for the purposes of the // memory accounting - oids are fixed length values and, thus, have // already been accounted for when the batch was allocated. - cf.accountingHelper.AccountForSet(cf.machine.rowIdx) + emitBatch := cf.accountingHelper.AccountForSet(cf.machine.rowIdx) cf.machine.rowIdx++ cf.shiftState() - var emitBatch bool - if cf.machine.rowIdx >= cf.machine.batch.Capacity() || - (cf.accountingHelper.Allocator.Used() >= cf.memoryLimit) || - (cf.machine.limitHint > 0 && cf.machine.rowIdx >= cf.machine.limitHint) { - // We either - // 1. have no more room in our batch, so output it immediately - // or - // 2. we made it to our limit hint, so output our batch early - // to make sure that we don't bother filling in extra data - // if we don't need to. + if cf.machine.limitHint > 0 && cf.machine.rowIdx >= cf.machine.limitHint { + // If we made it to our limit hint, so output our batch early to + // make sure that we don't bother filling in extra data if we + // don't need to. emitBatch = true // Update the limit hint to track the expected remaining rows to // be fetched. diff --git a/pkg/sql/colmem/allocator.go b/pkg/sql/colmem/allocator.go index 567b9d6b0639..4b8241065e33 100644 --- a/pkg/sql/colmem/allocator.go +++ b/pkg/sql/colmem/allocator.go @@ -506,7 +506,12 @@ func GetFixedSizeTypeSize(t *types.T) (size int64) { // NOTE: it works under the assumption that only a single coldata.Batch is being // used. type SetAccountingHelper struct { - Allocator *Allocator + allocator *Allocator + + batchState struct { + maxMemSize int64 + capacity int + } // allFixedLength indicates that we're working with the type schema of only // fixed-length elements. @@ -549,7 +554,7 @@ type SetAccountingHelper struct { // Init initializes the helper. func (h *SetAccountingHelper) Init(allocator *Allocator, typs []*types.T) { - h.Allocator = allocator + h.allocator = allocator for vecIdx, typ := range typs { switch typeconv.TypeFamilyToCanonicalTypeFamily(typ.Family()) { @@ -580,11 +585,13 @@ func (h *SetAccountingHelper) getBytesLikeTotalSize() int64 { // Allocator.ResetMaybeReallocate (and thus has the same contract) with an // additional logic for memory tracking purposes. func (h *SetAccountingHelper) ResetMaybeReallocate( - typs []*types.T, oldBatch coldata.Batch, minCapacity int, maxBatchMemSize int64, + typs []*types.T, oldBatch coldata.Batch, minDesiredCapacity int, maxBatchMemSize int64, ) (newBatch coldata.Batch, reallocated bool) { - newBatch, reallocated = h.Allocator.ResetMaybeReallocate( - typs, oldBatch, minCapacity, maxBatchMemSize, + newBatch, reallocated = h.allocator.ResetMaybeReallocate( + typs, oldBatch, minDesiredCapacity, maxBatchMemSize, ) + h.batchState.maxMemSize = maxBatchMemSize + h.batchState.capacity = newBatch.Capacity() if reallocated && !h.allFixedLength { // Allocator.ResetMaybeReallocate has released the precise memory // footprint of the old batch and has accounted for the estimated @@ -634,17 +641,27 @@ func (h *SetAccountingHelper) ResetMaybeReallocate( // AccountForSet updates the Allocator according to the new variable length // values in the row rowIdx in the batch that was returned by the last call to -// ResetMaybeReallocate. -func (h *SetAccountingHelper) AccountForSet(rowIdx int) { +// ResetMaybeReallocate. Returns true when the batch is "full" (i.e. it has been +// filled up to its capacity and/or memory size). +func (h *SetAccountingHelper) AccountForSet(rowIdx int) bool { + if rowIdx+1 == h.batchState.capacity { + // The batch has been filled up to capacity. + return true + } + if h.allFixedLength { // All vectors are of fixed-length and are already correctly accounted - // for. - return + // for, and there is more capacity in the batch. + return false } + // TODO: comment. + var hadIncrease bool + if len(h.bytesLikeVectors) > 0 { newBytesLikeTotalSize := h.getBytesLikeTotalSize() - h.Allocator.AdjustMemoryUsage(newBytesLikeTotalSize - h.prevBytesLikeTotalSize) + hadIncrease = newBytesLikeTotalSize > h.prevBytesLikeTotalSize + h.allocator.AdjustMemoryUsage(newBytesLikeTotalSize - h.prevBytesLikeTotalSize) h.prevBytesLikeTotalSize = newBytesLikeTotalSize } @@ -654,7 +671,8 @@ func (h *SetAccountingHelper) AccountForSet(rowIdx int) { d := decimalVec.Get(rowIdx) newDecimalSizes += int64(d.Size()) } - h.Allocator.AdjustMemoryUsage(newDecimalSizes - h.decimalSizes[rowIdx]) + hadIncrease = hadIncrease || newDecimalSizes > h.decimalSizes[rowIdx] + h.allocator.AdjustMemoryUsage(newDecimalSizes - h.decimalSizes[rowIdx]) h.decimalSizes[rowIdx] = newDecimalSizes } @@ -666,8 +684,11 @@ func (h *SetAccountingHelper) AccountForSet(rowIdx int) { // was already included in EstimateBatchSizeBytes. newVarLengthDatumSize += int64(datumSize) } - h.Allocator.AdjustMemoryUsage(newVarLengthDatumSize) + h.allocator.AdjustMemoryUsage(newVarLengthDatumSize) + hadIncrease = true } + + return hadIncrease && h.allocator.Used() >= h.batchState.maxMemSize } // Release releases all of the resources so that they can be garbage collected. diff --git a/pkg/sql/colmem/allocator_test.go b/pkg/sql/colmem/allocator_test.go index 27ec3d57842b..e48e4915f3a0 100644 --- a/pkg/sql/colmem/allocator_test.go +++ b/pkg/sql/colmem/allocator_test.go @@ -15,6 +15,7 @@ import ( "fmt" "math" "testing" + "unsafe" "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coldataext" @@ -24,10 +25,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/colmem" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/sql/memsize" "github.com/cockroachdb/cockroach/pkg/sql/randgen" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/util/bitarray" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/randutil" @@ -279,6 +282,8 @@ func TestSetAccountingHelper(t *testing.T) { for i := range typs { typs[i] = randgen.RandType(rng) } + // For Bytes, insert pretty large values. + bytesValue := make([]byte, 8*coldata.BytesInitialAllocationFactor) var helper colmem.SetAccountingHelper helper.Init(testAllocator, typs) @@ -301,22 +306,24 @@ func TestSetAccountingHelper(t *testing.T) { } batch, _ = helper.ResetMaybeReallocate(typs, batch, numRows, maxBatchMemSize) - for rowIdx := 0; rowIdx < batch.Capacity(); rowIdx++ { + var rowIdx int + for batchFull := false; !batchFull; { for vecIdx, typ := range typs { switch typ.Family() { case types.BytesFamily: - // For Bytes, insert pretty large values. - v := make([]byte, rng.Intn(8*coldata.BytesInitialAllocationFactor)) - _, _ = rng.Read(v) - batch.ColVec(vecIdx).Bytes().Set(rowIdx, v) + bytesValue = bytesValue[:rng.Intn(cap(bytesValue))] + _, _ = rng.Read(bytesValue) + batch.ColVec(vecIdx).Bytes().Set(rowIdx, bytesValue) default: datum := randgen.RandDatum(rng, typ, false /* nullOk */) converter := colconv.GetDatumToPhysicalFn(typ) coldata.SetValueAt(batch.ColVec(vecIdx), converter(datum), rowIdx) } } - helper.AccountForSet(rowIdx) + batchFull = helper.AccountForSet(rowIdx) + rowIdx++ } + batch.SetLength(rowIdx) // At this point, we have set all rows in the batch and performed the // memory accounting for each set. We no longer have any uninitialized @@ -375,3 +382,112 @@ func TestEstimateBatchSizeBytes(t *testing.T) { } } } + +const bitArraySize = int(unsafe.Sizeof(bitarray.BitArray{})) + +func TestAccountForSetDynamicLimiting(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + require.NoError(t, coldata.SetBatchSizeForTests(10)) + + ctx := context.Background() + rng, _ := randutil.NewTestRand() + st := cluster.MakeTestingClusterSettings() + testMemMonitor := execinfra.NewTestMemMonitor(ctx, st) + defer testMemMonitor.Stop(ctx) + memAcc := testMemMonitor.MakeBoundAccount() + defer memAcc.Close(ctx) + evalCtx := tree.MakeTestingEvalContext(st) + testColumnFactory := coldataext.NewExtendedColumnFactory(&evalCtx) + testAllocator := colmem.NewAllocator(ctx, &memAcc, testColumnFactory) + + const maxValueSize = 128 + bytesValue := make([]byte, maxValueSize) + + // TODO + //numCols := 1 + //typ := types.VarBit + typ := types.Bytes + typs := []*types.T{typ} + + var helper colmem.SetAccountingHelper + helper.Init(testAllocator, typs) + + var batch coldata.Batch + for iteration := 0; iteration < 10; iteration++ { + reallocateBatch := true + batchCapacity := rng.Intn(coldata.BatchSize()) + 1 + if batch != nil { + // If the batch is already at maximum capacity, it won't get + // reallocated. + if batch.Capacity() == coldata.BatchSize() { + reallocateBatch = false + batchCapacity = coldata.BatchSize() + } else { + batchCapacity = batch.Capacity() * 2 + if batchCapacity > coldata.BatchSize() { + batchCapacity = coldata.BatchSize() + } + } + } + maxBatchMemSize := int64(math.MaxInt64) + expectedLength := batchCapacity + valueSizeBytes := (rng.Intn(maxValueSize/8) + 1) * 8 + if typ.Family() == types.BitFamily && valueSizeBytes < bitArraySize+8 { + valueSizeBytes = bitArraySize + 8 + } + // With 50% chance use a small batch mem size so that the batch is + // filled partially. + if rng.Float64() < 0.5 { + expectedLength = int(float64(batchCapacity) * rng.Float64()) + if expectedLength == 0 { + expectedLength = 1 + } + maxBatchMemSize = int64(valueSizeBytes * expectedLength) + // Account for the selection vector as well as the fixed costs of + // the data vector. + maxBatchMemSize += int64(batchCapacity)*memsize.Int + colmem.EstimateBatchSizeBytes(typs, batchCapacity) + // If we use the limit that is smaller than the footprint of the + // last batch, the batch won't be reallocated. + if memAcc.Used() >= maxBatchMemSize { + reallocateBatch = false + capacityDiff := batchCapacity - batch.Capacity() + maxBatchMemSize -= int64(capacityDiff)*memsize.Int + colmem.EstimateBatchSizeBytes(typs, capacityDiff) + batchCapacity = batch.Capacity() + // If now the capacity of the batch is smaller than the number + // of rows we want to insert, we have to curb our desires too. + if expectedLength > batchCapacity { + maxBatchMemSize -= int64(valueSizeBytes * (expectedLength - batchCapacity)) + expectedLength = batchCapacity + } + } + } + var reallocated bool + batch, reallocated = helper.ResetMaybeReallocate(typs, batch, batchCapacity, maxBatchMemSize) + require.Equal(t, reallocateBatch, reallocated) + + var rowIdx int + for batchFull := false; !batchFull && rowIdx < expectedLength; { + switch typ.Family() { + case types.BitFamily: + bitLen := uint(8 * (valueSizeBytes - bitArraySize)) + datum := &tree.DBitArray{BitArray: bitarray.Rand(rng, bitLen)} + batch.ColVec(0).Datum().Set(rowIdx, datum) + case types.BytesFamily: + bytesValue = bytesValue[:valueSizeBytes] + _, _ = rng.Read(bytesValue) + batch.ColVec(0).Bytes().Set(rowIdx, bytesValue) + default: + datum := randgen.RandDatum(rng, typ, false /* nullOk */) + converter := colconv.GetDatumToPhysicalFn(typ) + coldata.SetValueAt(batch.ColVec(0), converter(datum), rowIdx) + } + batchFull = helper.AccountForSet(rowIdx) + rowIdx++ + } + batch.SetLength(rowIdx) + require.Equal(t, expectedLength, rowIdx, fmt.Sprintf("expected batch to be full with %d rows, found %d", expectedLength, rowIdx)) + require.GreaterOrEqualf(t, maxBatchMemSize, memAcc.Used(), fmt.Sprintf("expected actual size of the batch to not exceed the limit")) + } +}