colmem: optimize SetAccountingHelper

This commit optimizes the recently introduced SetAccountingHelper in order
to reduce the number of casts per row. Namely, we can perform the cast
from `coldata.Vec` to a concrete vector only once, when a new batch is
allocated, and store the concrete vectors to be used during the per-row
accounting. In a single benchmark this showed an improvement of about
3%.

Release note: None
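
To make the idea concrete, the following is a minimal, self-contained sketch of the pattern described above: resolve the type-erased vector to its concrete type once per batch allocation, and reuse the cached concrete vectors on the per-row accounting path. The types here (vec, bytesVec, intVec, helper) are hypothetical stand-ins rather than the actual coldata/colmem types.

    package main

    import "fmt"

    // vec is a hypothetical stand-in for a type-erased column vector.
    type vec interface{ sizeAt(row int) int64 }

    // bytesVec is a variable-length column whose per-row size must be tracked.
    type bytesVec struct{ vals [][]byte }

    func (b *bytesVec) sizeAt(row int) int64 { return int64(len(b.vals[row])) }

    // intVec is a fixed-length column; its footprint never changes per row.
    type intVec struct{ vals []int64 }

    func (i *intVec) sizeAt(row int) int64 { return 8 }

    // helper caches the concrete variable-length vectors of the current batch.
    type helper struct {
        bytesVecs []*bytesVec
    }

    // onNewBatch performs the interface-to-concrete casts once per batch
    // (re)allocation instead of once per row.
    func (h *helper) onNewBatch(vecs []vec) {
        h.bytesVecs = h.bytesVecs[:0]
        for _, v := range vecs {
            if b, ok := v.(*bytesVec); ok {
                h.bytesVecs = append(h.bytesVecs, b)
            }
        }
    }

    // accountForRow is the hot path: it only walks the cached concrete vectors.
    func (h *helper) accountForRow(row int) int64 {
        var total int64
        for _, b := range h.bytesVecs {
            total += b.sizeAt(row)
        }
        return total
    }

    func main() {
        batch := []vec{
            &bytesVec{vals: [][]byte{[]byte("a"), []byte("bcd")}},
            &intVec{vals: []int64{1, 2}},
        }
        h := &helper{}
        h.onNewBatch(batch) // the only place where type assertions happen
        fmt.Println(h.accountForRow(0), h.accountForRow(1))
    }
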
yuzefovich committed Jul 30, 2021
1 parent f19914b commit 9625656
Showing 3 changed files with 71 additions and 29 deletions.
3 changes: 2 additions & 1 deletion pkg/sql/colfetcher/cfetcher.go
@@ -1115,7 +1115,7 @@ func (rf *cFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) {
if err := rf.fillNulls(); err != nil {
return nil, err
}
rf.accountingHelper.AccountForSet(rf.machine.batch, rf.machine.rowIdx)
rf.accountingHelper.AccountForSet(rf.machine.rowIdx)
rf.machine.rowIdx++
rf.shiftState()

@@ -1150,6 +1150,7 @@ func (rf *cFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) {
case stateEmitLastBatch:
rf.machine.state[0] = stateFinished
rf.finalizeBatch()
rf.accountingHelper.Close()
return rf.machine.batch, nil

case stateFinished:
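
The call-site changes above imply the following lifecycle for the helper: a batch is obtained through the helper, each row that gets filled in is reported with AccountForSet(rowIdx) (the batch argument is gone because the helper already holds the concrete vectors of the batch it handed out), and Close is called once the last batch has been emitted. Below is a rough sketch of that calling pattern with a stubbed-out helper and simplified signatures, not the real cFetcher or colmem API.

    package main

    import "fmt"

    // setHelper is a hypothetical stub that mirrors the calling protocol of
    // colmem.SetAccountingHelper; it is not the real implementation.
    type setHelper struct{}

    // ResetMaybeReallocate stands in for obtaining (or reusing) a batch; the
    // real method also takes the type schema and a memory limit.
    func (h *setHelper) ResetMaybeReallocate(minCapacity int) (capacity int) { return minCapacity }

    // AccountForSet is called after a row has been set; note there is no batch
    // argument, the helper remembers the batch it handed out.
    func (h *setHelper) AccountForSet(rowIdx int) { fmt.Println("accounted for row", rowIdx) }

    // Close releases the cached vector references.
    func (h *setHelper) Close() { fmt.Println("helper closed") }

    func main() {
        h := &setHelper{}
        for batchNum := 0; batchNum < 2; batchNum++ {
            capacity := h.ResetMaybeReallocate(3) // start a new batch
            for rowIdx := 0; rowIdx < capacity; rowIdx++ {
                // ... fill in row rowIdx of the current batch ...
                h.AccountForSet(rowIdx) // per-row accounting, no batch argument
            }
            // ... emit the batch to the consumer ...
        }
        h.Close() // once the last batch has been emitted
    }
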
95 changes: 68 additions & 27 deletions pkg/sql/colmem/allocator.go
@@ -487,18 +487,31 @@ func EstimateBatchSizeBytes(vecTypes []*types.T, batchLength int) int64 {
// only perform "set" operations on the coldata.Batch (i.e. neither copies nor
// appends). It encapsulates the logic for performing the memory accounting for
// these sets.
// NOTE: it works under the assumption that only a single coldata.Batch is being
// used.
type SetAccountingHelper struct {
Allocator *Allocator

// allFixedLength indicates that we're working with the type schema of only
// fixed-length elements.
allFixedLength bool

// bytesLikeVecIdxs stores the indices of all bytes-like vectors.
bytesLikeVecIdxs util.FastIntSet
// bytesLikeVectors stores all actual bytes-like vectors. It is updated
// every time a new batch is allocated.
bytesLikeVectors []*coldata.Bytes
// prevBytesLikeTotalSize tracks the total size of the bytes-like vectors
// that we have already accounted for.
prevBytesLikeTotalSize int64

// varSizeVecIdxs stores the indices of all vectors with variable sized
// values except for the bytes-like ones.
varSizeVecIdxs util.FastIntSet
// decimalVecs and datumVecs store all decimal and datum-backed vectors,
// respectively. They are updated every time a new batch is allocated.
decimalVecs []coldata.Decimals
datumVecs []coldata.DatumVec
// varSizeDatumSizes stores the amount of space we have accounted for for
// the corresponding "row" of variable length values in the last batch that
// the helper has touched. This is necessary to track because when the batch
@@ -525,6 +538,7 @@ func (h *SetAccountingHelper) Init(allocator *Allocator, typs []*types.T, notNee
h.Allocator = allocator

curNotNeededPos := 0
numDecimalVecs := 0
for vecIdx, typ := range typs {
if len(notNeededVecIdxs) > curNotNeededPos && vecIdx == notNeededVecIdxs[curNotNeededPos] {
curNotNeededPos++
@@ -536,6 +550,7 @@
case types.DecimalFamily:
h.varSizeVecIdxs.Add(vecIdx)
h.varSizeEstimatePerRow += decimalEstimate
numDecimalVecs++
case typeconv.DatumVecCanonicalTypeFamily:
estimate, isVarlen := tree.DatumTypeSize(typ)
if isVarlen {
@@ -544,18 +559,17 @@
}
}
}

h.allFixedLength = h.bytesLikeVecIdxs.Empty() && h.varSizeVecIdxs.Empty()
h.bytesLikeVectors = make([]*coldata.Bytes, h.bytesLikeVecIdxs.Len())
h.decimalVecs = make([]coldata.Decimals, numDecimalVecs)
h.datumVecs = make([]coldata.DatumVec, h.varSizeVecIdxs.Len()-numDecimalVecs)
}

func (h *SetAccountingHelper) getBytesLikeTotalSize(batch coldata.Batch) int64 {
func (h *SetAccountingHelper) getBytesLikeTotalSize() int64 {
var bytesLikeTotalSize int64
for vecIdx, ok := h.bytesLikeVecIdxs.Next(0); ok; vecIdx, ok = h.bytesLikeVecIdxs.Next(vecIdx + 1) {
vec := batch.ColVec(vecIdx)
switch vec.CanonicalTypeFamily() {
case types.BytesFamily:
bytesLikeTotalSize += vec.Bytes().Size()
case types.JsonFamily:
bytesLikeTotalSize += vec.JSON().Size()
}
for _, b := range h.bytesLikeVectors {
bytesLikeTotalSize += b.Size()
}
return bytesLikeTotalSize
}
@@ -569,15 +583,39 @@ func (h *SetAccountingHelper) ResetMaybeReallocate(
newBatch, reallocated = h.Allocator.ResetMaybeReallocate(
typs, oldBatch, minCapacity, maxBatchMemSize,
)
if reallocated {
if reallocated && !h.allFixedLength {
// Allocator.ResetMaybeReallocate has released the precise memory
// footprint of the old batch and has accounted for the estimated
// footprint of the new batch. This means that we need to update our
// internal memory tracking state to those estimates.
//
// Note that the loops below have type switches, but that is acceptable
// given that a batch is reallocated limited number of times throughout
// the lifetime of the helper's user (namely, at most
// log2(coldata.BatchSize())+1 (=11 by default) times since we double
// the capacity until coldata.BatchSize()).
vecs := newBatch.ColVecs()
if !h.bytesLikeVecIdxs.Empty() {
h.prevBytesLikeTotalSize = h.getBytesLikeTotalSize(newBatch)
h.bytesLikeVectors = h.bytesLikeVectors[:0]
for vecIdx, ok := h.bytesLikeVecIdxs.Next(0); ok; vecIdx, ok = h.bytesLikeVecIdxs.Next(vecIdx + 1) {
if vecs[vecIdx].CanonicalTypeFamily() == types.BytesFamily {
h.bytesLikeVectors = append(h.bytesLikeVectors, vecs[vecIdx].Bytes())
} else {
h.bytesLikeVectors = append(h.bytesLikeVectors, &vecs[vecIdx].JSON().Bytes)
}
}
h.prevBytesLikeTotalSize = h.getBytesLikeTotalSize()
}
if !h.varSizeVecIdxs.Empty() {
h.decimalVecs = h.decimalVecs[:0]
h.datumVecs = h.datumVecs[:0]
for vecIdx, ok := h.varSizeVecIdxs.Next(0); ok; vecIdx, ok = h.varSizeVecIdxs.Next(vecIdx + 1) {
if vecs[vecIdx].CanonicalTypeFamily() == types.DecimalFamily {
h.decimalVecs = append(h.decimalVecs, vecs[vecIdx].Decimal())
} else {
h.datumVecs = append(h.datumVecs, vecs[vecIdx].Datum())
}
}
if cap(h.varSizeDatumSizes) < newBatch.Capacity() {
h.varSizeDatumSizes = make([]int64, newBatch.Capacity())
} else {
@@ -592,35 +630,38 @@
}

// AccountForSet updates the Allocator according to the new variable length
// values in the row rowIdx in the batch. This method assumes that batch was
// allocated via ResetMaybeReallocate call on this helper.
func (h *SetAccountingHelper) AccountForSet(batch coldata.Batch, rowIdx int) {
if h.bytesLikeVecIdxs.Empty() && h.varSizeVecIdxs.Empty() {
// values in the row rowIdx in the batch that was returned by the last call to
// ResetMaybeReallocate.
func (h *SetAccountingHelper) AccountForSet(rowIdx int) {
if h.allFixedLength {
// All vectors are of fixed-length and are already correctly accounted
// for.
return
}

if !h.bytesLikeVecIdxs.Empty() {
newBytesLikeTotalSize := h.getBytesLikeTotalSize(batch)
if len(h.bytesLikeVectors) > 0 {
newBytesLikeTotalSize := h.getBytesLikeTotalSize()
h.Allocator.AdjustMemoryUsage(newBytesLikeTotalSize - h.prevBytesLikeTotalSize)
h.prevBytesLikeTotalSize = newBytesLikeTotalSize
}

if !h.varSizeVecIdxs.Empty() {
var newVarLengthDatumSize int64
for vecIdx, ok := h.varSizeVecIdxs.Next(0); ok; vecIdx, ok = h.varSizeVecIdxs.Next(vecIdx + 1) {
vec := batch.ColVec(vecIdx)
switch vec.CanonicalTypeFamily() {
case types.DecimalFamily:
d := vec.Decimal().Get(rowIdx)
newVarLengthDatumSize += int64(tree.SizeOfDecimal(&d))
case typeconv.DatumVecCanonicalTypeFamily:
datumSize := vec.Datum().Get(rowIdx).(*coldataext.Datum).Size()
newVarLengthDatumSize += int64(datumSize) + memsize.DatumOverhead
}
for _, decimalVec := range h.decimalVecs {
d := decimalVec.Get(rowIdx)
newVarLengthDatumSize += int64(tree.SizeOfDecimal(&d))
}
for _, datumVec := range h.datumVecs {
datumSize := datumVec.Get(rowIdx).(*coldataext.Datum).Size()
newVarLengthDatumSize += int64(datumSize) + memsize.DatumOverhead
}
h.Allocator.AdjustMemoryUsage(newVarLengthDatumSize - h.varSizeDatumSizes[rowIdx])
h.varSizeDatumSizes[rowIdx] = newVarLengthDatumSize
}
}

// Close releases all of the resources so that they could be garbage collected.
// It should be called once the caller is done with batch manipulation.
func (h *SetAccountingHelper) Close() {
*h = SetAccountingHelper{}
}
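
The amortization argument in the ResetMaybeReallocate comment above boils down to a small calculation; this throwaway snippet reproduces it, assuming the default batch size of 1024 implied by the "=11 by default" remark.

    package main

    import "fmt"

    func main() {
        // Arithmetic behind the comment in ResetMaybeReallocate: capacities
        // double from 1 up to the maximum batch size (1024 by default), so the
        // batch can be reallocated at most log2(1024)+1 = 11 times over the
        // helper's lifetime, which keeps the per-reallocation type switches cheap.
        const maxBatchSize = 1024
        reallocations := 0
        for capacity := 1; capacity <= maxBatchSize; capacity *= 2 {
            reallocations++
        }
        fmt.Println(reallocations) // prints 11
    }
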
2 changes: 1 addition & 1 deletion pkg/sql/colmem/allocator_test.go
@@ -314,7 +314,7 @@ func TestSetAccountingHelper(t *testing.T) {
coldata.SetValueAt(batch.ColVec(vecIdx), converter(datum), rowIdx)
}
}
helper.AccountForSet(batch, rowIdx)
helper.AccountForSet(rowIdx)
}

// At this point, we have set all rows in the batch and performed the
