diff --git a/pkg/col/coldata/BUILD.bazel b/pkg/col/coldata/BUILD.bazel
index 276ac3bfb791..bbb7903d91ea 100644
--- a/pkg/col/coldata/BUILD.bazel
+++ b/pkg/col/coldata/BUILD.bazel
@@ -24,7 +24,6 @@ go_library(
         "//pkg/util",
         "//pkg/util/buildutil",
         "//pkg/util/duration",
-        "//pkg/util/intsets",
         "//pkg/util/json",
         "@com_github_cockroachdb_apd_v3//:apd",
         "@com_github_cockroachdb_errors//:errors",
diff --git a/pkg/col/coldata/batch.go b/pkg/col/coldata/batch.go
index 172d417deba7..d78c823c139e 100644
--- a/pkg/col/coldata/batch.go
+++ b/pkg/col/coldata/batch.go
@@ -16,10 +16,8 @@ import (
 	"strings"
 	"sync/atomic"
 
-	"github.com/cockroachdb/cockroach/pkg/col/typeconv"
 	"github.com/cockroachdb/cockroach/pkg/sql/types"
 	"github.com/cockroachdb/cockroach/pkg/util"
-	"github.com/cockroachdb/cockroach/pkg/util/intsets"
 	"github.com/cockroachdb/errors"
 )
 
@@ -73,12 +71,7 @@ type Batch interface {
 	// batches that they reuse as not doing this could result in correctness
 	// or memory blowup issues. It unsets the selection and sets the length to
 	// 0.
-	//
-	// Notably, it deeply resets the datum-backed vectors and returns the number
-	// of bytes released as a result of the reset. Callers should update the
-	// allocator (which the batch was instantiated from) accordingly unless they
-	// guarantee that the batch doesn't have any datum-backed vectors.
-	ResetInternalBatch() int64
+	ResetInternalBatch()
 	// String returns a pretty representation of this batch.
 	String() string
 }
@@ -136,9 +129,6 @@ func NewMemBatchWithCapacity(typs []*types.T, capacity int, factory ColumnFactor
 		col := &cols[i]
 		col.init(t, capacity, factory)
 		b.b[i] = col
-		if col.CanonicalTypeFamily() == typeconv.DatumVecCanonicalTypeFamily {
-			b.datumVecIdxs.Add(i)
-		}
 	}
 	return b
 }
@@ -208,10 +198,8 @@ type MemBatch struct {
 	// MemBatch.
 	capacity int
 	// b is the slice of columns in this batch.
-	b []Vec
-	// datumVecIdxs stores the indices of all datum-backed vectors in b.
-	datumVecIdxs intsets.Fast
-	useSel       bool
+	b      []Vec
+	useSel bool
 	// sel is - if useSel is true - a selection vector from upstream. A
 	// selection vector is a list of selected tuple indices in this memBatch's
 	// columns (tuples for which indices are not in sel are considered to be
@@ -264,9 +252,6 @@ func (m *MemBatch) SetLength(length int) {
 
 // AppendCol implements the Batch interface.
 func (m *MemBatch) AppendCol(col Vec) {
-	if col.CanonicalTypeFamily() == typeconv.DatumVecCanonicalTypeFamily {
-		m.datumVecIdxs.Add(len(m.b))
-	}
 	m.b = append(m.b, col)
 }
 
@@ -305,17 +290,12 @@ func (m *MemBatch) Reset(typs []*types.T, length int, factory ColumnFactory) {
 	// since those will get reset in ResetInternalBatch anyway.
 	m.b = m.b[:len(typs)]
 	m.sel = m.sel[:length]
-	for i, ok := m.datumVecIdxs.Next(0); ok; i, ok = m.datumVecIdxs.Next(i + 1) {
-		if i >= len(typs) {
-			m.datumVecIdxs.Remove(i)
-		}
-	}
 	m.ResetInternalBatch()
 	m.SetLength(length)
 }
 
 // ResetInternalBatch implements the Batch interface.
-func (m *MemBatch) ResetInternalBatch() int64 {
+func (m *MemBatch) ResetInternalBatch() {
 	m.SetLength(0 /* length */)
 	m.SetSelection(false)
 	for _, v := range m.b {
@@ -324,11 +304,6 @@ func (m *MemBatch) ResetInternalBatch() int64 {
 			ResetIfBytesLike(v)
 		}
 	}
-	var released int64
-	for i, ok := m.datumVecIdxs.Next(0); ok; i, ok = m.datumVecIdxs.Next(i + 1) {
-		released += m.b[i].Datum().Reset()
-	}
-	return released
 }
 
 // String returns a pretty representation of this batch.
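
Note on the interface change above: ResetInternalBatch no longer reports released bytes, because datum references now survive a reset and the accounting is instead adjusted by delta when rows are overwritten (see the colmem changes below). A minimal runnable sketch of the old versus new caller pattern, using hypothetical toy types rather than the real coldata.Batch and colmem.Allocator:

    package main

    import "fmt"

    // Toy stand-ins for coldata.Batch and colmem.Allocator, used only to
    // illustrate the contract change; these hypothetical types are not part
    // of the patch.

    type allocator struct{ used int64 }

    func (a *allocator) releaseMemory(n int64) { a.used -= n }

    type batch struct {
    	length int
    	datums int64 // pretend footprint of datum-backed vectors
    }

    // Old contract: the reset deeply cleared datum-backed vectors and returned
    // the number of bytes released, which the caller had to hand back to the
    // allocator the batch was instantiated from.
    func (b *batch) resetInternalBatchOld() int64 {
    	b.length = 0
    	released := b.datums
    	b.datums = 0
    	return released
    }

    // New contract: the reset keeps datum references alive and releases
    // nothing; accounting is adjusted by delta when rows are overwritten.
    func (b *batch) resetInternalBatch() {
    	b.length = 0
    }

    func main() {
    	a := &allocator{used: 1024}
    	b := &batch{length: 100, datums: 128}

    	a.releaseMemory(b.resetInternalBatchOld()) // old caller pattern
    	fmt.Println("old:", a.used)                // 896

    	b.resetInternalBatch()      // new caller pattern: no accounting change
    	fmt.Println("new:", a.used) // still 896
    }
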
diff --git a/pkg/col/coldata/datum_vec.go b/pkg/col/coldata/datum_vec.go
index 718c7d41806a..639b26656ed4 100644
--- a/pkg/col/coldata/datum_vec.go
+++ b/pkg/col/coldata/datum_vec.go
@@ -36,6 +36,8 @@ type DatumVec interface {
 	AppendSlice(src DatumVec, destIdx, srcStartIdx, srcEndIdx int)
 	// AppendVal appends the given tree.Datum value to the end of the vector.
 	AppendVal(v Datum)
+	// SetLength sets the length of the vector.
+	SetLength(l int)
 	// Len returns the length of the vector.
 	Len() int
 	// Cap returns the underlying capacity of the vector.
@@ -53,7 +55,4 @@ type DatumVec interface {
 	// be used when elements before startIdx are guaranteed not to have been
 	// modified.
 	Size(startIdx int) int64
-	// Reset resets the vector for reuse. It returns the number of bytes
-	// released.
-	Reset() int64
 }
diff --git a/pkg/col/coldataext/datum_vec.go b/pkg/col/coldataext/datum_vec.go
index 7ad5ccb3e1c9..ef390bfa5c75 100644
--- a/pkg/col/coldataext/datum_vec.go
+++ b/pkg/col/coldataext/datum_vec.go
@@ -133,6 +133,11 @@ func (dv *datumVec) AppendVal(v coldata.Datum) {
 	dv.data = append(dv.data, datum)
}
 
+// SetLength implements coldata.DatumVec interface.
+func (dv *datumVec) SetLength(l int) {
+	dv.data = dv.data[:l]
+}
+
 // Len implements coldata.DatumVec interface.
 func (dv *datumVec) Len() int {
 	return len(dv.data)
@@ -158,24 +163,6 @@ func (dv *datumVec) UnmarshalTo(i int, b []byte) error {
 	return err
 }
 
-// valuesSize returns the footprint of actual datums (in bytes) with ordinals in
-// [startIdx:] range, ignoring the overhead of tree.Datum wrapper.
-func (dv *datumVec) valuesSize(startIdx int) int64 {
-	var size int64
-	// Only the elements up to the length are expected to be non-nil. Note that
-	// we cannot take a short-cut with fixed-length values here because they
-	// might not be set, so we could over-account if we did something like
-	// size += (len-startIdx) * fixedSize.
-	if startIdx < dv.Len() {
-		for _, d := range dv.data[startIdx:dv.Len()] {
-			if d != nil {
-				size += int64(d.Size())
-			}
-		}
-	}
-	return size
-}
-
 // Size implements coldata.DatumVec interface.
 func (dv *datumVec) Size(startIdx int) int64 {
 	// Note that we don't account for the overhead of datumVec struct, and the
@@ -187,18 +174,24 @@ func (dv *datumVec) Size(startIdx int) int64 {
 	if startIdx < 0 {
 		startIdx = 0
 	}
-	// We have to account for the tree.Datum overhead for the whole capacity of
-	// the underlying slice.
-	return memsize.DatumOverhead*int64(dv.Cap()-startIdx) + dv.valuesSize(startIdx)
-}
-
-// Reset implements coldata.DatumVec interface.
-func (dv *datumVec) Reset() int64 {
-	released := dv.valuesSize(0 /* startIdx */)
-	for i := range dv.data {
-		dv.data[i] = nil
+	count := int64(dv.Cap() - startIdx)
+	size := memsize.DatumOverhead * count
+	if datumSize, variable := tree.DatumTypeSize(dv.t); variable {
+		// The elements in dv.data[max(startIdx,len):cap] range are accounted with
+		// the default datum size for the type. For those in the range
+		// [startIdx, len) we call Datum.Size().
+		idx := startIdx
+		for ; idx < len(dv.data); idx++ {
+			if dv.data[idx] != nil {
+				size += int64(dv.data[idx].Size())
+			}
+		}
+		// Pick up where the loop left off.
+		size += int64(dv.Cap()-idx) * int64(datumSize)
+	} else {
+		size += int64(datumSize) * count
 	}
-	return released
+	return size
 }
 
 // assertValidDatum asserts that the given datum is valid to be stored in this
diff --git a/pkg/sql/colexec/colexecdisk/hash_based_partitioner.go b/pkg/sql/colexec/colexecdisk/hash_based_partitioner.go
index 521b4b1dde7b..8b6d641c6e52 100644
--- a/pkg/sql/colexec/colexecdisk/hash_based_partitioner.go
+++ b/pkg/sql/colexec/colexecdisk/hash_based_partitioner.go
@@ -379,7 +379,7 @@ func (op *hashBasedPartitioner) partitionBatch(
 	for idx, sel := range selections {
 		partitionIdx := op.partitionIdxOffset + idx
 		if len(sel) > 0 {
-			op.unlimitedAllocator.ResetBatch(scratchBatch)
+			scratchBatch.ResetInternalBatch()
 			// The partitioner expects the batches without a selection vector,
 			// so we need to copy the tuples according to the selection vector
 			// into a scratch batch.
diff --git a/pkg/sql/colexec/colexecutils/operator.go b/pkg/sql/colexec/colexecutils/operator.go
index f655b5df1326..92c20d416124 100644
--- a/pkg/sql/colexec/colexecutils/operator.go
+++ b/pkg/sql/colexec/colexecutils/operator.go
@@ -76,9 +76,7 @@ func (s *fixedNumTuplesNoInputOp) Next() coldata.Batch {
 	if s.numTuplesLeft == 0 {
 		return coldata.ZeroBatch
 	}
-	// The internal batch has no columns, so no memory is ever released on the
-	// ResetInternalBatch() call.
-	_ = s.batch.ResetInternalBatch()
+	s.batch.ResetInternalBatch()
 	length := s.numTuplesLeft
 	if length > coldata.BatchSize() {
 		length = coldata.BatchSize()
diff --git a/pkg/sql/colexec/colexecutils/utils.go b/pkg/sql/colexec/colexecutils/utils.go
index 39aaba6d6da8..2f19b2060e51 100644
--- a/pkg/sql/colexec/colexecutils/utils.go
+++ b/pkg/sql/colexec/colexecutils/utils.go
@@ -185,12 +185,9 @@ func (b *AppendOnlyBufferedBatch) Reset([]*types.T, int, coldata.ColumnFactory)
 }
 
 // ResetInternalBatch implements the coldata.Batch interface.
-// NB: any memory released during this call is automatically released from the
-// allocator that created the batch.
-func (b *AppendOnlyBufferedBatch) ResetInternalBatch() int64 {
+func (b *AppendOnlyBufferedBatch) ResetInternalBatch() {
 	b.SetLength(0 /* n */)
-	b.allocator.ReleaseMemory(b.batch.ResetInternalBatch())
-	return 0
+	b.batch.ResetInternalBatch()
 }
 
 // String implements the coldata.Batch interface.
diff --git a/pkg/sql/colexec/colexecwindow/relative_rank.eg.go b/pkg/sql/colexec/colexecwindow/relative_rank.eg.go
index 0707442d2e1d..84776f9d62d9 100644
--- a/pkg/sql/colexec/colexecwindow/relative_rank.eg.go
+++ b/pkg/sql/colexec/colexecwindow/relative_rank.eg.go
@@ -265,7 +265,7 @@ func (r *percentRankNoPartitionOp) Next() coldata.Batch {
 			continue
 		}
 
-		r.allocator.ResetBatch(r.output)
+		r.output.ResetInternalBatch()
 		// First, we copy over the buffered up columns.
 		r.allocator.PerformOperation(r.output.ColVecs()[:len(r.inputTypes)], func() {
 			for colIdx, vec := range r.output.ColVecs()[:len(r.inputTypes)] {
@@ -478,9 +478,7 @@ func (r *percentRankWithPartitionOp) Next() coldata.Batch {
 					r.partitionsState.runningSizes.SetLength(coldata.BatchSize())
 					r.partitionsState.Enqueue(r.Ctx, r.partitionsState.runningSizes)
 					r.partitionsState.idx = 0
-					// This batch has only a single INT column, so no memory is ever
-					// released on the ResetInternalBatch() call.
-					_ = r.partitionsState.runningSizes.ResetInternalBatch()
+					r.partitionsState.runningSizes.ResetInternalBatch()
 				}
 			}
 		}
@@ -503,9 +501,7 @@ func (r *percentRankWithPartitionOp) Next() coldata.Batch {
 					r.partitionsState.runningSizes.SetLength(coldata.BatchSize())
 					r.partitionsState.Enqueue(r.Ctx, r.partitionsState.runningSizes)
 					r.partitionsState.idx = 0
-					// This batch has only a single INT column, so no memory is ever
-					// released on the ResetInternalBatch() call.
-					_ = r.partitionsState.runningSizes.ResetInternalBatch()
+					r.partitionsState.runningSizes.ResetInternalBatch()
 				}
 			}
 		}
@@ -533,7 +529,7 @@ func (r *percentRankWithPartitionOp) Next() coldata.Batch {
 			r.numTuplesInPartition = 0
 		}
 
-		r.allocator.ResetBatch(r.output)
+		r.output.ResetInternalBatch()
 		// First, we copy over the buffered up columns.
 		r.allocator.PerformOperation(r.output.ColVecs()[:len(r.inputTypes)], func() {
 			for colIdx, vec := range r.output.ColVecs()[:len(r.inputTypes)] {
@@ -762,9 +758,7 @@ func (r *cumeDistNoPartitionOp) Next() coldata.Batch {
 					r.peerGroupsState.runningSizes.SetLength(coldata.BatchSize())
 					r.peerGroupsState.Enqueue(r.Ctx, r.peerGroupsState.runningSizes)
 					r.peerGroupsState.idx = 0
-					// This batch has only a single INT column, so no memory is ever
-					// released on the ResetInternalBatch() call.
-					_ = r.peerGroupsState.runningSizes.ResetInternalBatch()
+					r.peerGroupsState.runningSizes.ResetInternalBatch()
 				}
 			}
 		}
@@ -787,9 +781,7 @@ func (r *cumeDistNoPartitionOp) Next() coldata.Batch {
 					r.peerGroupsState.runningSizes.SetLength(coldata.BatchSize())
 					r.peerGroupsState.Enqueue(r.Ctx, r.peerGroupsState.runningSizes)
 					r.peerGroupsState.idx = 0
-					// This batch has only a single INT column, so no memory is ever
-					// released on the ResetInternalBatch() call.
-					_ = r.peerGroupsState.runningSizes.ResetInternalBatch()
+					r.peerGroupsState.runningSizes.ResetInternalBatch()
 				}
 			}
 		}
@@ -816,7 +808,7 @@ func (r *cumeDistNoPartitionOp) Next() coldata.Batch {
 			r.numPeers = 0
 		}
 
-		r.allocator.ResetBatch(r.output)
+		r.output.ResetInternalBatch()
 		// First, we copy over the buffered up columns.
 		r.allocator.PerformOperation(r.output.ColVecs()[:len(r.inputTypes)], func() {
 			for colIdx, vec := range r.output.ColVecs()[:len(r.inputTypes)] {
@@ -1053,9 +1045,7 @@ func (r *cumeDistWithPartitionOp) Next() coldata.Batch {
 					r.partitionsState.runningSizes.SetLength(coldata.BatchSize())
 					r.partitionsState.Enqueue(r.Ctx, r.partitionsState.runningSizes)
 					r.partitionsState.idx = 0
-					// This batch has only a single INT column, so no memory is ever
-					// released on the ResetInternalBatch() call.
-					_ = r.partitionsState.runningSizes.ResetInternalBatch()
+					r.partitionsState.runningSizes.ResetInternalBatch()
 				}
 			}
 		}
@@ -1078,9 +1068,7 @@ func (r *cumeDistWithPartitionOp) Next() coldata.Batch {
 					r.partitionsState.runningSizes.SetLength(coldata.BatchSize())
 					r.partitionsState.Enqueue(r.Ctx, r.partitionsState.runningSizes)
 					r.partitionsState.idx = 0
-					// This batch has only a single INT column, so no memory is ever
-					// released on the ResetInternalBatch() call.
-					_ = r.partitionsState.runningSizes.ResetInternalBatch()
+					r.partitionsState.runningSizes.ResetInternalBatch()
 				}
 			}
 		}
@@ -1109,9 +1097,7 @@ func (r *cumeDistWithPartitionOp) Next() coldata.Batch {
 					r.peerGroupsState.runningSizes.SetLength(coldata.BatchSize())
 					r.peerGroupsState.Enqueue(r.Ctx, r.peerGroupsState.runningSizes)
 					r.peerGroupsState.idx = 0
-					// This batch has only a single INT column, so no memory is ever
-					// released on the ResetInternalBatch() call.
-					_ = r.peerGroupsState.runningSizes.ResetInternalBatch()
+					r.peerGroupsState.runningSizes.ResetInternalBatch()
 				}
 			}
 		}
@@ -1134,9 +1120,7 @@ func (r *cumeDistWithPartitionOp) Next() coldata.Batch {
 					r.peerGroupsState.runningSizes.SetLength(coldata.BatchSize())
 					r.peerGroupsState.Enqueue(r.Ctx, r.peerGroupsState.runningSizes)
 					r.peerGroupsState.idx = 0
-					// This batch has only a single INT column, so no memory is ever
-					// released on the ResetInternalBatch() call.
-					_ = r.peerGroupsState.runningSizes.ResetInternalBatch()
+					r.peerGroupsState.runningSizes.ResetInternalBatch()
 				}
 			}
 		}
@@ -1171,7 +1155,7 @@ func (r *cumeDistWithPartitionOp) Next() coldata.Batch {
 			r.numPeers = 0
 		}
 
-		r.allocator.ResetBatch(r.output)
+		r.output.ResetInternalBatch()
 		// First, we copy over the buffered up columns.
 		r.allocator.PerformOperation(r.output.ColVecs()[:len(r.inputTypes)], func() {
 			for colIdx, vec := range r.output.ColVecs()[:len(r.inputTypes)] {
diff --git a/pkg/sql/colexec/colexecwindow/relative_rank_tmpl.go b/pkg/sql/colexec/colexecwindow/relative_rank_tmpl.go
index 84f2efc640da..1aedae2fbf5a 100644
--- a/pkg/sql/colexec/colexecwindow/relative_rank_tmpl.go
+++ b/pkg/sql/colexec/colexecwindow/relative_rank_tmpl.go
@@ -156,9 +156,7 @@ func _COMPUTE_PARTITIONS_SIZES(_HAS_SEL bool) { // */}}
 			r.partitionsState.runningSizes.SetLength(coldata.BatchSize())
 			r.partitionsState.Enqueue(r.Ctx, r.partitionsState.runningSizes)
 			r.partitionsState.idx = 0
-			// This batch has only a single INT column, so no memory is ever
-			// released on the ResetInternalBatch() call.
-			_ = r.partitionsState.runningSizes.ResetInternalBatch()
+			r.partitionsState.runningSizes.ResetInternalBatch()
 		}
 	}
 }
@@ -190,9 +188,7 @@ func _COMPUTE_PEER_GROUPS_SIZES(_HAS_SEL bool) { // */}}
 			r.peerGroupsState.runningSizes.SetLength(coldata.BatchSize())
 			r.peerGroupsState.Enqueue(r.Ctx, r.peerGroupsState.runningSizes)
 			r.peerGroupsState.idx = 0
-			// This batch has only a single INT column, so no memory is ever
-			// released on the ResetInternalBatch() call.
-			_ = r.peerGroupsState.runningSizes.ResetInternalBatch()
+			r.peerGroupsState.runningSizes.ResetInternalBatch()
 		}
 	}
 }
@@ -483,7 +479,7 @@ func (r *_RELATIVE_RANK_STRINGOp) Next() coldata.Batch {
 		}
 		// {{end}}
 
-		r.allocator.ResetBatch(r.output)
+		r.output.ResetInternalBatch()
 		// First, we copy over the buffered up columns.
 		r.allocator.PerformOperation(r.output.ColVecs()[:len(r.inputTypes)], func() {
 			for colIdx, vec := range r.output.ColVecs()[:len(r.inputTypes)] {
diff --git a/pkg/sql/colexec/count.go b/pkg/sql/colexec/count.go
index 410684400ffd..0526f585d710 100644
--- a/pkg/sql/colexec/count.go
+++ b/pkg/sql/colexec/count.go
@@ -46,9 +46,7 @@ func (c *countOp) Next() coldata.Batch {
 	if c.done {
 		return coldata.ZeroBatch
 	}
-	// The internal batch has only a single INT column, so no memory is ever
-	// released on the ResetInternalBatch() call.
-	_ = c.internalBatch.ResetInternalBatch()
+	c.internalBatch.ResetInternalBatch()
 	for {
 		bat := c.Input.Next()
 		length := bat.Length()
diff --git a/pkg/sql/colexec/ordered_aggregator.go b/pkg/sql/colexec/ordered_aggregator.go
index 45bfb38036dc..9987852bb421 100644
--- a/pkg/sql/colexec/ordered_aggregator.go
+++ b/pkg/sql/colexec/ordered_aggregator.go
@@ -192,7 +192,7 @@ func (a *orderedAggregator) Next() coldata.Batch {
 	switch a.state {
 	case orderedAggregatorAggregating:
 		if a.scratch.shouldResetInternalBatch {
-			a.allocator.ResetBatch(a.scratch)
+			a.scratch.ResetInternalBatch()
 			a.scratch.shouldResetInternalBatch = false
 		}
 		if a.scratch.resumeIdx >= coldata.BatchSize() {
@@ -327,7 +327,7 @@ func (a *orderedAggregator) Next() coldata.Batch {
 		if a.unsafeBatch == nil {
 			a.unsafeBatch = a.allocator.NewMemBatchWithFixedCapacity(a.outputTypes, coldata.BatchSize())
 		} else {
-			a.allocator.ResetBatch(a.unsafeBatch)
+			a.unsafeBatch.ResetInternalBatch()
 		}
 		a.allocator.PerformOperation(a.unsafeBatch.ColVecs(), func() {
 			for i := 0; i < len(a.outputTypes); i++ {
@@ -353,7 +353,7 @@ func (a *orderedAggregator) Next() coldata.Batch {
 			// the source and the destination would be the same, and
 			// resetting it would lead to the loss of data.
 			newResumeIdx := a.scratch.resumeIdx - coldata.BatchSize()
-			a.allocator.ResetBatch(a.scratch.tempBuffer)
+			a.scratch.tempBuffer.ResetInternalBatch()
 			a.allocator.PerformOperation(a.scratch.tempBuffer.ColVecs(), func() {
 				for i := 0; i < len(a.outputTypes); i++ {
 					a.scratch.tempBuffer.ColVec(i).Copy(
@@ -365,7 +365,7 @@ func (a *orderedAggregator) Next() coldata.Batch {
 					)
 				}
 			})
-			a.allocator.ResetBatch(a.scratch)
+			a.scratch.ResetInternalBatch()
 			a.allocator.PerformOperation(a.scratch.ColVecs(), func() {
 				for i := 0; i < len(a.outputTypes); i++ {
 					a.scratch.ColVec(i).Copy(
diff --git a/pkg/sql/colmem/BUILD.bazel b/pkg/sql/colmem/BUILD.bazel
index 7e5734d9f411..29efe82fbc1c 100644
--- a/pkg/sql/colmem/BUILD.bazel
+++ b/pkg/sql/colmem/BUILD.bazel
@@ -33,6 +33,7 @@ go_test(
     deps = [
         "//pkg/col/coldata",
         "//pkg/col/coldataext",
+        "//pkg/col/typeconv",
         "//pkg/settings/cluster",
         "//pkg/sql/colconv",
         "//pkg/sql/colexec/colexecutils",
diff --git a/pkg/sql/colmem/allocator.go b/pkg/sql/colmem/allocator.go
index 9f10019eb680..983475b7a963 100644
--- a/pkg/sql/colmem/allocator.go
+++ b/pkg/sql/colmem/allocator.go
@@ -183,11 +183,6 @@ func (a *Allocator) NewMemBatchNoCols(typs []*types.T, capacity int) coldata.Bat
 	return coldata.NewMemBatchNoCols(typs, capacity)
 }
 
-// ResetBatch resets the batch while keeping the memory accounting updated.
-func (a *Allocator) ResetBatch(batch coldata.Batch) {
-	a.ReleaseMemory(batch.ResetInternalBatch())
-}
-
 // truncateToMemoryLimit returns the largest batch capacity that is still within
 // the memory limit for the given type schema. The returned value is at most
 // minDesiredCapacity and at least 1.
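
With Allocator.ResetBatch removed above, a reset never touches the memory account; instead, SetAccountingHelper (changed below) remembers what it has already charged per row and adjusts only by the difference on each write. A runnable sketch of that delta accounting under simplified, hypothetical names (account, setHelper, and accountForSet are stand-ins, not the real colmem API):

    package main

    import "fmt"

    type account struct{ used int64 }

    func (a *account) adjust(delta int64) { a.used += delta }

    type setHelper struct {
    	acc      *account
    	rowSizes []int64 // bytes already accounted for per row
    }

    // newSetHelper seeds every row with the per-row estimate that was already
    // charged up front (mirroring EstimateBatchSizeBytes).
    func newSetHelper(acc *account, capacity int, perRowEstimate int64) *setHelper {
    	h := &setHelper{acc: acc, rowSizes: make([]int64, capacity)}
    	for i := range h.rowSizes {
    		h.rowSizes[i] = perRowEstimate
    	}
    	return h
    }

    // accountForSet is called after row i is written; newSize is the actual
    // footprint of the row's variable-size values.
    func (h *setHelper) accountForSet(i int, newSize int64) {
    	h.acc.adjust(newSize - h.rowSizes[i])
    	h.rowSizes[i] = newSize
    }

    func main() {
    	// Estimate: 10 rows at 24 bytes each already charged up front.
    	acc := &account{used: 240}
    	h := newSetHelper(acc, 10, 24)

    	h.accountForSet(0, 100) // row 0 is larger than estimated: +76
    	fmt.Println(acc.used)   // 316

    	// After a batch reset nothing is released (the vector still references
    	// the old datum); overwriting the row adjusts by delta only.
    	h.accountForSet(0, 40) // row 0 shrank: -60
    	fmt.Println(acc.used)  // 256
    }
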
@@ -311,7 +306,7 @@ func (a *Allocator) resetMaybeReallocate(
 	}
 	if useOldBatch {
 		reallocated = false
-		a.ResetBatch(oldBatch)
+		oldBatch.ResetInternalBatch()
 		newBatch = oldBatch
 	} else {
 		a.ReleaseMemory(oldBatchMemSize)
@@ -399,11 +394,7 @@ func (a *Allocator) MaybeAppendColumn(b coldata.Batch, t *types.T, colIdx int) {
 		b.ReplaceCol(a.NewMemColumn(t, desiredCapacity), colIdx)
 		return
 	}
-	if presentVec.CanonicalTypeFamily() == typeconv.DatumVecCanonicalTypeFamily {
-		a.ReleaseMemory(presentVec.Datum().Reset())
-	} else {
-		coldata.ResetIfBytesLike(presentVec)
-	}
+	coldata.ResetIfBytesLike(presentVec)
 	if presentVec.MaybeHasNulls() {
 		presentVec.Nulls().UnsetNulls()
 	}
@@ -602,11 +593,15 @@ func EstimateBatchSizeBytes(vecTypes []*types.T, batchLength int) int64 {
 			// use the flat struct size as an estimate.
 			acc += memsize.Decimal
 		case typeconv.DatumVecCanonicalTypeFamily:
-			// Initially, only []tree.Datum slice is allocated for the
-			// datum-backed vectors right away, so that's what we're including
-			// in the estimate. Later on, once the actual values are set, they
-			// will be accounted for properly.
-			acc += memsize.DatumOverhead
+			// In datum vec we need to account for memory underlying the struct
+			// that is the implementation of tree.Datum interface (for example,
+			// tree.DBoolFalse) as well as for the overhead of storing that
+			// implementation in the slice of tree.Datums. Note that if t is of
+			// variable size, the memory will be properly accounted in
+			// getVecMemoryFootprint.
+			// Note: keep the calculation here in line with datumVec.Size.
+			implementationSize, _ := tree.DatumTypeSize(t)
+			acc += int64(implementationSize) + memsize.DatumOverhead
 		case types.BoolFamily,
 			types.IntFamily,
@@ -829,30 +824,29 @@ type SetAccountingHelper struct {
 	// that we have already accounted for.
 	prevBytesLikeTotalSize int64
 
-	// decimalVecIdxs stores the indices of all decimal vectors.
-	decimalVecIdxs intsets.Fast
-	// decimalVecs stores all decimal vectors. They are updated every time a new
-	// batch is allocated.
+	// varSizeVecIdxs stores the indices of all vectors with variable sized
+	// values except for the bytes-like ones.
+	varSizeVecIdxs intsets.Fast
+	// decimalVecs and datumVecs store all decimal and datum-backed vectors,
+	// respectively. They are updated every time a new batch is allocated.
 	decimalVecs []coldata.Decimals
-	// decimalSizes stores the amount of space we have accounted for for the
-	// corresponding decimal values in the corresponding row of the last batch
-	// that the helper has touched. This is necessary to track because when the
-	// batch is reset, the vectors still have references to the old decimals, so
-	// we need to adjust the accounting only by the delta. Similarly, once a new
-	// batch is allocated, we need to track the estimate that we have already
+	datumVecs   []coldata.DatumVec
+	// varSizeDatumSizes stores the amount of space we have accounted for for
+	// the corresponding "row" of variable length values in the last batch that
+	// the helper has touched. This is necessary to track because when the batch
+	// is reset, the vectors still have references to the old datums, so we need
+	// to adjust the accounting only by the delta. Similarly, once a new batch
+	// is allocated, we need to track the estimate that we have already
 	// accounted for.
 	//
 	// Note that because ResetMaybeReallocate caps the capacity of the batch at
 	// coldata.BatchSize(), this slice will never exceed coldata.BatchSize() in
 	// size, and we choose to ignore it for the purposes of memory accounting.
-	decimalSizes []int64
-
-	// varLenDatumVecIdxs stores the indices of all datum-backed vectors with
-	// variable-length values.
-	varLenDatumVecIdxs intsets.Fast
-	// varLenDatumVecs stores all variable-sized datum-backed vectors. They are
-	// updated every time a new batch is allocated.
-	varLenDatumVecs []coldata.DatumVec
+	varSizeDatumSizes []int64
+	// varSizeEstimatePerRow is the total estimated size of single values from
+	// varSizeVecIdxs vectors which is accounted for by EstimateBatchSizeBytes.
+	// It serves as the initial value for varSizeDatumSizes values.
+	varSizeEstimatePerRow int64
 }
 
 // Init initializes the helper. The allocator must **not** be shared with any
@@ -860,21 +854,28 @@ type SetAccountingHelper struct {
 func (h *SetAccountingHelper) Init(allocator *Allocator, memoryLimit int64, typs []*types.T) {
 	h.helper.Init(allocator, memoryLimit)
 
+	numDecimalVecs := 0
 	for vecIdx, typ := range typs {
 		switch typeconv.TypeFamilyToCanonicalTypeFamily(typ.Family()) {
 		case types.BytesFamily, types.JsonFamily:
 			h.bytesLikeVecIdxs.Add(vecIdx)
 		case types.DecimalFamily:
-			h.decimalVecIdxs.Add(vecIdx)
+			h.varSizeVecIdxs.Add(vecIdx)
+			h.varSizeEstimatePerRow += memsize.Decimal
+			numDecimalVecs++
 		case typeconv.DatumVecCanonicalTypeFamily:
-			h.varLenDatumVecIdxs.Add(vecIdx)
+			estimate, isVarlen := tree.DatumTypeSize(typ)
+			if isVarlen {
+				h.varSizeVecIdxs.Add(vecIdx)
+				h.varSizeEstimatePerRow += int64(estimate) + memsize.DatumOverhead
+			}
 		}
 	}
 
-	h.allFixedLength = h.bytesLikeVecIdxs.Empty() && h.decimalVecIdxs.Empty() && h.varLenDatumVecIdxs.Empty()
+	h.allFixedLength = h.bytesLikeVecIdxs.Empty() && h.varSizeVecIdxs.Empty()
 	h.bytesLikeVectors = make([]*coldata.Bytes, h.bytesLikeVecIdxs.Len())
-	h.decimalVecs = make([]coldata.Decimals, h.decimalVecIdxs.Len())
-	h.varLenDatumVecs = make([]coldata.DatumVec, h.varLenDatumVecIdxs.Len())
+	h.decimalVecs = make([]coldata.Decimals, numDecimalVecs)
+	h.datumVecs = make([]coldata.DatumVec, h.varSizeVecIdxs.Len()-numDecimalVecs)
 }
 
 func (h *SetAccountingHelper) getBytesLikeTotalSize() int64 {
@@ -921,24 +922,23 @@ func (h *SetAccountingHelper) ResetMaybeReallocate(
 		}
 		h.prevBytesLikeTotalSize = h.getBytesLikeTotalSize()
 	}
-	if !h.decimalVecIdxs.Empty() {
+	if !h.varSizeVecIdxs.Empty() {
 		h.decimalVecs = h.decimalVecs[:0]
-		for vecIdx, ok := h.decimalVecIdxs.Next(0); ok; vecIdx, ok = h.decimalVecIdxs.Next(vecIdx + 1) {
-			h.decimalVecs = append(h.decimalVecs, vecs[vecIdx].Decimal())
+		h.datumVecs = h.datumVecs[:0]
+		for vecIdx, ok := h.varSizeVecIdxs.Next(0); ok; vecIdx, ok = h.varSizeVecIdxs.Next(vecIdx + 1) {
+			if vecs[vecIdx].CanonicalTypeFamily() == types.DecimalFamily {
+				h.decimalVecs = append(h.decimalVecs, vecs[vecIdx].Decimal())
+			} else {
+				h.datumVecs = append(h.datumVecs, vecs[vecIdx].Datum())
+			}
 		}
-		h.decimalSizes = make([]int64, newBatch.Capacity())
-		for i := range h.decimalSizes {
-			// In EstimateBatchSizeBytes, memsize.Decimal has already been
-			// accounted for for each decimal value, so we multiple that by
-			// the number of decimal vectors to get already included
-			// footprint of all decimal values in a single row.
-			h.decimalSizes[i] = int64(len(h.decimalVecs)) * memsize.Decimal
+		if cap(h.varSizeDatumSizes) < newBatch.Capacity() {
+			h.varSizeDatumSizes = make([]int64, newBatch.Capacity())
+		} else {
+			h.varSizeDatumSizes = h.varSizeDatumSizes[:newBatch.Capacity()]
 		}
-	}
-	if !h.varLenDatumVecIdxs.Empty() {
-		h.varLenDatumVecs = h.varLenDatumVecs[:0]
-		for vecIdx, ok := h.varLenDatumVecIdxs.Next(0); ok; vecIdx, ok = h.varLenDatumVecIdxs.Next(vecIdx + 1) {
-			h.varLenDatumVecs = append(h.varLenDatumVecs, vecs[vecIdx].Datum())
+		for i := range h.varSizeDatumSizes {
+			h.varSizeDatumSizes[i] = h.varSizeEstimatePerRow
 		}
 	}
 }
@@ -966,25 +966,18 @@ func (h *SetAccountingHelper) AccountForSet(rowIdx int) (batchDone bool) {
 		h.prevBytesLikeTotalSize = newBytesLikeTotalSize
 	}
 
-	if !h.decimalVecIdxs.Empty() {
-		var newDecimalSizes int64
+	if !h.varSizeVecIdxs.Empty() {
+		var newVarLengthDatumSize int64
 		for _, decimalVec := range h.decimalVecs {
 			d := decimalVec.Get(rowIdx)
-			newDecimalSizes += int64(d.Size())
+			newVarLengthDatumSize += int64(d.Size())
 		}
-		h.helper.allocator.AdjustMemoryUsageAfterAllocation(newDecimalSizes - h.decimalSizes[rowIdx])
-		h.decimalSizes[rowIdx] = newDecimalSizes
-	}
-
-	if !h.varLenDatumVecIdxs.Empty() {
-		var newVarLengthDatumSize int64
-		for _, datumVec := range h.varLenDatumVecs {
+		for _, datumVec := range h.datumVecs {
 			datumSize := datumVec.Get(rowIdx).(tree.Datum).Size()
-			// Note that we're ignoring the overhead of tree.Datum because it
-			// was already included in EstimateBatchSizeBytes.
-			newVarLengthDatumSize += int64(datumSize)
+			newVarLengthDatumSize += int64(datumSize) + memsize.DatumOverhead
 		}
-		h.helper.allocator.AdjustMemoryUsageAfterAllocation(newVarLengthDatumSize)
+		h.helper.allocator.AdjustMemoryUsageAfterAllocation(newVarLengthDatumSize - h.varSizeDatumSizes[rowIdx])
+		h.varSizeDatumSizes[rowIdx] = newVarLengthDatumSize
 	}
 
 	// The allocator is not shared with any other components, so we can just use
diff --git a/pkg/sql/colmem/allocator_test.go b/pkg/sql/colmem/allocator_test.go
index dadef719683b..59f274db6686 100644
--- a/pkg/sql/colmem/allocator_test.go
+++ b/pkg/sql/colmem/allocator_test.go
@@ -18,6 +18,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/col/coldata"
 	"github.com/cockroachdb/cockroach/pkg/col/coldataext"
+	"github.com/cockroachdb/cockroach/pkg/col/typeconv"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/sql/colconv"
 	"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecutils"
@@ -528,7 +529,18 @@ func TestEstimateBatchSizeBytes(t *testing.T) {
 	numCols := rng.Intn(10) + 1
 	typs := make([]*types.T, numCols)
 	for i := range typs {
-		typs[i] = randgen.RandType(rng)
+		for {
+			typs[i] = randgen.RandType(rng)
+			// We ignore all datum-backed types. This is due to mismatch in how
+			// we account for unset elements in EstimateBatchSizeBytes (where we
+			// include the estimated implementation size) and datumVec.Size
+			// (where unset elements remain nil for which we only include the
+			// DatumOverhead). This exception is ok given that we still perform
+			// the correct accounting after the actual elements are set.
+			if typeconv.TypeFamilyToCanonicalTypeFamily(typs[i].Family()) != typeconv.DatumVecCanonicalTypeFamily {
+				break
+			}
+		}
 	}
 	const numRuns = 10
 	for run := 0; run < numRuns; run++ {
diff --git a/pkg/sql/logictest/testdata/logic_test/vectorize_agg b/pkg/sql/logictest/testdata/logic_test/vectorize_agg
index 2da332fe4820..4b8792212735 100644
--- a/pkg/sql/logictest/testdata/logic_test/vectorize_agg
+++ b/pkg/sql/logictest/testdata/logic_test/vectorize_agg
@@ -71,3 +71,17 @@ true
 statement ok
 RESET testing_optimizer_random_seed;
 RESET testing_optimizer_disable_rule_probability;
+
+# Regression test for releasing the memory of datums in the datum-backed vector
+# from the incorrect memory account (#97603).
+statement ok
+CREATE TABLE t97603 (id PRIMARY KEY) AS SELECT generate_series(1, 50000);
+
+# The important bits are to use an aggregate function that is not supported
+# natively in the vectorized engine and to have a projection operator that is
+# producing a datum-backed vector (constant OID projection).
+statement ok
+SELECT
+  var_pop(crdb_internal_mvcc_timestamp::DECIMAL),
+  1:::OID
+FROM t97603 GROUP BY id HAVING bool_or(true)
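
For reference, a runnable sketch of the new datumVec.Size arithmetic from pkg/col/coldataext/datum_vec.go above, using toy types and made-up constants: every slot in [startIdx, cap) is charged the tree.Datum interface overhead, set elements are measured exactly, and the unset tail [len, cap) of a variable-size type is charged the type's default estimate, which keeps Size in line with EstimateBatchSizeBytes. The names here are hypothetical stand-ins, not the real coldataext implementation:

    package main

    import "fmt"

    const (
    	datumOverhead   = 8  // stand-in for memsize.DatumOverhead
    	defaultEstimate = 16 // stand-in for the tree.DatumTypeSize estimate
    )

    type toyDatum struct{ payload []byte }

    func (d *toyDatum) size() int64 { return int64(len(d.payload)) }

    // size mirrors the variable-size branch of the new datumVec.Size.
    func size(data []*toyDatum, capacity, startIdx int) int64 {
    	count := int64(capacity - startIdx)
    	sz := datumOverhead * count
    	idx := startIdx
    	for ; idx < len(data); idx++ {
    		if data[idx] != nil { // set elements: exact footprint
    			sz += data[idx].size()
    		}
    	}
    	sz += int64(capacity-idx) * defaultEstimate // unset tail: estimate
    	return sz
    }

    func main() {
    	data := make([]*toyDatum, 2, 4) // len 2, cap 4
    	data[0] = &toyDatum{payload: make([]byte, 100)}
    	// data[1] stays nil and contributes only its slot overhead.
    	// 4*8 (overhead) + 100 (set) + 2*16 (estimated tail) = 164.
    	fmt.Println(size(data, cap(data), 0))
    }
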