Skip to content

Commit

Permalink
Merge #97750
Browse files Browse the repository at this point in the history
97750: colexec: fix incorrect accounting when resetting datum-backed vectors r=yuzefovich a=yuzefovich

This commit reverts a couple of other commits:
- "colexec: fix a "fake" memory accounting leak for intra-query period"
(72e83fe)
- "colexec: deeply reset datum-backed vectors in ResetInternalBatch"
(cb93c30)

since they introduced incorrect memory accounting for the datum-backed
vectors.

Those two commits together solved another issue where we would keep
no-longer-needed datums live for longer than necessary (until they are
overwritten in the datum-backed vector) by eagerly nil-ing them out when
resetting the whole batch. This required introducing some careful
adjustment to the memory accounting in order to keep the accounting up
to date. However, that logic turned out to be faulty; in particular, it
became possible to register the allocations of the datum-backed vectors
with one account but then attempt to release some of those allocations
from another. If such releases happened enough times, the account would
go into debt, which would trigger an internal error (or a crash in
test builds).

Such a scenario can occur because we have a couple of utility operators
that append a vector to a batch owned by another operator. When that
other operator resets its batch, the appended-by-utility-operator
vector is also reset, and the memory usage of the freed datum would be
deregistered from the wrong account. Precisely tracking which vectors are
owned by the owner of the batch vs. appended by another operator would be
cumbersome and error-prone, so instead of introducing such tracking, this
commit removes the resetting behavior of the datum-backed vectors.
This should be bullet-proof while only slightly increasing the amount of
time that references to datums are kept live.

Fixes: #97603.

