Merge #68221
68221: colmem: optimize SetAccountingHelper r=yuzefovich a=yuzefovich

This commit optimizes the recently introduced SetAccountingHelper in order
to reduce the number of casts per row. Namely, we can perform the cast
from `coldata.Vec` to a concrete vector only once, when a new batch is
allocated, and store the concrete vectors to be used during the per-row
accounting. In a single benchmark this showed an improvement of about
3%.

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
craig[bot] and yuzefovich committed Jul 30, 2021
2 parents 53f18e3 + 9625656 commit fe1fb73
Showing 3 changed files with 71 additions and 29 deletions.
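
The pattern described in the commit message can be illustrated outside of the CockroachDB codebase. Below is a minimal, self-contained Go sketch with stand-in types (Vec, BytesVec, IntVec, and helper are illustrative assumptions, not the real coldata/colmem API): the interface-to-concrete cast happens once per batch (re)allocation, and the per-row accounting only walks the cached concrete slices.

package main

import "fmt"

// Vec stands in for an interface-typed column vector (like coldata.Vec).
type Vec interface{}

// BytesVec stands in for a concrete variable-length vector.
type BytesVec struct{ sizes []int64 }

// Size returns the total footprint of the vector's current contents.
func (b *BytesVec) Size() int64 {
	var total int64
	for _, s := range b.sizes {
		total += s
	}
	return total
}

// IntVec stands in for a fixed-length vector that needs no per-row accounting.
type IntVec struct{ vals []int64 }

// helper caches concrete vectors once per batch allocation so that the
// per-row accounting loop avoids a cast / type switch on every row.
type helper struct {
	bytesVecs []*BytesVec // populated in onNewBatch, reused per row
	prevSize  int64
}

// onNewBatch performs the casts exactly once per (re)allocation.
func (h *helper) onNewBatch(vecs []Vec) {
	h.bytesVecs = h.bytesVecs[:0]
	for _, v := range vecs {
		if b, ok := v.(*BytesVec); ok {
			h.bytesVecs = append(h.bytesVecs, b)
		}
	}
	h.prevSize = 0
}

// accountForSet runs once per row and only walks the cached concrete vectors,
// returning the delta to register with a memory account.
func (h *helper) accountForSet() int64 {
	var total int64
	for _, b := range h.bytesVecs {
		total += b.Size()
	}
	delta := total - h.prevSize
	h.prevSize = total
	return delta
}

func main() {
	bytesCol := &BytesVec{}
	intCol := &IntVec{vals: make([]int64, 2)}

	h := &helper{}
	h.onNewBatch([]Vec{bytesCol, intCol}) // cast once per new batch

	bytesCol.sizes = append(bytesCol.sizes, 8) // "set" row 0
	fmt.Println("delta after row 0:", h.accountForSet())

	bytesCol.sizes = append(bytesCol.sizes, 16) // "set" row 1
	fmt.Println("delta after row 1:", h.accountForSet())
}

The real SetAccountingHelper additionally tracks per-row sizes and bytes-like totals separately, as the diff below shows; this sketch only captures the "cast once, account many times" idea.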
pkg/sql/colfetcher/cfetcher.go (2 additions, 1 deletion)
@@ -1115,7 +1115,7 @@ func (rf *cFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) {
 			if err := rf.fillNulls(); err != nil {
 				return nil, err
 			}
-			rf.accountingHelper.AccountForSet(rf.machine.batch, rf.machine.rowIdx)
+			rf.accountingHelper.AccountForSet(rf.machine.rowIdx)
 			rf.machine.rowIdx++
 			rf.shiftState()

@@ -1150,6 +1150,7 @@ func (rf *cFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) {
 		case stateEmitLastBatch:
 			rf.machine.state[0] = stateFinished
 			rf.finalizeBatch()
+			rf.accountingHelper.Close()
 			return rf.machine.batch, nil

 		case stateFinished:
pkg/sql/colmem/allocator.go (68 additions, 27 deletions)
@@ -487,18 +487,31 @@ func EstimateBatchSizeBytes(vecTypes []*types.T, batchLength int) int64 {
 // only perform "set" operations on the coldata.Batch (i.e. neither copies nor
 // appends). It encapsulates the logic for performing the memory accounting for
 // these sets.
 // NOTE: it works under the assumption that only a single coldata.Batch is being
 // used.
 type SetAccountingHelper struct {
 	Allocator *Allocator

+	// allFixedLength indicates that we're working with the type schema of only
+	// fixed-length elements.
+	allFixedLength bool
+
 	// bytesLikeVecIdxs stores the indices of all bytes-like vectors.
 	bytesLikeVecIdxs util.FastIntSet
+	// bytesLikeVectors stores all actual bytes-like vectors. It is updated
+	// every time a new batch is allocated.
+	bytesLikeVectors []*coldata.Bytes
 	// prevBytesLikeTotalSize tracks the total size of the bytes-like vectors
 	// that we have already accounted for.
 	prevBytesLikeTotalSize int64

 	// varSizeVecIdxs stores the indices of all vectors with variable sized
 	// values except for the bytes-like ones.
 	varSizeVecIdxs util.FastIntSet
+	// decimalVecs and datumVecs store all decimal and datum-backed vectors,
+	// respectively. They are updated every time a new batch is allocated.
+	decimalVecs []coldata.Decimals
+	datumVecs   []coldata.DatumVec
 	// varSizeDatumSizes stores the amount of space we have accounted for for
 	// the corresponding "row" of variable length values in the last batch that
 	// the helper has touched. This is necessary to track because when the batch
@@ -525,6 +538,7 @@ func (h *SetAccountingHelper) Init(allocator *Allocator, typs []*types.T, notNee
 	h.Allocator = allocator

 	curNotNeededPos := 0
+	numDecimalVecs := 0
 	for vecIdx, typ := range typs {
 		if len(notNeededVecIdxs) > curNotNeededPos && vecIdx == notNeededVecIdxs[curNotNeededPos] {
 			curNotNeededPos++
@@ -536,6 +550,7 @@
 		case types.DecimalFamily:
 			h.varSizeVecIdxs.Add(vecIdx)
 			h.varSizeEstimatePerRow += decimalEstimate
+			numDecimalVecs++
 		case typeconv.DatumVecCanonicalTypeFamily:
 			estimate, isVarlen := tree.DatumTypeSize(typ)
 			if isVarlen {
@@ -544,18 +559,17 @@
 			}
 		}
 	}
+
+	h.allFixedLength = h.bytesLikeVecIdxs.Empty() && h.varSizeVecIdxs.Empty()
+	h.bytesLikeVectors = make([]*coldata.Bytes, h.bytesLikeVecIdxs.Len())
+	h.decimalVecs = make([]coldata.Decimals, numDecimalVecs)
+	h.datumVecs = make([]coldata.DatumVec, h.varSizeVecIdxs.Len()-numDecimalVecs)
 }

-func (h *SetAccountingHelper) getBytesLikeTotalSize(batch coldata.Batch) int64 {
+func (h *SetAccountingHelper) getBytesLikeTotalSize() int64 {
 	var bytesLikeTotalSize int64
-	for vecIdx, ok := h.bytesLikeVecIdxs.Next(0); ok; vecIdx, ok = h.bytesLikeVecIdxs.Next(vecIdx + 1) {
-		vec := batch.ColVec(vecIdx)
-		switch vec.CanonicalTypeFamily() {
-		case types.BytesFamily:
-			bytesLikeTotalSize += vec.Bytes().Size()
-		case types.JsonFamily:
-			bytesLikeTotalSize += vec.JSON().Size()
-		}
+	for _, b := range h.bytesLikeVectors {
+		bytesLikeTotalSize += b.Size()
 	}
 	return bytesLikeTotalSize
 }
@@ -569,15 +583,39 @@ func (h *SetAccountingHelper) ResetMaybeReallocate(
 	newBatch, reallocated = h.Allocator.ResetMaybeReallocate(
 		typs, oldBatch, minCapacity, maxBatchMemSize,
 	)
-	if reallocated {
+	if reallocated && !h.allFixedLength {
+		// Allocator.ResetMaybeReallocate has released the precise memory
+		// footprint of the old batch and has accounted for the estimated
+		// footprint of the new batch. This means that we need to update our
+		// internal memory tracking state to those estimates.
+		//
+		// Note that the loops below have type switches, but that is acceptable
+		// given that a batch is reallocated limited number of times throughout
+		// the lifetime of the helper's user (namely, at most
+		// log2(coldata.BatchSize())+1 (=11 by default) times since we double
+		// the capacity until coldata.BatchSize()).
+		vecs := newBatch.ColVecs()
 		if !h.bytesLikeVecIdxs.Empty() {
-			h.prevBytesLikeTotalSize = h.getBytesLikeTotalSize(newBatch)
+			h.bytesLikeVectors = h.bytesLikeVectors[:0]
+			for vecIdx, ok := h.bytesLikeVecIdxs.Next(0); ok; vecIdx, ok = h.bytesLikeVecIdxs.Next(vecIdx + 1) {
+				if vecs[vecIdx].CanonicalTypeFamily() == types.BytesFamily {
+					h.bytesLikeVectors = append(h.bytesLikeVectors, vecs[vecIdx].Bytes())
+				} else {
+					h.bytesLikeVectors = append(h.bytesLikeVectors, &vecs[vecIdx].JSON().Bytes)
+				}
+			}
+			h.prevBytesLikeTotalSize = h.getBytesLikeTotalSize()
 		}
 		if !h.varSizeVecIdxs.Empty() {
+			h.decimalVecs = h.decimalVecs[:0]
+			h.datumVecs = h.datumVecs[:0]
+			for vecIdx, ok := h.varSizeVecIdxs.Next(0); ok; vecIdx, ok = h.varSizeVecIdxs.Next(vecIdx + 1) {
+				if vecs[vecIdx].CanonicalTypeFamily() == types.DecimalFamily {
+					h.decimalVecs = append(h.decimalVecs, vecs[vecIdx].Decimal())
+				} else {
+					h.datumVecs = append(h.datumVecs, vecs[vecIdx].Datum())
+				}
+			}
 			if cap(h.varSizeDatumSizes) < newBatch.Capacity() {
 				h.varSizeDatumSizes = make([]int64, newBatch.Capacity())
 			} else {
@@ -592,35 +630,38 @@
 	}

 // AccountForSet updates the Allocator according to the new variable length
-// values in the row rowIdx in the batch. This method assumes that batch was
-// allocated via ResetMaybeReallocate call on this helper.
-func (h *SetAccountingHelper) AccountForSet(batch coldata.Batch, rowIdx int) {
-	if h.bytesLikeVecIdxs.Empty() && h.varSizeVecIdxs.Empty() {
+// values in the row rowIdx in the batch that was returned by the last call to
+// ResetMaybeReallocate.
+func (h *SetAccountingHelper) AccountForSet(rowIdx int) {
+	if h.allFixedLength {
 		// All vectors are of fixed-length and are already correctly accounted
 		// for.
 		return
 	}

-	if !h.bytesLikeVecIdxs.Empty() {
-		newBytesLikeTotalSize := h.getBytesLikeTotalSize(batch)
+	if len(h.bytesLikeVectors) > 0 {
+		newBytesLikeTotalSize := h.getBytesLikeTotalSize()
 		h.Allocator.AdjustMemoryUsage(newBytesLikeTotalSize - h.prevBytesLikeTotalSize)
 		h.prevBytesLikeTotalSize = newBytesLikeTotalSize
 	}

 	if !h.varSizeVecIdxs.Empty() {
 		var newVarLengthDatumSize int64
-		for vecIdx, ok := h.varSizeVecIdxs.Next(0); ok; vecIdx, ok = h.varSizeVecIdxs.Next(vecIdx + 1) {
-			vec := batch.ColVec(vecIdx)
-			switch vec.CanonicalTypeFamily() {
-			case types.DecimalFamily:
-				d := vec.Decimal().Get(rowIdx)
-				newVarLengthDatumSize += int64(tree.SizeOfDecimal(&d))
-			case typeconv.DatumVecCanonicalTypeFamily:
-				datumSize := vec.Datum().Get(rowIdx).(*coldataext.Datum).Size()
-				newVarLengthDatumSize += int64(datumSize) + memsize.DatumOverhead
-			}
+		for _, decimalVec := range h.decimalVecs {
+			d := decimalVec.Get(rowIdx)
+			newVarLengthDatumSize += int64(tree.SizeOfDecimal(&d))
+		}
+		for _, datumVec := range h.datumVecs {
+			datumSize := datumVec.Get(rowIdx).(*coldataext.Datum).Size()
+			newVarLengthDatumSize += int64(datumSize) + memsize.DatumOverhead
 		}
 		h.Allocator.AdjustMemoryUsage(newVarLengthDatumSize - h.varSizeDatumSizes[rowIdx])
 		h.varSizeDatumSizes[rowIdx] = newVarLengthDatumSize
 	}
 }
+
+// Close releases all of the resources so that they could be garbage collected.
+// It should be called once the caller is done with batch manipulation.
+func (h *SetAccountingHelper) Close() {
+	*h = SetAccountingHelper{}
+}
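
The comment added to ResetMaybeReallocate bounds how often this cast-heavy path can run. A quick check of the arithmetic, assuming the default coldata.BatchSize() of 1024 (an assumption here, not spelled out in this diff):

package main

import "fmt"

func main() {
	// Capacities double 1, 2, 4, ..., 1024, so the reallocation path runs at
	// most log2(1024)+1 times for a given helper, matching the "=11 by
	// default" figure in the comment above.
	const maxBatchSize = 1024 // assumed default coldata.BatchSize()
	reallocations := 0
	for capacity := 1; capacity <= maxBatchSize; capacity *= 2 {
		reallocations++
	}
	fmt.Println(reallocations) // prints 11
}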
pkg/sql/colmem/allocator_test.go (1 addition, 1 deletion)
@@ -314,7 +314,7 @@ func TestSetAccountingHelper(t *testing.T) {
 					coldata.SetValueAt(batch.ColVec(vecIdx), converter(datum), rowIdx)
 				}
 			}
-			helper.AccountForSet(batch, rowIdx)
+			helper.AccountForSet(rowIdx)
 		}

 		// At this point, we have set all rows in the batch and performed the
