Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix hash agg #76421

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions pkg/col/coldata/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"strings"
"sync/atomic"

"github.com/cockroachdb/cockroach/pkg/col/typeconv"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -68,8 +69,9 @@ type Batch interface {
// important for callers to call ResetInternalBatch if they own internal
// batches that they reuse as not doing this could result in correctness
// or memory blowup issues. It unsets the selection and sets the length to
// 0.
ResetInternalBatch()
// 0. Notably, it deeply resets the datum-backed vectors and returns the
// number of bytes released as a result of the reset.
ResetInternalBatch() int64
// String returns a pretty representation of this batch.
String() string
}
Expand Down Expand Up @@ -126,6 +128,8 @@ func NewMemBatchWithCapacity(typs []*types.T, capacity int, factory ColumnFactor
b.b[i] = col
if col.IsBytesLike() {
b.bytesVecIdxs.Add(i)
} else if col.CanonicalTypeFamily() == typeconv.DatumVecCanonicalTypeFamily {
b.datumVecIdxs.Add(i)
}
}
return b
Expand Down Expand Up @@ -202,6 +206,8 @@ type MemBatch struct {
// vectors and checking whether they are of Bytes type we store this slice
// separately.
bytesVecIdxs util.FastIntSet
// datumVecIdxs stores the indices of all datum-backed vectors in b.
datumVecIdxs util.FastIntSet
useSel bool
// sel is - if useSel is true - a selection vector from upstream. A
// selection vector is a list of selected tuple indices in this memBatch's
Expand Down Expand Up @@ -271,6 +277,8 @@ func (m *MemBatch) SetLength(length int) {
// AppendCol implements the Batch interface. The new vector is registered in
// the bytes- or datum-backed index set when applicable so that
// ResetInternalBatch can deeply reset it later.
func (m *MemBatch) AppendCol(col Vec) {
	newIdx := len(m.b)
	switch {
	case col.IsBytesLike():
		m.bytesVecIdxs.Add(newIdx)
	case col.CanonicalTypeFamily() == typeconv.DatumVecCanonicalTypeFamily:
		m.datumVecIdxs.Add(newIdx)
	}
	m.b = append(m.b, col)
}
Expand Down Expand Up @@ -315,12 +323,17 @@ func (m *MemBatch) Reset(typs []*types.T, length int, factory ColumnFactory) {
m.bytesVecIdxs.Remove(i)
}
}
for i, ok := m.datumVecIdxs.Next(0); ok; i, ok = m.datumVecIdxs.Next(i + 1) {
if i >= len(typs) {
m.datumVecIdxs.Remove(i)
}
}
m.ResetInternalBatch()
m.SetLength(length)
}

