Skip to content

Commit

Permalink
colmem: introduce a helper method when no memory limit should be applied
Browse files Browse the repository at this point in the history
This commit is a pure mechanical change.

Release note: None
  • Loading branch information
yuzefovich committed Aug 10, 2022
1 parent 0ad97e4 commit aec1290
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 45 deletions.
7 changes: 2 additions & 5 deletions pkg/sql/colexec/colexecjoin/hashjoiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ package colexecjoin

import (
"context"
"math"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
Expand Down Expand Up @@ -753,10 +752,8 @@ func (hj *hashJoiner) resetOutput(nResults int) {
// 4. when the hashJoiner is used by the external hash joiner as the main
// strategy, the hash-based partitioner is responsible for making sure that
// partitions fit within memory limit.
const maxOutputBatchMemSize = math.MaxInt64
hj.output, _ = hj.outputUnlimitedAllocator.ResetMaybeReallocate(
hj.outputTypes, hj.output, nResults, maxOutputBatchMemSize,
true, /* desiredCapacitySufficient */
hj.output, _ = hj.outputUnlimitedAllocator.ResetMaybeReallocateNoMemLimit(
hj.outputTypes, hj.output, nResults,
)
}

Expand Down
8 changes: 2 additions & 6 deletions pkg/sql/colexec/colexecutils/deselector.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
package colexecutils

import (
"math"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
Expand Down Expand Up @@ -59,10 +57,8 @@ func (p *deselectorOp) Next() coldata.Batch {
// deselectorOp should *not* limit the capacities of the returned batches,
// so we don't use a memory limit here. It is up to the wrapped operator to
// limit the size of batches based on the memory footprint.
const maxBatchMemSize = math.MaxInt64
p.output, _ = p.unlimitedAllocator.ResetMaybeReallocate(
p.inputTypes, p.output, batch.Length(), maxBatchMemSize,
true, /* desiredCapacitySufficient */
p.output, _ = p.unlimitedAllocator.ResetMaybeReallocateNoMemLimit(
p.inputTypes, p.output, batch.Length(),
)
sel := batch.Selection()
p.unlimitedAllocator.PerformOperation(p.output.ColVecs(), func() {
Expand Down
21 changes: 7 additions & 14 deletions pkg/sql/colexec/colexecutils/spilling_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ package colexecutils

import (
"context"
"math"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/colcontainer"
Expand Down Expand Up @@ -190,10 +189,8 @@ func (q *SpillingQueue) Enqueue(ctx context.Context, batch coldata.Batch) {
//
// We want to fit all deselected tuples into a single batch, so we
// don't enforce footprint based memory limit on a batch size.
const maxBatchMemSize = math.MaxInt64
q.diskQueueDeselectionScratch, _ = q.unlimitedAllocator.ResetMaybeReallocate(
q.typs, q.diskQueueDeselectionScratch, n, maxBatchMemSize,
true, /* desiredCapacitySufficient */
q.diskQueueDeselectionScratch, _ = q.unlimitedAllocator.ResetMaybeReallocateNoMemLimit(
q.typs, q.diskQueueDeselectionScratch, n,
)
q.unlimitedAllocator.PerformOperation(q.diskQueueDeselectionScratch.ColVecs(), func() {
for i := range q.typs {
Expand Down Expand Up @@ -285,18 +282,14 @@ func (q *SpillingQueue) Enqueue(ctx context.Context, batch coldata.Batch) {
}
}

// No limit on the batch mem size here, however, we will be paying attention
// to the memory registered with the unlimited allocator, and we will stop
// adding tuples into this batch and spill when needed.
// Note: we could have used NewMemBatchWithFixedCapacity here, but we choose
// not to in order to indicate that the capacity of the new batches has
// dynamic behavior.
newBatch, _ := q.unlimitedAllocator.ResetMaybeReallocate(
q.typs,
nil, /* oldBatch */
newBatchCapacity,
// No limit on the batch mem size here, however, we will be paying
// attention to the memory registered with the unlimited allocator, and
// we will stop adding tuples into this batch and spill when needed.
math.MaxInt64, /* maxBatchMemSize */
true, /* desiredCapacitySufficient */
newBatch, _ := q.unlimitedAllocator.ResetMaybeReallocateNoMemLimit(
q.typs, nil /* oldBatch */, newBatchCapacity,
)
q.unlimitedAllocator.PerformOperation(newBatch.ColVecs(), func() {
for i := range q.typs {
Expand Down
7 changes: 2 additions & 5 deletions pkg/sql/colexec/colexecwindow/buffered_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ package colexecwindow

import (
"context"
"math"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/colcontainer"
Expand Down Expand Up @@ -248,10 +247,8 @@ func (b *bufferedWindowOp) Next() coldata.Batch {
sel := batch.Selection()
// We don't limit the batches based on the memory footprint because
// we assume that the input is producing reasonably sized batches.
const maxBatchMemSize = math.MaxInt64
b.currentBatch, _ = b.allocator.ResetMaybeReallocate(
b.outputTypes, b.currentBatch, batch.Length(), maxBatchMemSize,
true, /* desiredCapacitySufficient */
b.currentBatch, _ = b.allocator.ResetMaybeReallocateNoMemLimit(
b.outputTypes, b.currentBatch, batch.Length(),
)
b.allocator.PerformOperation(b.currentBatch.ColVecs(), func() {
for colIdx, vec := range batch.ColVecs() {
Expand Down
17 changes: 7 additions & 10 deletions pkg/sql/colexec/ordered_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ package colexec

import (
"context"
"math"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/colconv"
Expand Down Expand Up @@ -268,33 +267,31 @@ func (a *orderedAggregator) Next() coldata.Batch {
case orderedAggregatorReallocating:
// The ordered aggregator *cannot* limit the capacities of its
// internal batches because it works under the assumption that any
// input batch can be handled in a single pass, so we don't use a
// memory limit here. It is up to the input to limit the size of
// batches based on the memory footprint.
const maxBatchMemSize = math.MaxInt64
// input batch can be handled in a single pass, so we use
// ResetMaybeReallocateNoMemLimit. It is up to the input to limit
// the size of batches based on the memory footprint.
//
// Twice the batchSize is allocated to avoid having to check for
// overflow when outputting.
newMinCapacity := 2 * a.lastReadBatch.Length()
if newMinCapacity > coldata.BatchSize() {
// ResetMaybeReallocate truncates the capacity to
// ResetMaybeReallocateNoMemLimit truncates the capacity to
// coldata.BatchSize(), but we actually want a batch with larger
// capacity, so we choose to instantiate the batch with fixed
// maximal capacity that can be needed by the aggregator.
a.allocator.ReleaseMemory(colmem.GetBatchMemSize(a.scratch.Batch))
newMinCapacity = 2 * coldata.BatchSize()
a.scratch.Batch = a.allocator.NewMemBatchWithFixedCapacity(a.outputTypes, newMinCapacity)
} else {
a.scratch.Batch, _ = a.allocator.ResetMaybeReallocate(
a.scratch.Batch, _ = a.allocator.ResetMaybeReallocateNoMemLimit(
a.outputTypes, a.scratch.Batch, newMinCapacity,
maxBatchMemSize, true, /* desiredCapacitySufficient */
)
}
// We will never copy more than coldata.BatchSize() into the
// temporary buffer, so a half of the scratch's capacity will always
// be sufficient.
a.scratch.tempBuffer, _ = a.allocator.ResetMaybeReallocate(
a.scratch.tempBuffer, _ = a.allocator.ResetMaybeReallocateNoMemLimit(
a.outputTypes, a.scratch.tempBuffer, newMinCapacity/2,
maxBatchMemSize, true, /* desiredCapacitySufficient */
)
for fnIdx, fn := range a.bucket.fns {
fn.SetOutput(a.scratch.ColVec(fnIdx))
Expand Down
7 changes: 2 additions & 5 deletions pkg/sql/colflow/colrpc/inbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package colrpc
import (
"context"
"io"
"math"
"sync/atomic"
"time"

Expand Down Expand Up @@ -430,10 +429,8 @@ func (i *Inbox) Next() coldata.Batch {
colexecerror.InternalError(err)
}
// We rely on the outboxes to produce reasonably sized batches.
const maxBatchMemSize = math.MaxInt64
i.scratch.b, _ = i.allocator.ResetMaybeReallocate(
i.typs, i.scratch.b, batchLength, maxBatchMemSize,
true, /* desiredCapacitySufficient */
i.scratch.b, _ = i.allocator.ResetMaybeReallocateNoMemLimit(
i.typs, i.scratch.b, batchLength,
)
i.allocator.PerformOperation(i.scratch.b.ColVecs(), func() {
if err := i.converter.ArrowToBatch(i.scratch.data, batchLength, i.scratch.b); err != nil {
Expand Down
13 changes: 13 additions & 0 deletions pkg/sql/colmem/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package colmem

import (
"context"
"math"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/col/typeconv"
Expand Down Expand Up @@ -223,6 +224,18 @@ func (a *Allocator) ResetMaybeReallocate(
return newBatch, reallocated
}

// ResetMaybeReallocateNoMemLimit is a convenience wrapper around
// ResetMaybeReallocate for callers that must not have a memory limit applied
// to the batch: it passes math.MaxInt64 as the maxBatchMemSize argument and
// treats the desired capacity as sufficient. It is intended for callers that
// know exactly the capacity they need and have no control over that number.
// The returned batch is guaranteed to have the capacity of at least
// requiredCapacity (clamped to [1, coldata.BatchSize()] range).
func (a *Allocator) ResetMaybeReallocateNoMemLimit(
	typs []*types.T, oldBatch coldata.Batch, requiredCapacity int,
) (newBatch coldata.Batch, reallocated bool) {
	// The largest int64 value stands in for "no limit" on the batch's memory
	// footprint.
	const noMemLimit = math.MaxInt64
	newBatch, reallocated = a.ResetMaybeReallocate(
		typs, oldBatch, requiredCapacity, noMemLimit, true, /* desiredCapacitySufficient */
	)
	return newBatch, reallocated
}

// NewMemColumn returns a new coldata.Vec of the desired capacity.
// NOTE: consider whether you should be using MaybeAppendColumn,
// NewMemBatchWith*, or ResetMaybeReallocate methods.
Expand Down

0 comments on commit aec1290

Please sign in to comment.