colmem: move some logic of capacity-limiting into the accounting helper
This commit moves the capacity-limiting logic that was duplicated
across each user of the SetAccountingHelper into the helper itself.
This de-duplicates some code, and it also makes the refactor performed
in the following commit easier.

Additionally, this commit includes a tiny change that makes the
resetting behavior in the hash aggregator more precise.

Release note: None
yuzefovich committed Aug 10, 2022
1 parent aec1290 commit 41fa8b6
Showing 8 changed files with 99 additions and 124 deletions.
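To make the new contract concrete, here is a minimal, self-contained Go sketch of the helper's shape after this commit. It is illustrative only: the real colmem.SetAccountingHelper is initialized with an allocator and the column types and measures the actual footprint of the output vectors, so the simplified struct, the explicit rowSize parameter, and the byte constants below are hypothetical stand-ins.

package main

import "fmt"

// setAccountingHelper is a toy stand-in for colmem.SetAccountingHelper.
// After this commit the memory limit lives inside the helper (passed to
// Init), and the maxCapacity bookkeeping that every caller used to
// duplicate is owned by the helper as well.
type setAccountingHelper struct {
	memoryLimit int64
	used        int64
	maxCapacity int
}

func (h *setAccountingHelper) Init(memoryLimit int64) {
	h.memoryLimit = memoryLimit
}

// AccountForSet accounts for the row just set at rowIdx and returns
// whether the output batch should be considered done, replacing the
// "if Allocator.Used() >= limit" boilerplate each caller carried before.
func (h *setAccountingHelper) AccountForSet(rowIdx int, rowSize int64) bool {
	h.used += rowSize
	if h.maxCapacity == 0 && h.used >= h.memoryLimit {
		h.maxCapacity = rowIdx + 1
	}
	return h.maxCapacity > 0 && rowIdx+1 >= h.maxCapacity
}

func main() {
	var helper setAccountingHelper
	helper.Init(64 /* memoryLimit, in bytes */)
	outputIdx := 0
	for batchDone := false; !batchDone; {
		batchDone = helper.AccountForSet(outputIdx, 20 /* rowSize, in bytes */)
		outputIdx++
	}
	fmt.Println("rows set before the batch was done:", outputIdx) // prints 4
}

The caller loop in main is exactly the shape that the hash aggregator, ordered synchronizer, and cFetcher adopt in the diffs below.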
30 changes: 10 additions & 20 deletions pkg/sql/colexec/hash_aggregator.eg.go

Some generated files are not rendered by default.

30 changes: 12 additions & 18 deletions pkg/sql/colexec/hash_aggregator.go
@@ -141,12 +141,7 @@ type hashAggregator struct {
 	// populating the output.
 	curOutputBucketIdx int
 
-	maxOutputBatchMemSize int64
-	// maxCapacity if non-zero indicates the target capacity of the output
-	// batch. It is set when, after setting a row, we realize that the output
-	// batch has exceeded the memory limit.
-	maxCapacity int
-	output      coldata.Batch
+	output coldata.Batch
 
 	aggFnsAlloc *colexecagg.AggregateFuncsAlloc
 	hashAlloc   aggBucketAlloc
@@ -212,19 +207,18 @@ func NewHashAggregator(
 		colexecerror.InternalError(err)
 	}
 	hashAgg := &hashAggregator{
-		OneInputNode:          colexecop.NewOneInputNode(args.Input),
-		hashTableAllocator:    hashTableAllocator,
-		spec:                  args.Spec,
-		state:                 hashAggregatorBuffering,
-		inputTypes:            args.InputTypes,
-		outputTypes:           args.OutputTypes,
-		inputArgsConverter:    inputArgsConverter,
-		toClose:               toClose,
-		maxOutputBatchMemSize: maxOutputBatchMemSize,
-		aggFnsAlloc:           aggFnsAlloc,
-		hashAlloc:             aggBucketAlloc{allocator: args.Allocator},
+		OneInputNode:       colexecop.NewOneInputNode(args.Input),
+		hashTableAllocator: hashTableAllocator,
+		spec:               args.Spec,
+		state:              hashAggregatorBuffering,
+		inputTypes:         args.InputTypes,
+		outputTypes:        args.OutputTypes,
+		inputArgsConverter: inputArgsConverter,
+		toClose:            toClose,
+		aggFnsAlloc:        aggFnsAlloc,
+		hashAlloc:          aggBucketAlloc{allocator: args.Allocator},
 	}
-	hashAgg.accountingHelper.Init(outputUnlimitedAllocator, args.OutputTypes)
+	hashAgg.accountingHelper.Init(outputUnlimitedAllocator, maxOutputBatchMemSize, args.OutputTypes)
 	hashAgg.bufferingState.tuples = colexecutils.NewAppendOnlyBufferedBatch(args.Allocator, args.InputTypes, nil /* colsToStore */)
 	hashAgg.datumAlloc.AllocSize = hashAggregatorAllocSize
 	hashAgg.aggHelper = newAggregatorHelper(args, &hashAgg.datumAlloc, true /* isHashAgg */, hashAggregatorMaxBuffered)
15 changes: 5 additions & 10 deletions pkg/sql/colexec/hash_aggregator_tmpl.go
@@ -342,26 +342,21 @@ func getNext(op *hashAggregator, partialOrder bool) coldata.Batch {
 	case hashAggregatorOutputting:
 		// Note that ResetMaybeReallocate truncates the requested capacity
 		// at coldata.BatchSize(), so we can just try asking for
-		// len(op.buckets) capacity.
+		// len(op.buckets)-op.curOutputBucketIdx (the number of remaining
+		// output tuples) capacity.
 		op.output, _ = op.accountingHelper.ResetMaybeReallocate(
-			op.outputTypes, op.output, len(op.buckets), op.maxOutputBatchMemSize,
-			true, /* desiredCapacitySufficient */
+			op.outputTypes, op.output, len(op.buckets)-op.curOutputBucketIdx, true, /* desiredCapacitySufficient */
 		)
 		curOutputIdx := 0
-		for curOutputIdx < op.output.Capacity() &&
-			op.curOutputBucketIdx < len(op.buckets) &&
-			(op.maxCapacity == 0 || curOutputIdx < op.maxCapacity) {
+		for batchDone := false; op.curOutputBucketIdx < len(op.buckets) && !batchDone; {
 			bucket := op.buckets[op.curOutputBucketIdx]
 			for fnIdx, fn := range bucket.fns {
 				fn.SetOutput(op.output.ColVec(fnIdx))
 				fn.Flush(curOutputIdx)
 			}
-			op.accountingHelper.AccountForSet(curOutputIdx)
+			batchDone = op.accountingHelper.AccountForSet(curOutputIdx)
 			curOutputIdx++
 			op.curOutputBucketIdx++
-			if op.maxCapacity == 0 && op.accountingHelper.Allocator.Used() >= op.maxOutputBatchMemSize {
-				op.maxCapacity = curOutputIdx
-			}
 		}
 		if op.curOutputBucketIdx >= len(op.buckets) {
 			if partialOrder {
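The capacity request in the hunk above deserves a small worked example: the operator now asks for only the remaining output tuples, relying on the truncation at coldata.BatchSize() that the comment mentions. Below is a standalone sketch of that arithmetic; the ceiling value 1024 is a hypothetical stand-in for coldata.BatchSize().

package main

import "fmt"

// requestedCapacity mirrors the comment in the hunk above: the helper
// truncates the requested capacity at the batch-size ceiling, so asking
// for all remaining buckets is always safe.
func requestedCapacity(numBuckets, curOutputBucketIdx, ceiling int) int {
	remaining := numBuckets - curOutputBucketIdx
	if remaining > ceiling {
		return ceiling
	}
	return remaining
}

func main() {
	// 5000 buckets with 4600 already emitted: ask for exactly the 400
	// remaining output tuples instead of len(op.buckets) = 5000.
	fmt.Println(requestedCapacity(5000, 4600, 1024)) // prints 400
	// Early in the output phase the request is still clamped.
	fmt.Println(requestedCapacity(5000, 0, 1024)) // prints 1024
}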
18 changes: 4 additions & 14 deletions pkg/sql/colexec/ordered_synchronizer.eg.go

Some generated files are not rendered by default.

18 changes: 4 additions & 14 deletions pkg/sql/colexec/ordered_synchronizer_tmpl.go
@@ -60,7 +60,6 @@ type OrderedSynchronizer struct {
 	span *tracing.Span
 
 	accountingHelper colmem.SetAccountingHelper
-	memoryLimit      int64
 	inputs           []colexecargs.OpWithMetaInfo
 	ordering         colinfo.ColumnOrdering
 	typs             []*types.T
@@ -81,10 +80,6 @@ type OrderedSynchronizer struct {
 	heap []int
 	// comparators stores one comparator per ordering column.
 	comparators []vecComparator
-	// maxCapacity if non-zero indicates the target capacity of the output
-	// batch. It is set when, after setting a row, we realize that the output
-	// batch has exceeded the memory limit.
-	maxCapacity int
 	output      coldata.Batch
 	outVecs     coldata.TypedVecs
 }
@@ -114,13 +109,12 @@ func NewOrderedSynchronizer(
 	ordering colinfo.ColumnOrdering,
 ) *OrderedSynchronizer {
 	os := &OrderedSynchronizer{
-		memoryLimit:           memoryLimit,
 		inputs:                inputs,
 		ordering:              ordering,
 		typs:                  typs,
 		canonicalTypeFamilies: typeconv.ToCanonicalTypeFamilies(typs),
 	}
-	os.accountingHelper.Init(allocator, typs)
+	os.accountingHelper.Init(allocator, memoryLimit, typs)
 	return os
 }

@@ -140,7 +134,7 @@ func (o *OrderedSynchronizer) Next() coldata.Batch {
 	}
 	o.resetOutput()
 	outputIdx := 0
-	for outputIdx < o.output.Capacity() && (o.maxCapacity == 0 || outputIdx < o.maxCapacity) {
+	for batchDone := false; !batchDone; {
 		if o.advanceMinBatch {
 			// Advance the minimum input batch, fetching a new batch if
 			// necessary.
@@ -205,11 +199,8 @@ func (o *OrderedSynchronizer) Next() coldata.Batch {
 		o.advanceMinBatch = true
 
 		// Account for the memory of the row we have just set.
-		o.accountingHelper.AccountForSet(outputIdx)
+		batchDone = o.accountingHelper.AccountForSet(outputIdx)
 		outputIdx++
-		if o.maxCapacity == 0 && o.accountingHelper.Allocator.Used() >= o.memoryLimit {
-			o.maxCapacity = outputIdx
-		}
 	}
 
 	o.output.SetLength(outputIdx)
@@ -219,8 +210,7 @@ func (o *OrderedSynchronizer) Next() coldata.Batch {
 func (o *OrderedSynchronizer) resetOutput() {
 	var reallocated bool
 	o.output, reallocated = o.accountingHelper.ResetMaybeReallocate(
-		o.typs, o.output, 1 /* minDesiredCapacity */, o.memoryLimit,
-		false, /* desiredCapacitySufficient */
+		o.typs, o.output, 1 /* minDesiredCapacity */, false, /* desiredCapacitySufficient */
 	)
 	if reallocated {
 		o.outVecs.SetBatch(o.output)
35 changes: 8 additions & 27 deletions pkg/sql/colfetcher/cfetcher.go
@@ -273,21 +273,12 @@ type cFetcher struct {
 	scratch []byte
 
 	accountingHelper colmem.SetAccountingHelper
-
-	// maxCapacity if non-zero indicates the target capacity of the output
-	// batch. It is set when at the row finalization we realize that the output
-	// batch has exceeded the memory limit.
-	maxCapacity int
 }
 
 func (cf *cFetcher) resetBatch() {
 	var reallocated bool
 	var minDesiredCapacity int
-	if cf.maxCapacity > 0 {
-		// If we have already exceeded the memory limit for the output batch, we
-		// will only be using the same batch from now on.
-		minDesiredCapacity = cf.maxCapacity
-	} else if cf.machine.limitHint > 0 && (cf.estimatedRowCount == 0 || uint64(cf.machine.limitHint) < cf.estimatedRowCount) {
+	if cf.machine.limitHint > 0 && (cf.estimatedRowCount == 0 || uint64(cf.machine.limitHint) < cf.estimatedRowCount) {
 		// If we have a limit hint, and either
 		// 1) we don't have an estimate, or
 		// 2) we have a soft limit,
@@ -309,8 +300,7 @@ func (cf *cFetcher) resetBatch() {
 		}
 	}
 	cf.machine.batch, reallocated = cf.accountingHelper.ResetMaybeReallocate(
-		cf.table.typs, cf.machine.batch, minDesiredCapacity, cf.memoryLimit,
-		false, /* desiredCapacitySufficient */
+		cf.table.typs, cf.machine.batch, minDesiredCapacity, false, /* desiredCapacitySufficient */
 	)
 	if reallocated {
 		cf.machine.colvecs.SetBatch(cf.machine.batch)
@@ -465,7 +455,7 @@ func (cf *cFetcher) Init(
 
 	cf.table = table
 	cf.fetcher = kvFetcher
-	cf.accountingHelper.Init(allocator, cf.table.typs)
+	cf.accountingHelper.Init(allocator, cf.memoryLimit, cf.table.typs)
 
 	return nil
 }
@@ -854,23 +844,14 @@ func (cf *cFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) {
 			// column is requested) yet, but it is ok for the purposes of the
 			// memory accounting - oids are fixed length values and, thus, have
 			// already been accounted for when the batch was allocated.
-			cf.accountingHelper.AccountForSet(cf.machine.rowIdx)
+			emitBatch := cf.accountingHelper.AccountForSet(cf.machine.rowIdx)
 			cf.machine.rowIdx++
 			cf.shiftState()
 
-			var emitBatch bool
-			if cf.maxCapacity == 0 && cf.accountingHelper.Allocator.Used() >= cf.memoryLimit {
-				cf.maxCapacity = cf.machine.rowIdx
-			}
-			if cf.machine.rowIdx >= cf.machine.batch.Capacity() ||
-				(cf.maxCapacity > 0 && cf.machine.rowIdx >= cf.maxCapacity) ||
-				(cf.machine.limitHint > 0 && cf.machine.rowIdx >= cf.machine.limitHint) {
-				// We either
-				// 1. have no more room in our batch, so output it immediately
-				// or
-				// 2. we made it to our limit hint, so output our batch early
-				//    to make sure that we don't bother filling in extra data
-				//    if we don't need to.
+			if cf.machine.limitHint > 0 && cf.machine.rowIdx >= cf.machine.limitHint {
+				// If we made it to our limit hint, output our batch early to
+				// make sure that we don't bother filling in extra data if we
+				// don't need to.
 				emitBatch = true
 				// Update the limit hint to track the expected remaining rows to
 				// be fetched.
