From 962565621db88e10357755b260b2bbe7c04d5747 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Thu, 29 Jul 2021 00:02:07 -0700 Subject: [PATCH] colmem: optimize SetAccountingHelper This commit optimizes recently introduced SetAccountingHelper in order to reduce the number of casts per row. Namely, we can perform the cast from `coldata.Vec` to a concrete vector only once when a new batch is allocated and store the concrete vectors to be used during the per-row accounting. In a single benchmark this showed an improvement of about 3%. Release note: None --- pkg/sql/colfetcher/cfetcher.go | 3 +- pkg/sql/colmem/allocator.go | 95 +++++++++++++++++++++++--------- pkg/sql/colmem/allocator_test.go | 2 +- 3 files changed, 71 insertions(+), 29 deletions(-) diff --git a/pkg/sql/colfetcher/cfetcher.go b/pkg/sql/colfetcher/cfetcher.go index 383412ba4b01..fa4c3c436462 100644 --- a/pkg/sql/colfetcher/cfetcher.go +++ b/pkg/sql/colfetcher/cfetcher.go @@ -1115,7 +1115,7 @@ func (rf *cFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) { if err := rf.fillNulls(); err != nil { return nil, err } - rf.accountingHelper.AccountForSet(rf.machine.batch, rf.machine.rowIdx) + rf.accountingHelper.AccountForSet(rf.machine.rowIdx) rf.machine.rowIdx++ rf.shiftState() @@ -1150,6 +1150,7 @@ func (rf *cFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) { case stateEmitLastBatch: rf.machine.state[0] = stateFinished rf.finalizeBatch() + rf.accountingHelper.Close() return rf.machine.batch, nil case stateFinished: diff --git a/pkg/sql/colmem/allocator.go b/pkg/sql/colmem/allocator.go index d316b9b50113..99d32c61f04f 100644 --- a/pkg/sql/colmem/allocator.go +++ b/pkg/sql/colmem/allocator.go @@ -487,11 +487,20 @@ func EstimateBatchSizeBytes(vecTypes []*types.T, batchLength int) int64 { // only perform "set" operations on the coldata.Batch (i.e. neither copies nor // appends). It encapsulates the logic for performing the memory accounting for // these sets. +// NOTE: it works under the assumption that only a single coldata.Batch is being +// used. type SetAccountingHelper struct { Allocator *Allocator + // allFixedLength indicates that we're working with the type schema of only + // fixed-length elements. + allFixedLength bool + // bytesLikeVecIdxs stores the indices of all bytes-like vectors. bytesLikeVecIdxs util.FastIntSet + // bytesLikeVectors stores all actual bytes-like vectors. It is updated + // every time a new batch is allocated. + bytesLikeVectors []*coldata.Bytes // prevBytesLikeTotalSize tracks the total size of the bytes-like vectors // that we have already accounted for. prevBytesLikeTotalSize int64 @@ -499,6 +508,10 @@ type SetAccountingHelper struct { // varSizeVecIdxs stores the indices of all vectors with variable sized // values except for the bytes-like ones. varSizeVecIdxs util.FastIntSet + // decimalVecs and datumVecs store all decimal and datum-backed vectors, + // respectively. They are updated every time a new batch is allocated. + decimalVecs []coldata.Decimals + datumVecs []coldata.DatumVec // varSizeDatumSizes stores the amount of space we have accounted for for // the corresponding "row" of variable length values in the last batch that // the helper has touched. This is necessary to track because when the batch @@ -525,6 +538,7 @@ func (h *SetAccountingHelper) Init(allocator *Allocator, typs []*types.T, notNee h.Allocator = allocator curNotNeededPos := 0 + numDecimalVecs := 0 for vecIdx, typ := range typs { if len(notNeededVecIdxs) > curNotNeededPos && vecIdx == notNeededVecIdxs[curNotNeededPos] { curNotNeededPos++ @@ -536,6 +550,7 @@ func (h *SetAccountingHelper) Init(allocator *Allocator, typs []*types.T, notNee case types.DecimalFamily: h.varSizeVecIdxs.Add(vecIdx) h.varSizeEstimatePerRow += decimalEstimate + numDecimalVecs++ case typeconv.DatumVecCanonicalTypeFamily: estimate, isVarlen := tree.DatumTypeSize(typ) if isVarlen { @@ -544,18 +559,17 @@ func (h *SetAccountingHelper) Init(allocator *Allocator, typs []*types.T, notNee } } } + + h.allFixedLength = h.bytesLikeVecIdxs.Empty() && h.varSizeVecIdxs.Empty() + h.bytesLikeVectors = make([]*coldata.Bytes, h.bytesLikeVecIdxs.Len()) + h.decimalVecs = make([]coldata.Decimals, numDecimalVecs) + h.datumVecs = make([]coldata.DatumVec, h.varSizeVecIdxs.Len()-numDecimalVecs) } -func (h *SetAccountingHelper) getBytesLikeTotalSize(batch coldata.Batch) int64 { +func (h *SetAccountingHelper) getBytesLikeTotalSize() int64 { var bytesLikeTotalSize int64 - for vecIdx, ok := h.bytesLikeVecIdxs.Next(0); ok; vecIdx, ok = h.bytesLikeVecIdxs.Next(vecIdx + 1) { - vec := batch.ColVec(vecIdx) - switch vec.CanonicalTypeFamily() { - case types.BytesFamily: - bytesLikeTotalSize += vec.Bytes().Size() - case types.JsonFamily: - bytesLikeTotalSize += vec.JSON().Size() - } + for _, b := range h.bytesLikeVectors { + bytesLikeTotalSize += b.Size() } return bytesLikeTotalSize } @@ -569,15 +583,39 @@ func (h *SetAccountingHelper) ResetMaybeReallocate( newBatch, reallocated = h.Allocator.ResetMaybeReallocate( typs, oldBatch, minCapacity, maxBatchMemSize, ) - if reallocated { + if reallocated && !h.allFixedLength { // Allocator.ResetMaybeReallocate has released the precise memory // footprint of the old batch and has accounted for the estimated // footprint of the new batch. This means that we need to update our // internal memory tracking state to those estimates. + // + // Note that the loops below have type switches, but that is acceptable + // given that a batch is reallocated limited number of times throughout + // the lifetime of the helper's user (namely, at most + // log2(coldata.BatchSize())+1 (=11 by default) times since we double + // the capacity until coldata.BatchSize()). + vecs := newBatch.ColVecs() if !h.bytesLikeVecIdxs.Empty() { - h.prevBytesLikeTotalSize = h.getBytesLikeTotalSize(newBatch) + h.bytesLikeVectors = h.bytesLikeVectors[:0] + for vecIdx, ok := h.bytesLikeVecIdxs.Next(0); ok; vecIdx, ok = h.bytesLikeVecIdxs.Next(vecIdx + 1) { + if vecs[vecIdx].CanonicalTypeFamily() == types.BytesFamily { + h.bytesLikeVectors = append(h.bytesLikeVectors, vecs[vecIdx].Bytes()) + } else { + h.bytesLikeVectors = append(h.bytesLikeVectors, &vecs[vecIdx].JSON().Bytes) + } + } + h.prevBytesLikeTotalSize = h.getBytesLikeTotalSize() } if !h.varSizeVecIdxs.Empty() { + h.decimalVecs = h.decimalVecs[:0] + h.datumVecs = h.datumVecs[:0] + for vecIdx, ok := h.varSizeVecIdxs.Next(0); ok; vecIdx, ok = h.varSizeVecIdxs.Next(vecIdx + 1) { + if vecs[vecIdx].CanonicalTypeFamily() == types.DecimalFamily { + h.decimalVecs = append(h.decimalVecs, vecs[vecIdx].Decimal()) + } else { + h.datumVecs = append(h.datumVecs, vecs[vecIdx].Datum()) + } + } if cap(h.varSizeDatumSizes) < newBatch.Capacity() { h.varSizeDatumSizes = make([]int64, newBatch.Capacity()) } else { @@ -592,35 +630,38 @@ func (h *SetAccountingHelper) ResetMaybeReallocate( } // AccountForSet updates the Allocator according to the new variable length -// values in the row rowIdx in the batch. This method assumes that batch was -// allocated via ResetMaybeReallocate call on this helper. -func (h *SetAccountingHelper) AccountForSet(batch coldata.Batch, rowIdx int) { - if h.bytesLikeVecIdxs.Empty() && h.varSizeVecIdxs.Empty() { +// values in the row rowIdx in the batch that was returned by the last call to +// ResetMaybeReallocate. +func (h *SetAccountingHelper) AccountForSet(rowIdx int) { + if h.allFixedLength { // All vectors are of fixed-length and are already correctly accounted // for. return } - if !h.bytesLikeVecIdxs.Empty() { - newBytesLikeTotalSize := h.getBytesLikeTotalSize(batch) + if len(h.bytesLikeVectors) > 0 { + newBytesLikeTotalSize := h.getBytesLikeTotalSize() h.Allocator.AdjustMemoryUsage(newBytesLikeTotalSize - h.prevBytesLikeTotalSize) h.prevBytesLikeTotalSize = newBytesLikeTotalSize } if !h.varSizeVecIdxs.Empty() { var newVarLengthDatumSize int64 - for vecIdx, ok := h.varSizeVecIdxs.Next(0); ok; vecIdx, ok = h.varSizeVecIdxs.Next(vecIdx + 1) { - vec := batch.ColVec(vecIdx) - switch vec.CanonicalTypeFamily() { - case types.DecimalFamily: - d := vec.Decimal().Get(rowIdx) - newVarLengthDatumSize += int64(tree.SizeOfDecimal(&d)) - case typeconv.DatumVecCanonicalTypeFamily: - datumSize := vec.Datum().Get(rowIdx).(*coldataext.Datum).Size() - newVarLengthDatumSize += int64(datumSize) + memsize.DatumOverhead - } + for _, decimalVec := range h.decimalVecs { + d := decimalVec.Get(rowIdx) + newVarLengthDatumSize += int64(tree.SizeOfDecimal(&d)) + } + for _, datumVec := range h.datumVecs { + datumSize := datumVec.Get(rowIdx).(*coldataext.Datum).Size() + newVarLengthDatumSize += int64(datumSize) + memsize.DatumOverhead } h.Allocator.AdjustMemoryUsage(newVarLengthDatumSize - h.varSizeDatumSizes[rowIdx]) h.varSizeDatumSizes[rowIdx] = newVarLengthDatumSize } } + +// Close releases all of the resources so that they could be garbage collected. +// It should be called once the caller is done with batch manipulation. +func (h *SetAccountingHelper) Close() { + *h = SetAccountingHelper{} +} diff --git a/pkg/sql/colmem/allocator_test.go b/pkg/sql/colmem/allocator_test.go index 206f435d1947..2c19e7d46f3a 100644 --- a/pkg/sql/colmem/allocator_test.go +++ b/pkg/sql/colmem/allocator_test.go @@ -314,7 +314,7 @@ func TestSetAccountingHelper(t *testing.T) { coldata.SetValueAt(batch.ColVec(vecIdx), converter(datum), rowIdx) } } - helper.AccountForSet(batch, rowIdx) + helper.AccountForSet(rowIdx) } // At this point, we have set all rows in the batch and performed the