colexec: deeply reset datum-backed vectors in ResetInternalBatch
Previously, we would keep references to the old datums set in the
datum-backed vectors until the datums were overwritten, unnecessarily
extending the period during which the no-longer-used datums remained
live. No other type needs this treatment because for all other types
the old space can be reused for new elements, so we want to keep it
around (e.g. decimals can reuse the non-inlined coefficient). This
commit makes it so that we deeply unset the datums in
`ResetInternalBatch`.

Care had to be taken to keep the memory accounting up-to-date. In
particular, after deeply resetting a datum-backed vector, we want to
release the memory allocation taken up by the actual datums while
keeping the overhead of the `tree.Datum` interface accounted for
(since the `[]tree.Datum` slice itself is still fully live).

Release note: None

Release justification: low risk, high benefit changes to existing
functionality (reduced memory usage).
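
To make the liveness issue concrete, here is a minimal Go sketch (illustrative only, not the CockroachDB implementation; `[]interface{}` stands in for `[]tree.Datum`):

```go
package main

import "fmt"

// datumVecSketch stands in for a datum-backed vector; data plays the role
// of []tree.Datum, an interface slice whose elements are heap allocations
// of varying size.
type datumVecSketch struct {
	data []interface{}
}

// shallowReset truncates the slice. The backing array still references the
// old elements, so the GC must keep them alive until each slot is
// overwritten by a new value.
func (dv *datumVecSketch) shallowReset() {
	dv.data = dv.data[:0]
}

// deepReset nils out every slot: the old values become collectible
// immediately, while the slice allocation itself (the per-slot interface
// headers) is retained for reuse.
func (dv *datumVecSketch) deepReset() {
	for i := range dv.data {
		dv.data[i] = nil
	}
}

func main() {
	dv := &datumVecSketch{data: []interface{}{"old", "datums", "here"}}
	dv.deepReset()
	fmt.Println(dv.data) // [<nil> <nil> <nil>]
}
```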
yuzefovich committed Feb 28, 2022
1 parent c9adb0c commit cb93c30
Showing 6 changed files with 160 additions and 89 deletions.
24 changes: 21 additions & 3 deletions pkg/col/coldata/batch.go
@@ -16,6 +16,7 @@ import (
"strings"
"sync/atomic"

"github.com/cockroachdb/cockroach/pkg/col/typeconv"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/errors"
@@ -68,8 +69,9 @@ type Batch interface {
// important for callers to call ResetInternalBatch if they own internal
// batches that they reuse as not doing this could result in correctness
// or memory blowup issues. It unsets the selection and sets the length to
// 0.
ResetInternalBatch()
// 0. Notably, it deeply resets the datum-backed vectors and returns the
// number of bytes released as a result of the reset.
ResetInternalBatch() int64
// String returns a pretty representation of this batch.
String() string
}
@@ -126,6 +128,8 @@ func NewMemBatchWithCapacity(typs []*types.T, capacity int, factory ColumnFactor
b.b[i] = col
if col.IsBytesLike() {
b.bytesVecIdxs.Add(i)
} else if col.CanonicalTypeFamily() == typeconv.DatumVecCanonicalTypeFamily {
b.datumVecIdxs.Add(i)
}
}
return b
@@ -202,6 +206,8 @@ type MemBatch struct {
// vectors and checking whether they are of Bytes type we store this slice
// separately.
bytesVecIdxs util.FastIntSet
// datumVecIdxs stores the indices of all datum-backed vectors in b.
datumVecIdxs util.FastIntSet
useSel bool
// sel is - if useSel is true - a selection vector from upstream. A
// selection vector is a list of selected tuple indices in this memBatch's
@@ -271,6 +277,8 @@ func (m *MemBatch) SetLength(length int) {
func (m *MemBatch) AppendCol(col Vec) {
if col.IsBytesLike() {
m.bytesVecIdxs.Add(len(m.b))
} else if col.CanonicalTypeFamily() == typeconv.DatumVecCanonicalTypeFamily {
m.datumVecIdxs.Add(len(m.b))
}
m.b = append(m.b, col)
}
@@ -315,12 +323,17 @@ func (m *MemBatch) Reset(typs []*types.T, length int, factory ColumnFactory) {
m.bytesVecIdxs.Remove(i)
}
}
for i, ok := m.datumVecIdxs.Next(0); ok; i, ok = m.datumVecIdxs.Next(i + 1) {
if i >= len(typs) {
m.datumVecIdxs.Remove(i)
}
}
m.ResetInternalBatch()
m.SetLength(length)
}

// ResetInternalBatch implements the Batch interface.
func (m *MemBatch) ResetInternalBatch() {
func (m *MemBatch) ResetInternalBatch() int64 {
m.SetLength(0 /* length */)
m.SetSelection(false)
for _, v := range m.b {
@@ -331,6 +344,11 @@ func (m *MemBatch) ResetInternalBatch() {
for i, ok := m.bytesVecIdxs.Next(0); ok; i, ok = m.bytesVecIdxs.Next(i + 1) {
Reset(m.b[i])
}
var released int64
for i, ok := m.datumVecIdxs.Next(0); ok; i, ok = m.datumVecIdxs.Next(i + 1) {
released += m.b[i].Datum().Reset()
}
return released
}

// String returns a pretty representation of this batch.
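
The `int64` return value exists so that owners of internal batches can feed the freed bytes back into their memory accounts, as `Allocator.ResetMaybeReallocate` does in the allocator diff below. A hypothetical call site might look like this (`colmem.Allocator.ReleaseMemory` is the real method; the helper and its package are illustrative):

```go
package colexecexample // hypothetical package for illustration

import (
	"github.com/cockroachdb/cockroach/pkg/col/coldata"
	"github.com/cockroachdb/cockroach/pkg/sql/colmem"
)

// reuseBatch is a hypothetical helper showing the new contract: an operator
// that owns an internal batch returns the bytes freed by the deep datum
// reset to its allocator's account before reusing the batch.
func reuseBatch(a *colmem.Allocator, b coldata.Batch) coldata.Batch {
	a.ReleaseMemory(b.ResetInternalBatch())
	return b
}
```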
5 changes: 3 additions & 2 deletions pkg/col/coldata/datum_vec.go
@@ -36,8 +36,6 @@ type DatumVec interface {
AppendSlice(src DatumVec, destIdx, srcStartIdx, srcEndIdx int)
// AppendVal appends the given tree.Datum value to the end of the vector.
AppendVal(v Datum)
// SetLength sets the length of the vector.
SetLength(l int)
// Len returns the length of the vector.
Len() int
// Cap returns the underlying capacity of the vector.
@@ -55,4 +53,7 @@
// be used when elements before startIdx are guaranteed not to have been
// modified.
Size(startIdx int) int64
// Reset resets the vector for reuse. It returns the number of bytes
// released.
Reset() int64
}
51 changes: 29 additions & 22 deletions pkg/col/coldataext/datum_vec.go
@@ -115,11 +115,6 @@ func (dv *datumVec) AppendVal(v coldata.Datum) {
dv.data = append(dv.data, datum)
}

// SetLength implements coldata.DatumVec interface.
func (dv *datumVec) SetLength(l int) {
dv.data = dv.data[:l]
}

// Len implements coldata.DatumVec interface.
func (dv *datumVec) Len() int {
return len(dv.data)
@@ -145,6 +140,24 @@ func (dv *datumVec) UnmarshalTo(i int, b []byte) error {
return err
}

// valuesSize returns the footprint of actual datums (in bytes) with ordinals in
// [startIdx:] range, ignoring the overhead of tree.Datum wrapper.
func (dv *datumVec) valuesSize(startIdx int) int64 {
var size int64
// Only the elements up to the length are expected to be non-nil. Note that
// we cannot take a short-cut with fixed-length values here because they
// might not be set, so we could over-account if we did something like
// size += (len-startIdx) * fixedSize.
if startIdx < dv.Len() {
for _, d := range dv.data[startIdx:dv.Len()] {
if d != nil {
size += int64(d.Size())
}
}
}
return size
}

// Size implements coldata.DatumVec interface.
func (dv *datumVec) Size(startIdx int) int64 {
// Note that we don't account for the overhead of datumVec struct, and the
@@ -156,24 +169,18 @@ func (dv *datumVec) Size(startIdx int) int64 {
if startIdx < 0 {
startIdx = 0
}
count := int64(dv.Cap() - startIdx)
size := memsize.DatumOverhead * count
if datumSize, variable := tree.DatumTypeSize(dv.t); variable {
// The elements in dv.data[max(startIdx,len):cap] range are accounted with
// the default datum size for the type. For those in the range
// [startIdx, len) we call Datum.Size().
idx := startIdx
for ; idx < len(dv.data); idx++ {
if dv.data[idx] != nil {
size += int64(dv.data[idx].Size())
}
}
// Pick up where the loop left off.
size += int64(dv.Cap()-idx) * int64(datumSize)
} else {
size += int64(datumSize) * count
// We have to account for the tree.Datum overhead for the whole capacity of
// the underlying slice.
return memsize.DatumOverhead*int64(dv.Cap()-startIdx) + dv.valuesSize(startIdx)
}

// Reset implements coldata.DatumVec interface.
func (dv *datumVec) Reset() int64 {
released := dv.valuesSize(0 /* startIdx */)
for i := range dv.data {
dv.data[i] = nil
}
return size
return released
}

// assertValidDatum asserts that the given datum is valid to be stored in this
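
The accounting invariant behind `valuesSize`, `Size`, and `Reset` can be checked with a toy model (illustrative numbers; `datumOverhead` stands in for `memsize.DatumOverhead`): `Size(0)` is the per-slot interface overhead for the whole capacity plus the values' footprint, and `Reset` releases exactly the latter, so the still-live slice remains accounted for.

```go
package main

import "fmt"

// datumOverhead models the per-slot cost of a tree.Datum interface header;
// the value is an assumption for illustration only.
const datumOverhead = 16

// toyVec mirrors the shape of datumVec: each element holds the Size() of
// the datum it wraps, with 0 standing in for a nil datum.
type toyVec struct{ data []int64 }

// valuesSize mirrors datumVec.valuesSize: the footprint of the actual
// datums, ignoring the interface overhead.
func (v *toyVec) valuesSize(startIdx int) int64 {
	var size int64
	for _, d := range v.data[startIdx:] {
		size += d
	}
	return size
}

// size mirrors datumVec.Size: overhead for the whole capacity plus the
// values' footprint.
func (v *toyVec) size(startIdx int) int64 {
	return datumOverhead*int64(cap(v.data)-startIdx) + v.valuesSize(startIdx)
}

// reset mirrors datumVec.Reset: it nils the slots and releases only the
// values' footprint, keeping the slice overhead accounted for.
func (v *toyVec) reset() int64 {
	released := v.valuesSize(0)
	for i := range v.data {
		v.data[i] = 0
	}
	return released
}

func main() {
	v := &toyVec{data: []int64{40, 0, 25}}
	before := v.size(0)
	released := v.reset()
	// After Reset, the account retains exactly the interface overhead of
	// the still-live slice.
	fmt.Println(before-released == datumOverhead*int64(cap(v.data))) // true
}
```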
4 changes: 2 additions & 2 deletions pkg/sql/colexec/colexecutils/utils.go
@@ -183,9 +183,9 @@ func (b *AppendOnlyBufferedBatch) Reset([]*types.T, int, coldata.ColumnFactory)
}

// ResetInternalBatch implements the coldata.Batch interface.
func (b *AppendOnlyBufferedBatch) ResetInternalBatch() {
func (b *AppendOnlyBufferedBatch) ResetInternalBatch() int64 {
b.SetLength(0 /* n */)
b.batch.ResetInternalBatch()
return b.batch.ResetInternalBatch()
}

// String implements the coldata.Batch interface.
120 changes: 60 additions & 60 deletions pkg/sql/colmem/allocator.go
@@ -189,7 +189,7 @@ func (a *Allocator) ResetMaybeReallocate(
}
if useOldBatch {
reallocated = false
oldBatch.ResetInternalBatch()
a.ReleaseMemory(oldBatch.ResetInternalBatch())
newBatch = oldBatch
} else {
a.ReleaseMemory(oldBatchMemSize)
@@ -266,6 +268,8 @@ func (a *Allocator) MaybeAppendColumn(b coldata.Batch, t *types.T, colIdx int) {
// Flat bytes vector needs to be reset before the vector can be
// reused.
coldata.Reset(presentVec)
} else if presentVec.CanonicalTypeFamily() == typeconv.DatumVecCanonicalTypeFamily {
a.ReleaseMemory(presentVec.Datum().Reset())
}
return
}
@@ -428,15 +430,11 @@ func EstimateBatchSizeBytes(vecTypes []*types.T, batchLength int) int64 {
case types.JsonFamily:
numBytesVectors++
case typeconv.DatumVecCanonicalTypeFamily:
// In datum vec we need to account for memory underlying the struct
// that is the implementation of tree.Datum interface (for example,
// tree.DBoolFalse) as well as for the overhead of storing that
// implementation in the slice of tree.Datums. Note that if t is of
// variable size, the memory will be properly accounted in
// getVecMemoryFootprint.
// Note: keep the calculation here in line with datumVec.Size.
implementationSize, _ := tree.DatumTypeSize(t)
acc += int64(implementationSize) + memsize.DatumOverhead
// Initially, only []tree.Datum slice is allocated for the
// datum-backed vectors right away, so that's what we're including
// in the estimate. Later on, once the actual values are set, they
// will be accounted for properly.
acc += memsize.DatumOverhead
case
types.BoolFamily,
types.IntFamily,
@@ -523,57 +521,51 @@ type SetAccountingHelper struct {
// that we have already accounted for.
prevBytesLikeTotalSize int64

// varSizeVecIdxs stores the indices of all vectors with variable sized
// values except for the bytes-like ones.
varSizeVecIdxs util.FastIntSet
// decimalVecs and datumVecs store all decimal and datum-backed vectors,
// respectively. They are updated every time a new batch is allocated.
// decimalVecIdxs stores the indices of all decimal vectors.
decimalVecIdxs util.FastIntSet
// decimalVecs stores all decimal vectors. They are updated every time a new
// batch is allocated.
decimalVecs []coldata.Decimals
datumVecs []coldata.DatumVec
// varSizeDatumSizes stores the amount of space we have accounted for for
// the corresponding "row" of variable length values in the last batch that
// the helper has touched. This is necessary to track because when the batch
// is reset, the vectors still have references to the old datums, so we need
// to adjust the accounting only by the delta. Similarly, once a new batch
// is allocated, we need to track the estimate that we have already
// decimalSizes stores the amount of space we have accounted for for the
// corresponding decimal values in the corresponding row of the last batch
// that the helper has touched. This is necessary to track because when the
// batch is reset, the vectors still have references to the old decimals, so
// we need to adjust the accounting only by the delta. Similarly, once a new
// batch is allocated, we need to track the estimate that we have already
// accounted for.
//
// Note that because ResetMaybeReallocate caps the capacity of the batch at
// coldata.BatchSize(), this slice will never exceed coldata.BatchSize() in
// size, and we choose to ignore it for the purposes of memory accounting.
varSizeDatumSizes []int64
// varSizeEstimatePerRow is the total estimated size of single values from
// varSizeVecIdxs vectors which is accounted for by EstimateBatchSizeBytes.
// It serves as the initial value for varSizeDatumSizes values.
varSizeEstimatePerRow int64
decimalSizes []int64

// varLenDatumVecIdxs stores the indices of all datum-backed vectors with
// variable-length values.
varLenDatumVecIdxs util.FastIntSet
// varLenDatumVecs stores all variable-sized datum-backed vectors. They are
// updated every time a new batch is allocated.
varLenDatumVecs []coldata.DatumVec
}

// Init initializes the helper.
func (h *SetAccountingHelper) Init(allocator *Allocator, typs []*types.T) {
h.Allocator = allocator

numDecimalVecs := 0
for vecIdx, typ := range typs {
switch typeconv.TypeFamilyToCanonicalTypeFamily(typ.Family()) {
case types.BytesFamily, types.JsonFamily:
h.bytesLikeVecIdxs.Add(vecIdx)
case types.DecimalFamily:
h.varSizeVecIdxs.Add(vecIdx)
h.varSizeEstimatePerRow += memsize.Decimal
numDecimalVecs++
h.decimalVecIdxs.Add(vecIdx)
case typeconv.DatumVecCanonicalTypeFamily:
estimate, isVarlen := tree.DatumTypeSize(typ)
if isVarlen {
h.varSizeVecIdxs.Add(vecIdx)
h.varSizeEstimatePerRow += int64(estimate) + memsize.DatumOverhead
}
h.varLenDatumVecIdxs.Add(vecIdx)
}
}

h.allFixedLength = h.bytesLikeVecIdxs.Empty() && h.varSizeVecIdxs.Empty()
h.allFixedLength = h.bytesLikeVecIdxs.Empty() && h.decimalVecIdxs.Empty() && h.varLenDatumVecIdxs.Empty()
h.bytesLikeVectors = make([]*coldata.Bytes, h.bytesLikeVecIdxs.Len())
h.decimalVecs = make([]coldata.Decimals, numDecimalVecs)
h.datumVecs = make([]coldata.DatumVec, h.varSizeVecIdxs.Len()-numDecimalVecs)
h.decimalVecs = make([]coldata.Decimals, h.decimalVecIdxs.Len())
h.varLenDatumVecs = make([]coldata.DatumVec, h.varLenDatumVecIdxs.Len())
}

func (h *SetAccountingHelper) getBytesLikeTotalSize() int64 {
@@ -616,23 +608,24 @@ func (h *SetAccountingHelper) ResetMaybeReallocate(
}
h.prevBytesLikeTotalSize = h.getBytesLikeTotalSize()
}
if !h.varSizeVecIdxs.Empty() {
if !h.decimalVecIdxs.Empty() {
h.decimalVecs = h.decimalVecs[:0]
h.datumVecs = h.datumVecs[:0]
for vecIdx, ok := h.varSizeVecIdxs.Next(0); ok; vecIdx, ok = h.varSizeVecIdxs.Next(vecIdx + 1) {
if vecs[vecIdx].CanonicalTypeFamily() == types.DecimalFamily {
h.decimalVecs = append(h.decimalVecs, vecs[vecIdx].Decimal())
} else {
h.datumVecs = append(h.datumVecs, vecs[vecIdx].Datum())
}
for vecIdx, ok := h.decimalVecIdxs.Next(0); ok; vecIdx, ok = h.decimalVecIdxs.Next(vecIdx + 1) {
h.decimalVecs = append(h.decimalVecs, vecs[vecIdx].Decimal())
}
if cap(h.varSizeDatumSizes) < newBatch.Capacity() {
h.varSizeDatumSizes = make([]int64, newBatch.Capacity())
} else {
h.varSizeDatumSizes = h.varSizeDatumSizes[:newBatch.Capacity()]
h.decimalSizes = make([]int64, newBatch.Capacity())
for i := range h.decimalSizes {
// In EstimateBatchSizeBytes, memsize.Decimal has already been
// accounted for for each decimal value, so we multiply that by
// the number of decimal vectors to get the already-included
// footprint of all decimal values in a single row.
h.decimalSizes[i] = int64(len(h.decimalVecs)) * memsize.Decimal
}
for i := range h.varSizeDatumSizes {
h.varSizeDatumSizes[i] = h.varSizeEstimatePerRow
}
if !h.varLenDatumVecIdxs.Empty() {
h.varLenDatumVecs = h.varLenDatumVecs[:0]
for vecIdx, ok := h.varLenDatumVecIdxs.Next(0); ok; vecIdx, ok = h.varLenDatumVecIdxs.Next(vecIdx + 1) {
h.varLenDatumVecs = append(h.varLenDatumVecs, vecs[vecIdx].Datum())
}
}
}
@@ -655,18 +648,25 @@ func (h *SetAccountingHelper) AccountForSet(rowIdx int) {
h.prevBytesLikeTotalSize = newBytesLikeTotalSize
}

if !h.varSizeVecIdxs.Empty() {
var newVarLengthDatumSize int64
if !h.decimalVecIdxs.Empty() {
var newDecimalSizes int64
for _, decimalVec := range h.decimalVecs {
d := decimalVec.Get(rowIdx)
newVarLengthDatumSize += int64(d.Size())
newDecimalSizes += int64(d.Size())
}
for _, datumVec := range h.datumVecs {
h.Allocator.AdjustMemoryUsage(newDecimalSizes - h.decimalSizes[rowIdx])
h.decimalSizes[rowIdx] = newDecimalSizes
}

if !h.varLenDatumVecIdxs.Empty() {
var newVarLengthDatumSize int64
for _, datumVec := range h.varLenDatumVecs {
datumSize := datumVec.Get(rowIdx).(tree.Datum).Size()
newVarLengthDatumSize += int64(datumSize) + memsize.DatumOverhead
// Note that we're ignoring the overhead of tree.Datum because it
// was already included in EstimateBatchSizeBytes.
newVarLengthDatumSize += int64(datumSize)
}
h.Allocator.AdjustMemoryUsage(newVarLengthDatumSize - h.varSizeDatumSizes[rowIdx])
h.varSizeDatumSizes[rowIdx] = newVarLengthDatumSize
h.Allocator.AdjustMemoryUsage(newVarLengthDatumSize)
}
}

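
In sum, after this change `AccountForSet` uses two schemes, sketched below under simplified assumptions (toy account type, not the `colmem` API): delta-based accounting for decimals, which survive a reset, and absolute accounting for var-length datums, which do not.

```go
package accountingexample // hypothetical package for illustration

// toyAccount stands in for the allocator's memory account.
type toyAccount struct{ used int64 }

func (a *toyAccount) adjust(delta int64) { a.used += delta }

// accountDecimals mirrors the decimal branch of AccountForSet: decimals
// survive a batch reset (their allocations are reused), so the helper
// remembers what it already accounted for this row (decimalSizes[rowIdx])
// and adjusts the account by the delta only.
func accountDecimals(a *toyAccount, accounted []int64, rowIdx int, newSize int64) {
	a.adjust(newSize - accounted[rowIdx])
	accounted[rowIdx] = newSize
}

// accountVarLenDatums mirrors the datum branch: the old datums were already
// released by the deep reset and the tree.Datum overhead was pre-accounted
// by EstimateBatchSizeBytes, so each Set simply adds the new values' sizes.
func accountVarLenDatums(a *toyAccount, newSize int64) {
	a.adjust(newSize)
}
```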
