colmem: improve the behavior of ResetMaybeReallocate
Previously, `Allocator.ResetMaybeReallocate` would always grow the
capacity of the batch up to `coldata.BatchSize()` (unless the memory
limit had been exceeded). This behavior allows us to have batches with
dynamic size when we don't know how many rows we need to process (for
example, in the `cFetcher` we start out with 1 row, then grow it
exponentially - 2, 4, 8, etc). However, in some cases we know exactly
how many rows we want to include in the batch, so that behavior can
result in us re-allocating a batch needlessly when the old batch already
has enough capacity.

This commit improves the situation by adding a knob to indicate that if
the desired capacity is satisfied by the old batch, then it should not
be re-allocated. All callers have been audited accordingly.
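
A minimal usage sketch of the two calling patterns (the identifiers
`allocator`, `typs`, `batch`, `memLimit`, and `nRows` are placeholders for
illustration, not names taken from this commit):

// Dynamic sizing: the row count is unknown, so let the capacity keep
// growing exponentially towards coldata.BatchSize().
batch, _ = allocator.ResetMaybeReallocate(
    typs, batch, 1 /* minDesiredCapacity */, memLimit,
    false, /* desiredCapacitySufficient */
)

// Known row count: nRows is exactly how many rows will be emitted, so an
// old batch with capacity >= nRows is reused rather than re-allocated.
batch, _ = allocator.ResetMaybeReallocate(
    typs, batch, nRows /* minDesiredCapacity */, memLimit,
    true, /* desiredCapacitySufficient */
)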

Release note: None
yuzefovich committed May 19, 2022
1 parent fb8e09a commit cf68258
Showing 28 changed files with 73 additions and 16 deletions.
1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecjoin/crossjoiner.go
@@ -107,6 +107,7 @@ func (c *crossJoiner) Next() coldata.Batch {
}
c.output, _ = c.unlimitedAllocator.ResetMaybeReallocate(
c.outputTypes, c.output, willEmit, c.maxOutputBatchMemSize,
true, /* desiredCapacitySufficient */
)
if willEmit > c.output.Capacity() {
willEmit = c.output.Capacity()
1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecjoin/hashjoiner.go
@@ -725,6 +725,7 @@ func (hj *hashJoiner) resetOutput(nResults int) {
const maxOutputBatchMemSize = math.MaxInt64
hj.output, _ = hj.outputUnlimitedAllocator.ResetMaybeReallocate(
hj.outputTypes, hj.output, nResults, maxOutputBatchMemSize,
true, /* desiredCapacitySufficient */
)
}

1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecjoin/mergejoiner_exceptall.eg.go

Generated file; diff not rendered.

1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecjoin/mergejoiner_fullouter.eg.go

Generated file; diff not rendered.

1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecjoin/mergejoiner_inner.eg.go

Generated file; diff not rendered.

1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecjoin/mergejoiner_intersectall.eg.go

Generated file; diff not rendered.

1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecjoin/mergejoiner_leftanti.eg.go

Generated file; diff not rendered.

1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecjoin/mergejoiner_leftouter.eg.go

Generated file; diff not rendered.

1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecjoin/mergejoiner_leftsemi.eg.go

Generated file; diff not rendered.

1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecjoin/mergejoiner_rightanti.eg.go

Generated file; diff not rendered.

1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecjoin/mergejoiner_rightouter.eg.go

Generated file; diff not rendered.

1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecjoin/mergejoiner_rightsemi.eg.go

Generated file; diff not rendered.

1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecjoin/mergejoiner_tmpl.go
@@ -1355,6 +1355,7 @@ func _SOURCE_FINISHED_SWITCH(_JOIN_TYPE joinTypeInfo) { // */}}
func (o *mergeJoin_JOIN_TYPE_STRINGOp) Next() coldata.Batch {
o.output, _ = o.unlimitedAllocator.ResetMaybeReallocate(
o.outputTypes, o.output, 1 /* minDesiredCapacity */, o.memoryLimit,
false, /* desiredCapacitySufficient */
)
o.outputCapacity = o.output.Capacity()
o.bufferedGroup.helper.output = o.output
1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecutils/deselector.go
@@ -62,6 +62,7 @@ func (p *deselectorOp) Next() coldata.Batch {
const maxBatchMemSize = math.MaxInt64
p.output, _ = p.unlimitedAllocator.ResetMaybeReallocate(
p.inputTypes, p.output, batch.Length(), maxBatchMemSize,
true, /* desiredCapacitySufficient */
)
sel := batch.Selection()
p.unlimitedAllocator.PerformOperation(p.output.ColVecs(), func() {
2 changes: 2 additions & 0 deletions pkg/sql/colexec/colexecutils/spilling_queue.go
@@ -193,6 +193,7 @@ func (q *SpillingQueue) Enqueue(ctx context.Context, batch coldata.Batch) {
const maxBatchMemSize = math.MaxInt64
q.diskQueueDeselectionScratch, _ = q.unlimitedAllocator.ResetMaybeReallocate(
q.typs, q.diskQueueDeselectionScratch, n, maxBatchMemSize,
true, /* desiredCapacitySufficient */
)
q.unlimitedAllocator.PerformOperation(q.diskQueueDeselectionScratch.ColVecs(), func() {
for i := range q.typs {
@@ -295,6 +296,7 @@ func (q *SpillingQueue) Enqueue(ctx context.Context, batch coldata.Batch) {
// attention to the memory registered with the unlimited allocator, and
// we will stop adding tuples into this batch and spill when needed.
math.MaxInt64, /* maxBatchMemSize */
true, /* desiredCapacitySufficient */
)
q.unlimitedAllocator.PerformOperation(newBatch.ColVecs(), func() {
for i := range q.typs {
1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecwindow/buffered_window.go
@@ -251,6 +251,7 @@ func (b *bufferedWindowOp) Next() coldata.Batch {
const maxBatchMemSize = math.MaxInt64
b.currentBatch, _ = b.allocator.ResetMaybeReallocate(
b.outputTypes, b.currentBatch, batch.Length(), maxBatchMemSize,
true, /* desiredCapacitySufficient */
)
b.allocator.PerformOperation(b.currentBatch.ColVecs(), func() {
for colIdx, vec := range batch.ColVecs() {
3 changes: 2 additions & 1 deletion pkg/sql/colexec/columnarizer.go
@@ -198,7 +198,8 @@ func (c *Columnarizer) Next() coldata.Batch {
switch c.mode {
case columnarizerBufferingMode:
c.batch, reallocated = c.allocator.ResetMaybeReallocate(
c.typs, c.batch, 1 /* minDesiredCapacity */, c.maxBatchMemSize,
c.typs, c.batch, 1, /* minDesiredCapacity */
c.maxBatchMemSize, false, /* desiredCapacitySufficient */
)
case columnarizerStreamingMode:
// Note that we're not using ResetMaybeReallocate because we will
2 changes: 2 additions & 0 deletions pkg/sql/colexec/hash_aggregator.eg.go

Generated file; diff not rendered.

1 change: 1 addition & 0 deletions pkg/sql/colexec/hash_aggregator_tmpl.go
@@ -345,6 +345,7 @@ func getNext(op *hashAggregator, partialOrder bool) coldata.Batch {
// len(op.buckets) capacity.
op.output, _ = op.accountingHelper.ResetMaybeReallocate(
op.outputTypes, op.output, len(op.buckets), op.maxOutputBatchMemSize,
true, /* desiredCapacitySufficient */
)
curOutputIdx := 0
for curOutputIdx < op.output.Capacity() &&
6 changes: 4 additions & 2 deletions pkg/sql/colexec/ordered_aggregator.go
@@ -285,14 +285,16 @@ func (a *orderedAggregator) Next() coldata.Batch {
a.scratch.Batch = a.allocator.NewMemBatchWithFixedCapacity(a.outputTypes, newMinCapacity)
} else {
a.scratch.Batch, _ = a.allocator.ResetMaybeReallocate(
a.outputTypes, a.scratch.Batch, newMinCapacity, maxBatchMemSize,
a.outputTypes, a.scratch.Batch, newMinCapacity,
maxBatchMemSize, true, /* desiredCapacitySufficient */
)
}
// We will never copy more than coldata.BatchSize() into the
// temporary buffer, so a half of the scratch's capacity will always
// be sufficient.
a.scratch.tempBuffer, _ = a.allocator.ResetMaybeReallocate(
a.outputTypes, a.scratch.tempBuffer, newMinCapacity/2, maxBatchMemSize,
a.outputTypes, a.scratch.tempBuffer, newMinCapacity/2,
maxBatchMemSize, true, /* desiredCapacitySufficient */
)
for fnIdx, fn := range a.bucket.fns {
fn.SetOutput(a.scratch.ColVec(fnIdx))
1 change: 1 addition & 0 deletions pkg/sql/colexec/ordered_synchronizer.eg.go

Generated file; diff not rendered.

1 change: 1 addition & 0 deletions pkg/sql/colexec/ordered_synchronizer_tmpl.go
@@ -220,6 +220,7 @@ func (o *OrderedSynchronizer) resetOutput() {
var reallocated bool
o.output, reallocated = o.accountingHelper.ResetMaybeReallocate(
o.typs, o.output, 1 /* minDesiredCapacity */, o.memoryLimit,
false, /* desiredCapacitySufficient */
)
if reallocated {
o.outVecs.SetBatch(o.output)
5 changes: 4 additions & 1 deletion pkg/sql/colexec/sort.go
@@ -287,7 +287,10 @@ func (p *sortOp) Next() coldata.Batch {
p.state = sortDone
continue
}
p.output, _ = p.allocator.ResetMaybeReallocate(p.inputTypes, p.output, toEmit, p.maxOutputBatchMemSize)
p.output, _ = p.allocator.ResetMaybeReallocate(
p.inputTypes, p.output, toEmit, p.maxOutputBatchMemSize,
true, /* desiredCapacitySufficient */
)
if toEmit > p.output.Capacity() {
toEmit = p.output.Capacity()
}
5 changes: 4 additions & 1 deletion pkg/sql/colexec/sorttopk.go
@@ -200,7 +200,10 @@ func (t *topKSorter) emit() coldata.Batch {
// We're done.
return coldata.ZeroBatch
}
t.output, _ = t.allocator.ResetMaybeReallocate(t.inputTypes, t.output, toEmit, t.maxOutputBatchMemSize)
t.output, _ = t.allocator.ResetMaybeReallocate(
t.inputTypes, t.output, toEmit, t.maxOutputBatchMemSize,
true, /* desiredCapacitySufficient */
)
if toEmit > t.output.Capacity() {
toEmit = t.output.Capacity()
}
1 change: 1 addition & 0 deletions pkg/sql/colfetcher/cfetcher.go
@@ -325,6 +325,7 @@ func (cf *cFetcher) resetBatch() {
}
cf.machine.batch, reallocated = cf.accountingHelper.ResetMaybeReallocate(
cf.table.typs, cf.machine.batch, minDesiredCapacity, cf.memoryLimit,
false, /* desiredCapacitySufficient */
)
if reallocated {
cf.machine.colvecs.SetBatch(cf.machine.batch)
5 changes: 4 additions & 1 deletion pkg/sql/colflow/colrpc/inbox.go
@@ -429,7 +429,10 @@ func (i *Inbox) Next() coldata.Batch {
}
// We rely on the outboxes to produce reasonably sized batches.
const maxBatchMemSize = math.MaxInt64
i.scratch.b, _ = i.allocator.ResetMaybeReallocate(i.typs, i.scratch.b, batchLength, maxBatchMemSize)
i.scratch.b, _ = i.allocator.ResetMaybeReallocate(
i.typs, i.scratch.b, batchLength, maxBatchMemSize,
true, /* desiredCapacitySufficient */
)
i.allocator.PerformOperation(i.scratch.b.ColVecs(), func() {
if err := i.converter.ArrowToBatch(i.scratch.data, batchLength, i.scratch.b); err != nil {
colexecerror.InternalError(err)
24 changes: 20 additions & 4 deletions pkg/sql/colmem/allocator.go
@@ -156,15 +156,22 @@ func (a *Allocator) NewMemBatchNoCols(typs []*types.T, capacity int) coldata.Bat
//
// The method will grow the allocated capacity of the batch exponentially
// (possibly incurring a reallocation), until the batch reaches
// coldata.BatchSize() in capacity or maxBatchMemSize in the memory footprint.
// coldata.BatchSize() in capacity or maxBatchMemSize in the memory footprint if
// desiredCapacitySufficient is false. When that parameter is true and the
// capacity of the old batch is at least minDesiredCapacity, then the old batch is
// reused.
//
// NOTE: if the reallocation occurs, then the memory under the old batch is
// released, so it is expected that the caller will lose the references to the
// old batch.
// Note: the method assumes that minDesiredCapacity is at least 0 and will clamp
// minDesiredCapacity to be between 1 and coldata.BatchSize() inclusive.
func (a *Allocator) ResetMaybeReallocate(
typs []*types.T, oldBatch coldata.Batch, minDesiredCapacity int, maxBatchMemSize int64,
typs []*types.T,
oldBatch coldata.Batch,
minDesiredCapacity int,
maxBatchMemSize int64,
desiredCapacitySufficient bool,
) (newBatch coldata.Batch, reallocated bool) {
if minDesiredCapacity < 0 {
colexecerror.InternalError(errors.AssertionFailedf("invalid minDesiredCapacity %d", minDesiredCapacity))
@@ -179,6 +186,11 @@ func (a *Allocator) ResetMaybeReallocate(
} else {
// If old batch is already of the largest capacity, we will reuse it.
useOldBatch := oldBatch.Capacity() == coldata.BatchSize()
// If the old batch already satisfies the desired capacity which is
// sufficient, we will reuse it too.
if desiredCapacitySufficient && oldBatch.Capacity() >= minDesiredCapacity {
useOldBatch = true
}
// Avoid calculating the memory footprint if possible.
var oldBatchMemSize int64
if !useOldBatch {
@@ -568,10 +580,14 @@ func (h *SetAccountingHelper) getBytesLikeTotalSize() int64 {
// Allocator.ResetMaybeReallocate (and thus has the same contract) with an
// additional logic for memory tracking purposes.
func (h *SetAccountingHelper) ResetMaybeReallocate(
typs []*types.T, oldBatch coldata.Batch, minCapacity int, maxBatchMemSize int64,
typs []*types.T,
oldBatch coldata.Batch,
minCapacity int,
maxBatchMemSize int64,
desiredCapacitySufficient bool,
) (newBatch coldata.Batch, reallocated bool) {
newBatch, reallocated = h.Allocator.ResetMaybeReallocate(
typs, oldBatch, minCapacity, maxBatchMemSize,
typs, oldBatch, minCapacity, maxBatchMemSize, desiredCapacitySufficient,
)
if reallocated && !h.allFixedLength {
// Allocator.ResetMaybeReallocate has released the precise memory
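
In effect, the new reuse decision in the allocator hunk above condenses to the
following (a simplified sketch; it leaves out the nil-batch, memory-limit, and
exponential-growth branches that are not part of this hunk):

// Reuse the old batch when it is already at the maximum capacity, or when
// the caller has declared the desired capacity sufficient and the old batch
// already provides it.
useOldBatch := oldBatch.Capacity() == coldata.BatchSize() ||
    (desiredCapacitySufficient && oldBatch.Capacity() >= minDesiredCapacity)
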
18 changes: 12 additions & 6 deletions pkg/sql/colmem/allocator_test.go
@@ -119,13 +119,13 @@ func TestResetMaybeReallocate(t *testing.T) {
typs := []*types.T{types.Bytes}

// Allocate a new batch and modify it.
b, _ = testAllocator.ResetMaybeReallocate(typs, b, coldata.BatchSize(), math.MaxInt64)
b, _ = testAllocator.ResetMaybeReallocate(typs, b, coldata.BatchSize(), math.MaxInt64, false /* desiredCapacitySufficient */)
b.SetSelection(true)
b.Selection()[0] = 1
b.ColVec(0).Bytes().Set(1, []byte("foo"))

oldBatch := b
b, _ = testAllocator.ResetMaybeReallocate(typs, b, coldata.BatchSize(), math.MaxInt64)
b, _ = testAllocator.ResetMaybeReallocate(typs, b, coldata.BatchSize(), math.MaxInt64, false /* desiredCapacitySufficient */)
// We should have used the same batch, and now it should be in a "reset"
// state.
require.Equal(t, oldBatch, b)
@@ -152,15 +152,21 @@
// Allocate a new batch attempting to use the batch with too small of a
// capacity - new batch should **not** be allocated because the memory
// limit is already exceeded.
b, _ = testAllocator.ResetMaybeReallocate(typs, smallBatch, minDesiredCapacity, smallMemSize)
b, _ = testAllocator.ResetMaybeReallocate(typs, smallBatch, minDesiredCapacity, smallMemSize, false /* desiredCapacitySufficient */)
require.Equal(t, smallBatch, b)
require.Equal(t, minDesiredCapacity/2, b.Capacity())

oldBatch := b

// Reset the batch asking for the same small desired capacity when it is
// sufficient - the same batch should be returned.
b, _ = testAllocator.ResetMaybeReallocate(typs, b, minDesiredCapacity/2, smallMemSize, true /* desiredCapacitySufficient */)
require.Equal(t, smallBatch, b)
require.Equal(t, minDesiredCapacity/2, b.Capacity())

// Reset the batch and confirm that a new batch is allocated because we
// have given larger memory limit.
b, _ = testAllocator.ResetMaybeReallocate(typs, b, minDesiredCapacity, largeMemSize)
b, _ = testAllocator.ResetMaybeReallocate(typs, b, minDesiredCapacity, largeMemSize, false /* desiredCapacitySufficient */)
require.NotEqual(t, oldBatch, b)
require.Equal(t, minDesiredCapacity, b.Capacity())

@@ -171,7 +177,7 @@
// ResetMaybeReallocate truncates the capacity at
// coldata.BatchSize(), so we run this part of the test only when
// doubled capacity will not be truncated.
b, _ = testAllocator.ResetMaybeReallocate(typs, b, minDesiredCapacity, largeMemSize)
b, _ = testAllocator.ResetMaybeReallocate(typs, b, minDesiredCapacity, largeMemSize, false /* desiredCapacitySufficient */)
require.NotEqual(t, oldBatch, b)
require.Equal(t, 2*minDesiredCapacity, b.Capacity())
}
@@ -300,7 +306,7 @@ func TestSetAccountingHelper(t *testing.T) {
// new batch with larger capacity might be allocated.
maxBatchMemSize = largeMemSize
}
batch, _ = helper.ResetMaybeReallocate(typs, batch, numRows, maxBatchMemSize)
batch, _ = helper.ResetMaybeReallocate(typs, batch, numRows, maxBatchMemSize, false /* desiredCapacitySufficient */)

for rowIdx := 0; rowIdx < batch.Capacity(); rowIdx++ {
for vecIdx, typ := range typs {
