Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

colmem: improve the behavior of ResetMaybeReallocate #81535

Merged
merged 1 commit into from
May 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecjoin/crossjoiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func (c *crossJoiner) Next() coldata.Batch {
}
c.output, _ = c.unlimitedAllocator.ResetMaybeReallocate(
c.outputTypes, c.output, willEmit, c.maxOutputBatchMemSize,
true, /* desiredCapacitySufficient */
)
if willEmit > c.output.Capacity() {
willEmit = c.output.Capacity()
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecjoin/hashjoiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,7 @@ func (hj *hashJoiner) resetOutput(nResults int) {
const maxOutputBatchMemSize = math.MaxInt64
hj.output, _ = hj.outputUnlimitedAllocator.ResetMaybeReallocate(
hj.outputTypes, hj.output, nResults, maxOutputBatchMemSize,
true, /* desiredCapacitySufficient */
)
}

Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecjoin/mergejoiner_exceptall.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecjoin/mergejoiner_fullouter.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecjoin/mergejoiner_inner.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecjoin/mergejoiner_leftanti.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecjoin/mergejoiner_leftouter.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecjoin/mergejoiner_leftsemi.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecjoin/mergejoiner_rightanti.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecjoin/mergejoiner_rightouter.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecjoin/mergejoiner_rightsemi.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecjoin/mergejoiner_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1355,6 +1355,7 @@ func _SOURCE_FINISHED_SWITCH(_JOIN_TYPE joinTypeInfo) { // */}}
func (o *mergeJoin_JOIN_TYPE_STRINGOp) Next() coldata.Batch {
o.output, _ = o.unlimitedAllocator.ResetMaybeReallocate(
o.outputTypes, o.output, 1 /* minDesiredCapacity */, o.memoryLimit,
false, /* desiredCapacitySufficient */
)
o.outputCapacity = o.output.Capacity()
o.bufferedGroup.helper.output = o.output
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecutils/deselector.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func (p *deselectorOp) Next() coldata.Batch {
const maxBatchMemSize = math.MaxInt64
p.output, _ = p.unlimitedAllocator.ResetMaybeReallocate(
p.inputTypes, p.output, batch.Length(), maxBatchMemSize,
true, /* desiredCapacitySufficient */
)
sel := batch.Selection()
p.unlimitedAllocator.PerformOperation(p.output.ColVecs(), func() {
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/colexec/colexecutils/spilling_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ func (q *SpillingQueue) Enqueue(ctx context.Context, batch coldata.Batch) {
const maxBatchMemSize = math.MaxInt64
q.diskQueueDeselectionScratch, _ = q.unlimitedAllocator.ResetMaybeReallocate(
q.typs, q.diskQueueDeselectionScratch, n, maxBatchMemSize,
true, /* desiredCapacitySufficient */
)
q.unlimitedAllocator.PerformOperation(q.diskQueueDeselectionScratch.ColVecs(), func() {
for i := range q.typs {
Expand Down Expand Up @@ -295,6 +296,7 @@ func (q *SpillingQueue) Enqueue(ctx context.Context, batch coldata.Batch) {
// attention to the memory registered with the unlimited allocator, and
// we will stop adding tuples into this batch and spill when needed.
math.MaxInt64, /* maxBatchMemSize */
true, /* desiredCapacitySufficient */
)
q.unlimitedAllocator.PerformOperation(newBatch.ColVecs(), func() {
for i := range q.typs {
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecwindow/buffered_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ func (b *bufferedWindowOp) Next() coldata.Batch {
const maxBatchMemSize = math.MaxInt64
b.currentBatch, _ = b.allocator.ResetMaybeReallocate(
b.outputTypes, b.currentBatch, batch.Length(), maxBatchMemSize,
true, /* desiredCapacitySufficient */
)
b.allocator.PerformOperation(b.currentBatch.ColVecs(), func() {
for colIdx, vec := range batch.ColVecs() {
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/colexec/columnarizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ func (c *Columnarizer) Next() coldata.Batch {
switch c.mode {
case columnarizerBufferingMode:
c.batch, reallocated = c.allocator.ResetMaybeReallocate(
c.typs, c.batch, 1 /* minDesiredCapacity */, c.maxBatchMemSize,
c.typs, c.batch, 1, /* minDesiredCapacity */
c.maxBatchMemSize, false, /* desiredCapacitySufficient */
)
case columnarizerStreamingMode:
// Note that we're not using ResetMaybeReallocate because we will
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/colexec/hash_aggregator.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/sql/colexec/hash_aggregator_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ func getNext(op *hashAggregator, partialOrder bool) coldata.Batch {
// len(op.buckets) capacity.
op.output, _ = op.accountingHelper.ResetMaybeReallocate(
op.outputTypes, op.output, len(op.buckets), op.maxOutputBatchMemSize,
true, /* desiredCapacitySufficient */
)
curOutputIdx := 0
for curOutputIdx < op.output.Capacity() &&
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/colexec/ordered_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,14 +285,16 @@ func (a *orderedAggregator) Next() coldata.Batch {
a.scratch.Batch = a.allocator.NewMemBatchWithFixedCapacity(a.outputTypes, newMinCapacity)
} else {
a.scratch.Batch, _ = a.allocator.ResetMaybeReallocate(
a.outputTypes, a.scratch.Batch, newMinCapacity, maxBatchMemSize,
a.outputTypes, a.scratch.Batch, newMinCapacity,
maxBatchMemSize, true, /* desiredCapacitySufficient */
)
}
// We will never copy more than coldata.BatchSize() into the
// temporary buffer, so a half of the scratch's capacity will always
// be sufficient.
a.scratch.tempBuffer, _ = a.allocator.ResetMaybeReallocate(
a.outputTypes, a.scratch.tempBuffer, newMinCapacity/2, maxBatchMemSize,
a.outputTypes, a.scratch.tempBuffer, newMinCapacity/2,
maxBatchMemSize, true, /* desiredCapacitySufficient */
)
for fnIdx, fn := range a.bucket.fns {
fn.SetOutput(a.scratch.ColVec(fnIdx))
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colexec/ordered_synchronizer.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/sql/colexec/ordered_synchronizer_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ func (o *OrderedSynchronizer) resetOutput() {
var reallocated bool
o.output, reallocated = o.accountingHelper.ResetMaybeReallocate(
o.typs, o.output, 1 /* minDesiredCapacity */, o.memoryLimit,
false, /* desiredCapacitySufficient */
)
if reallocated {
o.outVecs.SetBatch(o.output)
Expand Down
5 changes: 4 additions & 1 deletion pkg/sql/colexec/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,10 @@ func (p *sortOp) Next() coldata.Batch {
p.state = sortDone
continue
}
p.output, _ = p.allocator.ResetMaybeReallocate(p.inputTypes, p.output, toEmit, p.maxOutputBatchMemSize)
p.output, _ = p.allocator.ResetMaybeReallocate(
p.inputTypes, p.output, toEmit, p.maxOutputBatchMemSize,
true, /* desiredCapacitySufficient */
)
if toEmit > p.output.Capacity() {
toEmit = p.output.Capacity()
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/sql/colexec/sorttopk.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,10 @@ func (t *topKSorter) emit() coldata.Batch {
// We're done.
return coldata.ZeroBatch
}
t.output, _ = t.allocator.ResetMaybeReallocate(t.inputTypes, t.output, toEmit, t.maxOutputBatchMemSize)
t.output, _ = t.allocator.ResetMaybeReallocate(
t.inputTypes, t.output, toEmit, t.maxOutputBatchMemSize,
true, /* desiredCapacitySufficient */
)
if toEmit > t.output.Capacity() {
toEmit = t.output.Capacity()
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colfetcher/cfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ func (cf *cFetcher) resetBatch() {
}
cf.machine.batch, reallocated = cf.accountingHelper.ResetMaybeReallocate(
cf.table.typs, cf.machine.batch, minDesiredCapacity, cf.memoryLimit,
false, /* desiredCapacitySufficient */
)
if reallocated {
cf.machine.colvecs.SetBatch(cf.machine.batch)
Expand Down
5 changes: 4 additions & 1 deletion pkg/sql/colflow/colrpc/inbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,10 @@ func (i *Inbox) Next() coldata.Batch {
}
// We rely on the outboxes to produce reasonably sized batches.
const maxBatchMemSize = math.MaxInt64
i.scratch.b, _ = i.allocator.ResetMaybeReallocate(i.typs, i.scratch.b, batchLength, maxBatchMemSize)
i.scratch.b, _ = i.allocator.ResetMaybeReallocate(
i.typs, i.scratch.b, batchLength, maxBatchMemSize,
true, /* desiredCapacitySufficient */
)
i.allocator.PerformOperation(i.scratch.b.ColVecs(), func() {
if err := i.converter.ArrowToBatch(i.scratch.data, batchLength, i.scratch.b); err != nil {
colexecerror.InternalError(err)
Expand Down
24 changes: 20 additions & 4 deletions pkg/sql/colmem/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,15 +156,22 @@ func (a *Allocator) NewMemBatchNoCols(typs []*types.T, capacity int) coldata.Bat
//
// The method will grow the allocated capacity of the batch exponentially
// (possibly incurring a reallocation), until the batch reaches
// coldata.BatchSize() in capacity or maxBatchMemSize in the memory footprint.
// coldata.BatchSize() in capacity or maxBatchMemSize in the memory footprint if
// desiredCapacitySufficient is false. When that parameter is true and the
// capacity of the old batch is at least minDesiredCapacity, then the old batch
// is reused.
//
// NOTE: if the reallocation occurs, then the memory under the old batch is
// released, so it is expected that the caller will lose the references to the
// old batch.
// Note: the method assumes that minDesiredCapacity is at least 0 and will clamp
// minDesiredCapacity to be between 1 and coldata.BatchSize() inclusive.
func (a *Allocator) ResetMaybeReallocate(
typs []*types.T, oldBatch coldata.Batch, minDesiredCapacity int, maxBatchMemSize int64,
typs []*types.T,
oldBatch coldata.Batch,
minDesiredCapacity int,
maxBatchMemSize int64,
desiredCapacitySufficient bool,
) (newBatch coldata.Batch, reallocated bool) {
if minDesiredCapacity < 0 {
colexecerror.InternalError(errors.AssertionFailedf("invalid minDesiredCapacity %d", minDesiredCapacity))
Expand All @@ -179,6 +186,11 @@ func (a *Allocator) ResetMaybeReallocate(
} else {
// If old batch is already of the largest capacity, we will reuse it.
useOldBatch := oldBatch.Capacity() == coldata.BatchSize()
// If the old batch already satisfies the desired capacity which is
// sufficient, we will reuse it too.
if desiredCapacitySufficient && oldBatch.Capacity() >= minDesiredCapacity {
useOldBatch = true
}
// Avoid calculating the memory footprint if possible.
var oldBatchMemSize int64
if !useOldBatch {
Expand Down Expand Up @@ -568,10 +580,14 @@ func (h *SetAccountingHelper) getBytesLikeTotalSize() int64 {
// Allocator.ResetMaybeReallocate (and thus has the same contract) with an
// additional logic for memory tracking purposes.
func (h *SetAccountingHelper) ResetMaybeReallocate(
typs []*types.T, oldBatch coldata.Batch, minCapacity int, maxBatchMemSize int64,
typs []*types.T,
oldBatch coldata.Batch,
minCapacity int,
maxBatchMemSize int64,
desiredCapacitySufficient bool,
) (newBatch coldata.Batch, reallocated bool) {
newBatch, reallocated = h.Allocator.ResetMaybeReallocate(
typs, oldBatch, minCapacity, maxBatchMemSize,
typs, oldBatch, minCapacity, maxBatchMemSize, desiredCapacitySufficient,
)
if reallocated && !h.allFixedLength {
// Allocator.ResetMaybeReallocate has released the precise memory
Expand Down
18 changes: 12 additions & 6 deletions pkg/sql/colmem/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,13 @@ func TestResetMaybeReallocate(t *testing.T) {
typs := []*types.T{types.Bytes}

// Allocate a new batch and modify it.
b, _ = testAllocator.ResetMaybeReallocate(typs, b, coldata.BatchSize(), math.MaxInt64)
b, _ = testAllocator.ResetMaybeReallocate(typs, b, coldata.BatchSize(), math.MaxInt64, false /* desiredCapacitySufficient */)
b.SetSelection(true)
b.Selection()[0] = 1
b.ColVec(0).Bytes().Set(1, []byte("foo"))

oldBatch := b
b, _ = testAllocator.ResetMaybeReallocate(typs, b, coldata.BatchSize(), math.MaxInt64)
b, _ = testAllocator.ResetMaybeReallocate(typs, b, coldata.BatchSize(), math.MaxInt64, false /* desiredCapacitySufficient */)
// We should have used the same batch, and now it should be in a "reset"
// state.
require.Equal(t, oldBatch, b)
Expand All @@ -152,15 +152,21 @@ func TestResetMaybeReallocate(t *testing.T) {
// Allocate a new batch attempting to use the batch with too small a
// capacity - a new batch should **not** be allocated because the memory
// limit is already exceeded.
b, _ = testAllocator.ResetMaybeReallocate(typs, smallBatch, minDesiredCapacity, smallMemSize)
b, _ = testAllocator.ResetMaybeReallocate(typs, smallBatch, minDesiredCapacity, smallMemSize, false /* desiredCapacitySufficient */)
require.Equal(t, smallBatch, b)
require.Equal(t, minDesiredCapacity/2, b.Capacity())

oldBatch := b

// Reset the batch asking for the same small desired capacity when it is
// sufficient - the same batch should be returned.
b, _ = testAllocator.ResetMaybeReallocate(typs, b, minDesiredCapacity/2, smallMemSize, true /* desiredCapacitySufficient */)
require.Equal(t, smallBatch, b)
require.Equal(t, minDesiredCapacity/2, b.Capacity())

// Reset the batch and confirm that a new batch is allocated because we
// have given a larger memory limit.
b, _ = testAllocator.ResetMaybeReallocate(typs, b, minDesiredCapacity, largeMemSize)
b, _ = testAllocator.ResetMaybeReallocate(typs, b, minDesiredCapacity, largeMemSize, false /* desiredCapacitySufficient */)
require.NotEqual(t, oldBatch, b)
require.Equal(t, minDesiredCapacity, b.Capacity())

Expand All @@ -171,7 +177,7 @@ func TestResetMaybeReallocate(t *testing.T) {
// ResetMaybeReallocate truncates the capacity at
// coldata.BatchSize(), so we run this part of the test only when
// doubled capacity will not be truncated.
b, _ = testAllocator.ResetMaybeReallocate(typs, b, minDesiredCapacity, largeMemSize)
b, _ = testAllocator.ResetMaybeReallocate(typs, b, minDesiredCapacity, largeMemSize, false /* desiredCapacitySufficient */)
require.NotEqual(t, oldBatch, b)
require.Equal(t, 2*minDesiredCapacity, b.Capacity())
}
Expand Down Expand Up @@ -300,7 +306,7 @@ func TestSetAccountingHelper(t *testing.T) {
// new batch with larger capacity might be allocated.
maxBatchMemSize = largeMemSize
}
batch, _ = helper.ResetMaybeReallocate(typs, batch, numRows, maxBatchMemSize)
batch, _ = helper.ResetMaybeReallocate(typs, batch, numRows, maxBatchMemSize, false /* desiredCapacitySufficient */)

for rowIdx := 0; rowIdx < batch.Capacity(); rowIdx++ {
for vecIdx, typ := range typs {
Expand Down