// ResetInternalBatch implements the Batch interface.
func (m *MemBatch) ResetInternalBatch() {
func (m *MemBatch) ResetInternalBatch() int64 {
m.SetLength(0 /* length */)
m.SetSelection(false)
for _, v := range m.b {
Expand All @@ -331,6 +344,11 @@ func (m *MemBatch) ResetInternalBatch() {
for i, ok := m.bytesVecIdxs.Next(0); ok; i, ok = m.bytesVecIdxs.Next(i + 1) {
Reset(m.b[i])
}
var released int64
for i, ok := m.datumVecIdxs.Next(0); ok; i, ok = m.datumVecIdxs.Next(i + 1) {
released += m.b[i].Datum().Reset()
}
return released
}

// String returns a pretty representation of this batch.
Expand Down
5 changes: 3 additions & 2 deletions pkg/col/coldata/datum_vec.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ type DatumVec interface {
AppendSlice(src DatumVec, destIdx, srcStartIdx, srcEndIdx int)
// AppendVal appends the given tree.Datum value to the end of the vector.
AppendVal(v Datum)
// SetLength sets the length of the vector.
SetLength(l int)
// Len returns the length of the vector.
Len() int
// Cap returns the underlying capacity of the vector.
Expand All @@ -55,4 +53,7 @@ type DatumVec interface {
// be used when elements before startIdx are guaranteed not to have been
// modified.
Size(startIdx int) int64
// Reset resets the vector for reuse. It returns the number of bytes
// released.
Reset() int64
}
51 changes: 29 additions & 22 deletions pkg/col/coldataext/datum_vec.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,6 @@ func (dv *datumVec) AppendVal(v coldata.Datum) {
dv.data = append(dv.data, datum)
}

// SetLength implements coldata.DatumVec interface.
func (dv *datumVec) SetLength(l int) {
dv.data = dv.data[:l]
}

// Len implements coldata.DatumVec interface.
func (dv *datumVec) Len() int {
return len(dv.data)
Expand All @@ -145,6 +140,24 @@ func (dv *datumVec) UnmarshalTo(i int, b []byte) error {
return err
}

// valuesSize returns the footprint of actual datums (in bytes) with ordinals
// in [startIdx:] range, ignoring the overhead of the tree.Datum wrapper.
func (dv *datumVec) valuesSize(startIdx int) int64 {
	length := dv.Len()
	if startIdx >= length {
		return 0
	}
	// Only the elements up to the length are expected to be non-nil. Note
	// that we cannot take a short-cut with fixed-length values here because
	// they might not be set, so we could over-account if we did something
	// like size += (len-startIdx) * fixedSize.
	var total int64
	for _, d := range dv.data[startIdx:length] {
		if d == nil {
			continue
		}
		total += int64(d.Size())
	}
	return total
}

// Size implements coldata.DatumVec interface.
func (dv *datumVec) Size(startIdx int) int64 {
// Note that we don't account for the overhead of datumVec struct, and the
Expand All @@ -156,24 +169,18 @@ func (dv *datumVec) Size(startIdx int) int64 {
if startIdx < 0 {
startIdx = 0
}
count := int64(dv.Cap() - startIdx)
size := memsize.DatumOverhead * count
if datumSize, variable := tree.DatumTypeSize(dv.t); variable {
// The elements in dv.data[max(startIdx,len):cap] range are accounted with
// the default datum size for the type. For those in the range
// [startIdx, len) we call Datum.Size().
idx := startIdx
for ; idx < len(dv.data); idx++ {
if dv.data[idx] != nil {
size += int64(dv.data[idx].Size())
}
}
// Pick up where the loop left off.
size += int64(dv.Cap()-idx) * int64(datumSize)
} else {
size += int64(datumSize) * count
// We have to account for the tree.Datum overhead for the whole capacity of
// the underlying slice.
return memsize.DatumOverhead*int64(dv.Cap()-startIdx) + dv.valuesSize(startIdx)
}

// Reset implements coldata.DatumVec interface. It nils out every stored datum
// so that the backing memory can be garbage-collected, and returns the number
// of bytes released as a result (as measured by valuesSize over the whole
// vector before the reset).
//
// Note: a stray residual diff line `return size` (from the removed old Size
// implementation) was interleaved here; `size` is not declared in this scope,
// so the correct body returns only `released`.
func (dv *datumVec) Reset() int64 {
	released := dv.valuesSize(0 /* startIdx */)
	for i := range dv.data {
		dv.data[i] = nil
	}
	return released
}

// assertValidDatum asserts that the given datum is valid to be stored in this
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colexec/colexecutils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,9 @@ func (b *AppendOnlyBufferedBatch) Reset([]*types.T, int, coldata.ColumnFactory)
}

// ResetInternalBatch implements the coldata.Batch interface.
func (b *AppendOnlyBufferedBatch) ResetInternalBatch() {
func (b *AppendOnlyBufferedBatch) ResetInternalBatch() int64 {
b.SetLength(0 /* n */)
b.batch.ResetInternalBatch()
return b.batch.ResetInternalBatch()
}

// String implements the coldata.Batch interface.
Expand Down
20 changes: 6 additions & 14 deletions pkg/sql/colexec/hash_aggregator.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 1 addition & 5 deletions pkg/sql/colexec/hash_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,7 @@ type hashAggregator struct {
curOutputBucketIdx int

maxOutputBatchMemSize int64
// maxCapacity if non-zero indicates the target capacity of the output
// batch. It is set when, after setting a row, we realize that the output
// batch has exceeded the memory limit.
maxCapacity int
output coldata.Batch
output coldata.Batch

aggFnsAlloc *colexecagg.AggregateFuncsAlloc
hashAlloc aggBucketAlloc
Expand Down
10 changes: 3 additions & 7 deletions pkg/sql/colexec/hash_aggregator_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,20 +345,16 @@ func getNext(op *hashAggregator, partialOrder bool) coldata.Batch {
op.outputTypes, op.output, len(op.buckets), op.maxOutputBatchMemSize,
)
curOutputIdx := 0
for curOutputIdx < op.output.Capacity() &&
op.curOutputBucketIdx < len(op.buckets) &&
(op.maxCapacity == 0 || curOutputIdx < op.maxCapacity) {
var batchFull bool
for op.curOutputBucketIdx < len(op.buckets) && !batchFull {
bucket := op.buckets[op.curOutputBucketIdx]
for fnIdx, fn := range bucket.fns {
fn.SetOutput(op.output.ColVec(fnIdx))
fn.Flush(curOutputIdx)
}
op.accountingHelper.AccountForSet(curOutputIdx)
batchFull = op.accountingHelper.AccountForSet(curOutputIdx)
curOutputIdx++
op.curOutputBucketIdx++
if op.maxCapacity == 0 && op.accountingHelper.Allocator.Used() >= op.maxOutputBatchMemSize {
op.maxCapacity = curOutputIdx
}
}
if op.curOutputBucketIdx >= len(op.buckets) {
if partialOrder {
Expand Down
11 changes: 2 additions & 9 deletions pkg/sql/colexec/ordered_synchronizer.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 2 additions & 9 deletions pkg/sql/colexec/ordered_synchronizer_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,6 @@ type OrderedSynchronizer struct {
heap []int
// comparators stores one comparator per ordering column.
comparators []vecComparator
// maxCapacity if non-zero indicates the target capacity of the output
// batch. It is set when, after setting a row, we realize that the output
// batch has exceeded the memory limit.
maxCapacity int
output coldata.Batch
outVecs coldata.TypedVecs
}
Expand Down Expand Up @@ -135,7 +131,7 @@ func (o *OrderedSynchronizer) Next() coldata.Batch {
}
o.resetOutput()
outputIdx := 0
for outputIdx < o.output.Capacity() && (o.maxCapacity == 0 || outputIdx < o.maxCapacity) {
for batchFull := false; !batchFull; {
if o.Len() == 0 {
// All inputs exhausted.
break
Expand Down Expand Up @@ -187,11 +183,8 @@ func (o *OrderedSynchronizer) Next() coldata.Batch {
}

// Account for the memory of the row we have just set.
o.accountingHelper.AccountForSet(outputIdx)
batchFull = o.accountingHelper.AccountForSet(outputIdx)
outputIdx++
if o.maxCapacity == 0 && o.accountingHelper.Allocator.Used() >= o.memoryLimit {
o.maxCapacity = outputIdx
}
}

o.output.SetLength(outputIdx)
Expand Down
30 changes: 6 additions & 24 deletions pkg/sql/colfetcher/cfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,21 +287,12 @@ type cFetcher struct {
// kvFetcherMemAcc is a memory account that will be used by the underlying
// KV fetcher.
kvFetcherMemAcc *mon.BoundAccount

// maxCapacity if non-zero indicates the target capacity of the output
// batch. It is set when at the row finalization we realize that the output
// batch has exceeded the memory limit.
maxCapacity int
}

func (cf *cFetcher) resetBatch() {
var reallocated bool
var minDesiredCapacity int
if cf.maxCapacity > 0 {
// If we have already exceeded the memory limit for the output batch, we
// will only be using the same batch from now on.
minDesiredCapacity = cf.maxCapacity
} else if cf.machine.limitHint > 0 && (cf.estimatedRowCount == 0 || uint64(cf.machine.limitHint) < cf.estimatedRowCount) {
if cf.machine.limitHint > 0 && (cf.estimatedRowCount == 0 || uint64(cf.machine.limitHint) < cf.estimatedRowCount) {
// If we have a limit hint, and either
// 1) we don't have an estimate, or
// 2) we have a soft limit,
Expand Down Expand Up @@ -912,23 +903,14 @@ func (cf *cFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) {
// column is requested) yet, but it is ok for the purposes of the
// memory accounting - oids are fixed length values and, thus, have
// already been accounted for when the batch was allocated.
cf.accountingHelper.AccountForSet(cf.machine.rowIdx)
emitBatch := cf.accountingHelper.AccountForSet(cf.machine.rowIdx)
cf.machine.rowIdx++
cf.shiftState()

var emitBatch bool
if cf.maxCapacity == 0 && cf.accountingHelper.Allocator.Used() >= cf.memoryLimit {
cf.maxCapacity = cf.machine.rowIdx
}
if cf.machine.rowIdx >= cf.machine.batch.Capacity() ||
(cf.maxCapacity > 0 && cf.machine.rowIdx >= cf.maxCapacity) ||
(cf.machine.limitHint > 0 && cf.machine.rowIdx >= cf.machine.limitHint) {
// We either
// 1. have no more room in our batch, so output it immediately
// or
// 2. we made it to our limit hint, so output our batch early
// to make sure that we don't bother filling in extra data
// if we don't need to.
if cf.machine.limitHint > 0 && cf.machine.rowIdx >= cf.machine.limitHint {
// If we made it to our limit hint, output our batch early to
// make sure that we don't bother filling in extra data if we
// don't need to.
emitBatch = true
// Update the limit hint to track the expected remaining rows to
// be fetched.
Expand Down
Loading