Release note (bug fix): CockroachDB could previously encounter an
internal error "no bytes in account to release ..." in rare cases and
this is now fixed. The bug was introduced in 22.1.

Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
craig[bot] and yuzefovich committed Feb 28, 2023
2 parents 528cd19 + f2dd52c commit 5f34f44
Show file tree
Hide file tree
Showing 15 changed files with 141 additions and 182 deletions.
1 change: 0 additions & 1 deletion pkg/col/coldata/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ go_library(
"//pkg/util",
"//pkg/util/buildutil",
"//pkg/util/duration",
"//pkg/util/intsets",
"//pkg/util/json",
"@com_github_cockroachdb_apd_v3//:apd",
"@com_github_cockroachdb_errors//:errors",
Expand Down
33 changes: 4 additions & 29 deletions pkg/col/coldata/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@ import (
"strings"
"sync/atomic"

"github.com/cockroachdb/cockroach/pkg/col/typeconv"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/intsets"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -73,12 +71,7 @@ type Batch interface {
// batches that they reuse as not doing this could result in correctness
// or memory blowup issues. It unsets the selection and sets the length to
// 0.
//
// Notably, it deeply resets the datum-backed vectors and returns the number
// of bytes released as a result of the reset. Callers should update the
// allocator (which the batch was instantiated from) accordingly unless they
// guarantee that the batch doesn't have any datum-backed vectors.
ResetInternalBatch() int64
ResetInternalBatch()
// String returns a pretty representation of this batch.
String() string
}
Expand Down Expand Up @@ -136,9 +129,6 @@ func NewMemBatchWithCapacity(typs []*types.T, capacity int, factory ColumnFactor
col := &cols[i]
col.init(t, capacity, factory)
b.b[i] = col
if col.CanonicalTypeFamily() == typeconv.DatumVecCanonicalTypeFamily {
b.datumVecIdxs.Add(i)
}
}
return b
}
Expand Down Expand Up @@ -208,10 +198,8 @@ type MemBatch struct {
// MemBatch.
capacity int
// b is the slice of columns in this batch.
b []Vec
// datumVecIdxs stores the indices of all datum-backed vectors in b.
datumVecIdxs intsets.Fast
useSel bool
b []Vec
useSel bool
// sel is - if useSel is true - a selection vector from upstream. A
// selection vector is a list of selected tuple indices in this memBatch's
// columns (tuples for which indices are not in sel are considered to be
Expand Down Expand Up @@ -264,9 +252,6 @@ func (m *MemBatch) SetLength(length int) {

// AppendCol implements the Batch interface.
func (m *MemBatch) AppendCol(col Vec) {
if col.CanonicalTypeFamily() == typeconv.DatumVecCanonicalTypeFamily {
m.datumVecIdxs.Add(len(m.b))
}
m.b = append(m.b, col)
}

Expand Down Expand Up @@ -305,17 +290,12 @@ func (m *MemBatch) Reset(typs []*types.T, length int, factory ColumnFactory) {
// since those will get reset in ResetInternalBatch anyway.
m.b = m.b[:len(typs)]
m.sel = m.sel[:length]
for i, ok := m.datumVecIdxs.Next(0); ok; i, ok = m.datumVecIdxs.Next(i + 1) {
if i >= len(typs) {
m.datumVecIdxs.Remove(i)
}
}
m.ResetInternalBatch()
m.SetLength(length)
}

// ResetInternalBatch implements the Batch interface.
func (m *MemBatch) ResetInternalBatch() int64 {
func (m *MemBatch) ResetInternalBatch() {
m.SetLength(0 /* length */)
m.SetSelection(false)
for _, v := range m.b {
Expand All @@ -324,11 +304,6 @@ func (m *MemBatch) ResetInternalBatch() int64 {
ResetIfBytesLike(v)
}
}
var released int64
for i, ok := m.datumVecIdxs.Next(0); ok; i, ok = m.datumVecIdxs.Next(i + 1) {
released += m.b[i].Datum().Reset()
}
return released
}

// String returns a pretty representation of this batch.
Expand Down
5 changes: 2 additions & 3 deletions pkg/col/coldata/datum_vec.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type DatumVec interface {
AppendSlice(src DatumVec, destIdx, srcStartIdx, srcEndIdx int)
// AppendVal appends the given tree.Datum value to the end of the vector.
AppendVal(v Datum)
// SetLength sets the length of the vector.
SetLength(l int)
// Len returns the length of the vector.
Len() int
// Cap returns the underlying capacity of the vector.
Expand All @@ -53,7 +55,4 @@ type DatumVec interface {
// be used when elements before startIdx are guaranteed not to have been
// modified.
Size(startIdx int) int64
// Reset resets the vector for reuse. It returns the number of bytes
// released.
Reset() int64
}
51 changes: 22 additions & 29 deletions pkg/col/coldataext/datum_vec.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ func (dv *datumVec) AppendVal(v coldata.Datum) {
dv.data = append(dv.data, datum)
}

// SetLength implements the coldata.DatumVec interface. It reslices the
// underlying data slice so that exactly l elements are visible; no element
// is cleared or copied in the process.
func (dv *datumVec) SetLength(l int) {
	resliced := dv.data[0:l]
	dv.data = resliced
}

// Len implements coldata.DatumVec interface.
func (dv *datumVec) Len() int {
return len(dv.data)
Expand All @@ -158,24 +163,6 @@ func (dv *datumVec) UnmarshalTo(i int, b []byte) error {
return err
}

// valuesSize returns the footprint of actual datums (in bytes) with ordinals in
// [startIdx:] range, ignoring the overhead of tree.Datum wrapper.
func (dv *datumVec) valuesSize(startIdx int) int64 {
var size int64
// Only the elements up to the length are expected to be non-nil. Note that
// we cannot take a short-cut with fixed-length values here because they
// might not be set, so we could over-account if we did something like
// size += (len-startIdx) * fixedSize.
if startIdx < dv.Len() {
for _, d := range dv.data[startIdx:dv.Len()] {
if d != nil {
size += int64(d.Size())
}
}
}
return size
}

// Size implements coldata.DatumVec interface.
func (dv *datumVec) Size(startIdx int) int64 {
// Note that we don't account for the overhead of datumVec struct, and the
Expand All @@ -187,18 +174,24 @@ func (dv *datumVec) Size(startIdx int) int64 {
if startIdx < 0 {
startIdx = 0
}
// We have to account for the tree.Datum overhead for the whole capacity of
// the underlying slice.
return memsize.DatumOverhead*int64(dv.Cap()-startIdx) + dv.valuesSize(startIdx)
}

// Reset implements coldata.DatumVec interface.
func (dv *datumVec) Reset() int64 {
released := dv.valuesSize(0 /* startIdx */)
for i := range dv.data {
dv.data[i] = nil
count := int64(dv.Cap() - startIdx)
size := memsize.DatumOverhead * count
if datumSize, variable := tree.DatumTypeSize(dv.t); variable {
// The elements in dv.data[max(startIdx,len):cap] range are accounted with
// the default datum size for the type. For those in the range
// [startIdx, len) we call Datum.Size().
idx := startIdx
for ; idx < len(dv.data); idx++ {
if dv.data[idx] != nil {
size += int64(dv.data[idx].Size())
}
}
// Pick up where the loop left off.
size += int64(dv.Cap()-idx) * int64(datumSize)
} else {
size += int64(datumSize) * count
}
return released
return size
}

// assertValidDatum asserts that the given datum is valid to be stored in this
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/colexecdisk/hash_based_partitioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ func (op *hashBasedPartitioner) partitionBatch(
for idx, sel := range selections {
partitionIdx := op.partitionIdxOffset + idx
if len(sel) > 0 {
op.unlimitedAllocator.ResetBatch(scratchBatch)
scratchBatch.ResetInternalBatch()
// The partitioner expects the batches without a selection vector,
// so we need to copy the tuples according to the selection vector
// into a scratch batch.
Expand Down
4 changes: 1 addition & 3 deletions pkg/sql/colexec/colexecutils/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,7 @@ func (s *fixedNumTuplesNoInputOp) Next() coldata.Batch {
if s.numTuplesLeft == 0 {
return coldata.ZeroBatch
}
// The internal batch has no columns, so no memory is ever released on the
// ResetInternalBatch() call.
_ = s.batch.ResetInternalBatch()
s.batch.ResetInternalBatch()
length := s.numTuplesLeft
if length > coldata.BatchSize() {
length = coldata.BatchSize()
Expand Down
7 changes: 2 additions & 5 deletions pkg/sql/colexec/colexecutils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,12 +185,9 @@ func (b *AppendOnlyBufferedBatch) Reset([]*types.T, int, coldata.ColumnFactory)
}

// ResetInternalBatch implements the coldata.Batch interface.
// NB: any memory released during this call is automatically released from the
// allocator that created the batch.
func (b *AppendOnlyBufferedBatch) ResetInternalBatch() int64 {
func (b *AppendOnlyBufferedBatch) ResetInternalBatch() {
b.SetLength(0 /* n */)
b.allocator.ReleaseMemory(b.batch.ResetInternalBatch())
return 0
b.batch.ResetInternalBatch()
}

// String implements the coldata.Batch interface.
Expand Down
40 changes: 12 additions & 28 deletions pkg/sql/colexec/colexecwindow/relative_rank.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 3 additions & 7 deletions pkg/sql/colexec/colexecwindow/relative_rank_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,7 @@ func _COMPUTE_PARTITIONS_SIZES(_HAS_SEL bool) { // */}}
r.partitionsState.runningSizes.SetLength(coldata.BatchSize())
r.partitionsState.Enqueue(r.Ctx, r.partitionsState.runningSizes)
r.partitionsState.idx = 0
// This batch has only a single INT column, so no memory is ever
// released on the ResetInternalBatch() call.
_ = r.partitionsState.runningSizes.ResetInternalBatch()
r.partitionsState.runningSizes.ResetInternalBatch()
}
}
}
Expand Down Expand Up @@ -190,9 +188,7 @@ func _COMPUTE_PEER_GROUPS_SIZES(_HAS_SEL bool) { // */}}
r.peerGroupsState.runningSizes.SetLength(coldata.BatchSize())
r.peerGroupsState.Enqueue(r.Ctx, r.peerGroupsState.runningSizes)
r.peerGroupsState.idx = 0
// This batch has only a single INT column, so no memory is ever
// released on the ResetInternalBatch() call.
_ = r.peerGroupsState.runningSizes.ResetInternalBatch()
r.peerGroupsState.runningSizes.ResetInternalBatch()
}
}
}
Expand Down Expand Up @@ -483,7 +479,7 @@ func (r *_RELATIVE_RANK_STRINGOp) Next() coldata.Batch {
}
// {{end}}

r.allocator.ResetBatch(r.output)
r.output.ResetInternalBatch()
// First, we copy over the buffered up columns.
r.allocator.PerformOperation(r.output.ColVecs()[:len(r.inputTypes)], func() {
for colIdx, vec := range r.output.ColVecs()[:len(r.inputTypes)] {
Expand Down
4 changes: 1 addition & 3 deletions pkg/sql/colexec/count.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@ func (c *countOp) Next() coldata.Batch {
if c.done {
return coldata.ZeroBatch
}
// The internal batch has only a single INT column, so no memory is ever
// released on the ResetInternalBatch() call.
_ = c.internalBatch.ResetInternalBatch()
c.internalBatch.ResetInternalBatch()
for {
bat := c.Input.Next()
length := bat.Length()
Expand Down
Loading

0 comments on commit 5f34f44

Please sign in to comment.