From cb93c302de1bab49c8b3051ad96cf859bd91036f Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Thu, 10 Feb 2022 21:33:35 -0800 Subject: [PATCH] colexec: deeply reset datum-backed vectors in ResetInternalBatch Previously, we would keep references to the old datums set in the datum-backed vectors until the datums are overwritten, thus, extending the time period when the no-longer-used datums are live unnecessarily. We don't have to do that for any other types because for others we can actually reuse the old space for new elements, so we want to keep them around (e.g. decimals can reuse the non-inlined coefficient). This commit makes it so that we deeply unset the datums in `ResetInternalBatch`. Care had to be taken to keep the memory accounting up-to-date. In particular, after deeply resetting the datum-backed vector, we want to release the memory allocation that was taken up by the actual datums while keeping the overhead of `tree.Datum` interface in (since `[]tree.Datum` slice is still fully live). Release note: None Release justification: low risk, high benefit changes to existing functionality (reduced memory usage). --- pkg/col/coldata/batch.go | 24 +++++- pkg/col/coldata/datum_vec.go | 5 +- pkg/col/coldataext/datum_vec.go | 51 ++++++----- pkg/sql/colexec/colexecutils/utils.go | 4 +- pkg/sql/colmem/allocator.go | 120 +++++++++++++------------- pkg/sql/colmem/allocator_test.go | 45 ++++++++++ 6 files changed, 160 insertions(+), 89 deletions(-) diff --git a/pkg/col/coldata/batch.go b/pkg/col/coldata/batch.go index 01df90b9329f..dd517fbe62c2 100644 --- a/pkg/col/coldata/batch.go +++ b/pkg/col/coldata/batch.go @@ -16,6 +16,7 @@ import ( "strings" "sync/atomic" + "github.com/cockroachdb/cockroach/pkg/col/typeconv" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/errors" @@ -68,8 +69,9 @@ type Batch interface { // important for callers to call ResetInternalBatch if they own internal // batches that they reuse as not doing this could result in correctness // or memory blowup issues. It unsets the selection and sets the length to - // 0. - ResetInternalBatch() + // 0. Notably, it deeply resets the datum-backed vectors and returns the + // number of bytes released as a result of the reset. + ResetInternalBatch() int64 // String returns a pretty representation of this batch. String() string } @@ -126,6 +128,8 @@ func NewMemBatchWithCapacity(typs []*types.T, capacity int, factory ColumnFactor b.b[i] = col if col.IsBytesLike() { b.bytesVecIdxs.Add(i) + } else if col.CanonicalTypeFamily() == typeconv.DatumVecCanonicalTypeFamily { + b.datumVecIdxs.Add(i) } } return b @@ -202,6 +206,8 @@ type MemBatch struct { // vectors and checking whether they are of Bytes type we store this slice // separately. bytesVecIdxs util.FastIntSet + // datumVecIdxs stores the indices of all datum-backed vectors in b. + datumVecIdxs util.FastIntSet useSel bool // sel is - if useSel is true - a selection vector from upstream. A // selection vector is a list of selected tuple indices in this memBatch's @@ -271,6 +277,8 @@ func (m *MemBatch) SetLength(length int) { func (m *MemBatch) AppendCol(col Vec) { if col.IsBytesLike() { m.bytesVecIdxs.Add(len(m.b)) + } else if col.CanonicalTypeFamily() == typeconv.DatumVecCanonicalTypeFamily { + m.datumVecIdxs.Add(len(m.b)) } m.b = append(m.b, col) } @@ -315,12 +323,17 @@ func (m *MemBatch) Reset(typs []*types.T, length int, factory ColumnFactory) { m.bytesVecIdxs.Remove(i) } } + for i, ok := m.datumVecIdxs.Next(0); ok; i, ok = m.datumVecIdxs.Next(i + 1) { + if i >= len(typs) { + m.datumVecIdxs.Remove(i) + } + } m.ResetInternalBatch() m.SetLength(length) } // ResetInternalBatch implements the Batch interface. -func (m *MemBatch) ResetInternalBatch() { +func (m *MemBatch) ResetInternalBatch() int64 { m.SetLength(0 /* length */) m.SetSelection(false) for _, v := range m.b { @@ -331,6 +344,11 @@ func (m *MemBatch) ResetInternalBatch() { for i, ok := m.bytesVecIdxs.Next(0); ok; i, ok = m.bytesVecIdxs.Next(i + 1) { Reset(m.b[i]) } + var released int64 + for i, ok := m.datumVecIdxs.Next(0); ok; i, ok = m.datumVecIdxs.Next(i + 1) { + released += m.b[i].Datum().Reset() + } + return released } // String returns a pretty representation of this batch. diff --git a/pkg/col/coldata/datum_vec.go b/pkg/col/coldata/datum_vec.go index 639b26656ed4..718c7d41806a 100644 --- a/pkg/col/coldata/datum_vec.go +++ b/pkg/col/coldata/datum_vec.go @@ -36,8 +36,6 @@ type DatumVec interface { AppendSlice(src DatumVec, destIdx, srcStartIdx, srcEndIdx int) // AppendVal appends the given tree.Datum value to the end of the vector. AppendVal(v Datum) - // SetLength sets the length of the vector. - SetLength(l int) // Len returns the length of the vector. Len() int // Cap returns the underlying capacity of the vector. @@ -55,4 +53,7 @@ type DatumVec interface { // be used when elements before startIdx are guaranteed not to have been // modified. Size(startIdx int) int64 + // Reset resets the vector for reuse. It returns the number of bytes + // released. + Reset() int64 } diff --git a/pkg/col/coldataext/datum_vec.go b/pkg/col/coldataext/datum_vec.go index aaca8a057236..6bfdf6271384 100644 --- a/pkg/col/coldataext/datum_vec.go +++ b/pkg/col/coldataext/datum_vec.go @@ -115,11 +115,6 @@ func (dv *datumVec) AppendVal(v coldata.Datum) { dv.data = append(dv.data, datum) } -// SetLength implements coldata.DatumVec interface. -func (dv *datumVec) SetLength(l int) { - dv.data = dv.data[:l] -} - // Len implements coldata.DatumVec interface. func (dv *datumVec) Len() int { return len(dv.data) @@ -145,6 +140,24 @@ func (dv *datumVec) UnmarshalTo(i int, b []byte) error { return err } +// valuesSize returns the footprint of actual datums (in bytes) with ordinals in +// [startIdx:] range, ignoring the overhead of tree.Datum wrapper. +func (dv *datumVec) valuesSize(startIdx int) int64 { + var size int64 + // Only the elements up to the length are expected to be non-nil. Note that + // we cannot take a short-cut with fixed-length values here because they + // might not be set, so we could over-account if we did something like + // size += (len-startIdx) * fixedSize. + if startIdx < dv.Len() { + for _, d := range dv.data[startIdx:dv.Len()] { + if d != nil { + size += int64(d.Size()) + } + } + } + return size +} + // Size implements coldata.DatumVec interface. func (dv *datumVec) Size(startIdx int) int64 { // Note that we don't account for the overhead of datumVec struct, and the @@ -156,24 +169,18 @@ func (dv *datumVec) Size(startIdx int) int64 { if startIdx < 0 { startIdx = 0 } - count := int64(dv.Cap() - startIdx) - size := memsize.DatumOverhead * count - if datumSize, variable := tree.DatumTypeSize(dv.t); variable { - // The elements in dv.data[max(startIdx,len):cap] range are accounted with - // the default datum size for the type. For those in the range - // [startIdx, len) we call Datum.Size(). - idx := startIdx - for ; idx < len(dv.data); idx++ { - if dv.data[idx] != nil { - size += int64(dv.data[idx].Size()) - } - } - // Pick up where the loop left off. - size += int64(dv.Cap()-idx) * int64(datumSize) - } else { - size += int64(datumSize) * count + // We have to account for the tree.Datum overhead for the whole capacity of + // the underlying slice. + return memsize.DatumOverhead*int64(dv.Cap()-startIdx) + dv.valuesSize(startIdx) +} + +// Reset implements coldata.DatumVec interface. +func (dv *datumVec) Reset() int64 { + released := dv.valuesSize(0 /* startIdx */) + for i := range dv.data { + dv.data[i] = nil } - return size + return released } // assertValidDatum asserts that the given datum is valid to be stored in this diff --git a/pkg/sql/colexec/colexecutils/utils.go b/pkg/sql/colexec/colexecutils/utils.go index d1415d2f348c..8abc03e66bef 100644 --- a/pkg/sql/colexec/colexecutils/utils.go +++ b/pkg/sql/colexec/colexecutils/utils.go @@ -183,9 +183,9 @@ func (b *AppendOnlyBufferedBatch) Reset([]*types.T, int, coldata.ColumnFactory) } // ResetInternalBatch implements the coldata.Batch interface. -func (b *AppendOnlyBufferedBatch) ResetInternalBatch() { +func (b *AppendOnlyBufferedBatch) ResetInternalBatch() int64 { b.SetLength(0 /* n */) - b.batch.ResetInternalBatch() + return b.batch.ResetInternalBatch() } // String implements the coldata.Batch interface. diff --git a/pkg/sql/colmem/allocator.go b/pkg/sql/colmem/allocator.go index 8af904f12ec3..567b9d6b0639 100644 --- a/pkg/sql/colmem/allocator.go +++ b/pkg/sql/colmem/allocator.go @@ -189,7 +189,7 @@ func (a *Allocator) ResetMaybeReallocate( } if useOldBatch { reallocated = false - oldBatch.ResetInternalBatch() + a.ReleaseMemory(oldBatch.ResetInternalBatch()) newBatch = oldBatch } else { a.ReleaseMemory(oldBatchMemSize) @@ -266,6 +266,8 @@ func (a *Allocator) MaybeAppendColumn(b coldata.Batch, t *types.T, colIdx int) { // Flat bytes vector needs to be reset before the vector can be // reused. coldata.Reset(presentVec) + } else if presentVec.CanonicalTypeFamily() == typeconv.DatumVecCanonicalTypeFamily { + a.ReleaseMemory(presentVec.Datum().Reset()) } return } @@ -428,15 +430,11 @@ func EstimateBatchSizeBytes(vecTypes []*types.T, batchLength int) int64 { case types.JsonFamily: numBytesVectors++ case typeconv.DatumVecCanonicalTypeFamily: - // In datum vec we need to account for memory underlying the struct - // that is the implementation of tree.Datum interface (for example, - // tree.DBoolFalse) as well as for the overhead of storing that - // implementation in the slice of tree.Datums. Note that if t is of - // variable size, the memory will be properly accounted in - // getVecMemoryFootprint. - // Note: keep the calculation here in line with datumVec.Size. - implementationSize, _ := tree.DatumTypeSize(t) - acc += int64(implementationSize) + memsize.DatumOverhead + // Initially, only []tree.Datum slice is allocated for the + // datum-backed vectors right away, so that's what we're including + // in the estimate. Later on, once the actual values are set, they + // will be accounted for properly. + acc += memsize.DatumOverhead case types.BoolFamily, types.IntFamily, @@ -523,57 +521,51 @@ type SetAccountingHelper struct { // that we have already accounted for. prevBytesLikeTotalSize int64 - // varSizeVecIdxs stores the indices of all vectors with variable sized - // values except for the bytes-like ones. - varSizeVecIdxs util.FastIntSet - // decimalVecs and datumVecs store all decimal and datum-backed vectors, - // respectively. They are updated every time a new batch is allocated. + // decimalVecIdxs stores the indices of all decimal vectors. + decimalVecIdxs util.FastIntSet + // decimalVecs stores all decimal vectors. They are updated every time a new + // batch is allocated. decimalVecs []coldata.Decimals - datumVecs []coldata.DatumVec - // varSizeDatumSizes stores the amount of space we have accounted for for - // the corresponding "row" of variable length values in the last batch that - // the helper has touched. This is necessary to track because when the batch - // is reset, the vectors still have references to the old datums, so we need - // to adjust the accounting only by the delta. Similarly, once a new batch - // is allocated, we need to track the estimate that we have already + // decimalSizes stores the amount of space we have accounted for for the + // corresponding decimal values in the corresponding row of the last batch + // that the helper has touched. This is necessary to track because when the + // batch is reset, the vectors still have references to the old decimals, so + // we need to adjust the accounting only by the delta. Similarly, once a new + // batch is allocated, we need to track the estimate that we have already // accounted for. // // Note that because ResetMaybeReallocate caps the capacity of the batch at // coldata.BatchSize(), this slice will never exceed coldata.BatchSize() in // size, and we choose to ignore it for the purposes of memory accounting. - varSizeDatumSizes []int64 - // varSizeEstimatePerRow is the total estimated size of single values from - // varSizeVecIdxs vectors which is accounted for by EstimateBatchSizeBytes. - // It serves as the initial value for varSizeDatumSizes values. - varSizeEstimatePerRow int64 + decimalSizes []int64 + + // varLenDatumVecIdxs stores the indices of all datum-backed vectors with + // variable-length values. + varLenDatumVecIdxs util.FastIntSet + // varLenDatumVecs stores all variable-sized datum-backed vectors. They are + // updated every time a new batch is allocated. + varLenDatumVecs []coldata.DatumVec } // Init initializes the helper. func (h *SetAccountingHelper) Init(allocator *Allocator, typs []*types.T) { h.Allocator = allocator - numDecimalVecs := 0 for vecIdx, typ := range typs { switch typeconv.TypeFamilyToCanonicalTypeFamily(typ.Family()) { case types.BytesFamily, types.JsonFamily: h.bytesLikeVecIdxs.Add(vecIdx) case types.DecimalFamily: - h.varSizeVecIdxs.Add(vecIdx) - h.varSizeEstimatePerRow += memsize.Decimal - numDecimalVecs++ + h.decimalVecIdxs.Add(vecIdx) case typeconv.DatumVecCanonicalTypeFamily: - estimate, isVarlen := tree.DatumTypeSize(typ) - if isVarlen { - h.varSizeVecIdxs.Add(vecIdx) - h.varSizeEstimatePerRow += int64(estimate) + memsize.DatumOverhead - } + h.varLenDatumVecIdxs.Add(vecIdx) } } - h.allFixedLength = h.bytesLikeVecIdxs.Empty() && h.varSizeVecIdxs.Empty() + h.allFixedLength = h.bytesLikeVecIdxs.Empty() && h.decimalVecIdxs.Empty() && h.varLenDatumVecIdxs.Empty() h.bytesLikeVectors = make([]*coldata.Bytes, h.bytesLikeVecIdxs.Len()) - h.decimalVecs = make([]coldata.Decimals, numDecimalVecs) - h.datumVecs = make([]coldata.DatumVec, h.varSizeVecIdxs.Len()-numDecimalVecs) + h.decimalVecs = make([]coldata.Decimals, h.decimalVecIdxs.Len()) + h.varLenDatumVecs = make([]coldata.DatumVec, h.varLenDatumVecIdxs.Len()) } func (h *SetAccountingHelper) getBytesLikeTotalSize() int64 { @@ -616,23 +608,24 @@ func (h *SetAccountingHelper) ResetMaybeReallocate( } h.prevBytesLikeTotalSize = h.getBytesLikeTotalSize() } - if !h.varSizeVecIdxs.Empty() { + if !h.decimalVecIdxs.Empty() { h.decimalVecs = h.decimalVecs[:0] - h.datumVecs = h.datumVecs[:0] - for vecIdx, ok := h.varSizeVecIdxs.Next(0); ok; vecIdx, ok = h.varSizeVecIdxs.Next(vecIdx + 1) { - if vecs[vecIdx].CanonicalTypeFamily() == types.DecimalFamily { - h.decimalVecs = append(h.decimalVecs, vecs[vecIdx].Decimal()) - } else { - h.datumVecs = append(h.datumVecs, vecs[vecIdx].Datum()) - } + for vecIdx, ok := h.decimalVecIdxs.Next(0); ok; vecIdx, ok = h.decimalVecIdxs.Next(vecIdx + 1) { + h.decimalVecs = append(h.decimalVecs, vecs[vecIdx].Decimal()) } - if cap(h.varSizeDatumSizes) < newBatch.Capacity() { - h.varSizeDatumSizes = make([]int64, newBatch.Capacity()) - } else { - h.varSizeDatumSizes = h.varSizeDatumSizes[:newBatch.Capacity()] + h.decimalSizes = make([]int64, newBatch.Capacity()) + for i := range h.decimalSizes { + // In EstimateBatchSizeBytes, memsize.Decimal has already been + // accounted for for each decimal value, so we multiple that by + // the number of decimal vectors to get already included + // footprint of all decimal values in a single row. + h.decimalSizes[i] = int64(len(h.decimalVecs)) * memsize.Decimal } - for i := range h.varSizeDatumSizes { - h.varSizeDatumSizes[i] = h.varSizeEstimatePerRow + } + if !h.varLenDatumVecIdxs.Empty() { + h.varLenDatumVecs = h.varLenDatumVecs[:0] + for vecIdx, ok := h.varLenDatumVecIdxs.Next(0); ok; vecIdx, ok = h.varLenDatumVecIdxs.Next(vecIdx + 1) { + h.varLenDatumVecs = append(h.varLenDatumVecs, vecs[vecIdx].Datum()) } } } @@ -655,18 +648,25 @@ func (h *SetAccountingHelper) AccountForSet(rowIdx int) { h.prevBytesLikeTotalSize = newBytesLikeTotalSize } - if !h.varSizeVecIdxs.Empty() { - var newVarLengthDatumSize int64 + if !h.decimalVecIdxs.Empty() { + var newDecimalSizes int64 for _, decimalVec := range h.decimalVecs { d := decimalVec.Get(rowIdx) - newVarLengthDatumSize += int64(d.Size()) + newDecimalSizes += int64(d.Size()) } - for _, datumVec := range h.datumVecs { + h.Allocator.AdjustMemoryUsage(newDecimalSizes - h.decimalSizes[rowIdx]) + h.decimalSizes[rowIdx] = newDecimalSizes + } + + if !h.varLenDatumVecIdxs.Empty() { + var newVarLengthDatumSize int64 + for _, datumVec := range h.varLenDatumVecs { datumSize := datumVec.Get(rowIdx).(tree.Datum).Size() - newVarLengthDatumSize += int64(datumSize) + memsize.DatumOverhead + // Note that we're ignoring the overhead of tree.Datum because it + // was already included in EstimateBatchSizeBytes. + newVarLengthDatumSize += int64(datumSize) } - h.Allocator.AdjustMemoryUsage(newVarLengthDatumSize - h.varSizeDatumSizes[rowIdx]) - h.varSizeDatumSizes[rowIdx] = newVarLengthDatumSize + h.Allocator.AdjustMemoryUsage(newVarLengthDatumSize) } } diff --git a/pkg/sql/colmem/allocator_test.go b/pkg/sql/colmem/allocator_test.go index 874e0ff3e44f..27ec3d57842b 100644 --- a/pkg/sql/colmem/allocator_test.go +++ b/pkg/sql/colmem/allocator_test.go @@ -35,6 +35,10 @@ import ( "github.com/stretchr/testify/require" ) +func init() { + randutil.SeedForTests() +} + func TestMaybeAppendColumn(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -330,3 +334,44 @@ func TestSetAccountingHelper(t *testing.T) { } } } + +// TestEstimateBatchSizeBytes verifies that EstimateBatchSizeBytes returns such +// an estimate that it equals the actual footprint of the newly-created batch +// with no values set. +func TestEstimateBatchSizeBytes(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + rng, _ := randutil.NewTestRand() + st := cluster.MakeTestingClusterSettings() + testMemMonitor := execinfra.NewTestMemMonitor(ctx, st) + defer testMemMonitor.Stop(ctx) + memAcc := testMemMonitor.MakeBoundAccount() + defer memAcc.Close(ctx) + evalCtx := tree.MakeTestingEvalContext(st) + testColumnFactory := coldataext.NewExtendedColumnFactory(&evalCtx) + testAllocator := colmem.NewAllocator(ctx, &memAcc, testColumnFactory) + + numCols := rng.Intn(10) + 1 + typs := make([]*types.T, numCols) + for i := range typs { + typs[i] = randgen.RandType(rng) + } + const numRuns = 10 + for run := 0; run < numRuns; run++ { + memAcc.Clear(ctx) + numRows := rng.Intn(coldata.BatchSize()) + 1 + batch := testAllocator.NewMemBatchWithFixedCapacity(typs, numRows) + expected := memAcc.Used() + actual := colmem.GetBatchMemSize(batch) + if expected != actual { + fmt.Printf("run = %d numRows = %d\n", run, numRows) + for i := range typs { + fmt.Printf("%s ", typs[i].SQLString()) + } + fmt.Println() + t.Fatal(errors.Newf("expected %d, actual %d", expected, actual)) + } + } +}