From aec12906ad7dd8f88bd609cb7a20fda2e0f97e17 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Mon, 1 Aug 2022 11:47:42 -0700 Subject: [PATCH] colmem: introduce a helper method when no memory limit should be applied This commit is a pure mechanical change. Release note: None --- pkg/sql/colexec/colexecjoin/hashjoiner.go | 7 ++----- pkg/sql/colexec/colexecutils/deselector.go | 8 ++----- .../colexec/colexecutils/spilling_queue.go | 21 +++++++------------ .../colexec/colexecwindow/buffered_window.go | 7 ++----- pkg/sql/colexec/ordered_aggregator.go | 17 +++++++-------- pkg/sql/colflow/colrpc/inbox.go | 7 ++----- pkg/sql/colmem/allocator.go | 13 ++++++++++++ 7 files changed, 35 insertions(+), 45 deletions(-) diff --git a/pkg/sql/colexec/colexecjoin/hashjoiner.go b/pkg/sql/colexec/colexecjoin/hashjoiner.go index 686eb3956399..2a8da3e5e5e2 100644 --- a/pkg/sql/colexec/colexecjoin/hashjoiner.go +++ b/pkg/sql/colexec/colexecjoin/hashjoiner.go @@ -12,7 +12,6 @@ package colexecjoin import ( "context" - "math" "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" @@ -753,10 +752,8 @@ func (hj *hashJoiner) resetOutput(nResults int) { // 4. when the hashJoiner is used by the external hash joiner as the main // strategy, the hash-based partitioner is responsible for making sure that // partitions fit within memory limit. - const maxOutputBatchMemSize = math.MaxInt64 - hj.output, _ = hj.outputUnlimitedAllocator.ResetMaybeReallocate( - hj.outputTypes, hj.output, nResults, maxOutputBatchMemSize, - true, /* desiredCapacitySufficient */ + hj.output, _ = hj.outputUnlimitedAllocator.ResetMaybeReallocateNoMemLimit( + hj.outputTypes, hj.output, nResults, ) } diff --git a/pkg/sql/colexec/colexecutils/deselector.go b/pkg/sql/colexec/colexecutils/deselector.go index 1c8fdbd8a379..2d82c89458da 100644 --- a/pkg/sql/colexec/colexecutils/deselector.go +++ b/pkg/sql/colexec/colexecutils/deselector.go @@ -11,8 +11,6 @@ package colexecutils import ( - "math" - "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/sql/colexecop" "github.com/cockroachdb/cockroach/pkg/sql/colmem" @@ -59,10 +57,8 @@ func (p *deselectorOp) Next() coldata.Batch { // deselectorOp should *not* limit the capacities of the returned batches, // so we don't use a memory limit here. It is up to the wrapped operator to // limit the size of batches based on the memory footprint. - const maxBatchMemSize = math.MaxInt64 - p.output, _ = p.unlimitedAllocator.ResetMaybeReallocate( - p.inputTypes, p.output, batch.Length(), maxBatchMemSize, - true, /* desiredCapacitySufficient */ + p.output, _ = p.unlimitedAllocator.ResetMaybeReallocateNoMemLimit( + p.inputTypes, p.output, batch.Length(), ) sel := batch.Selection() p.unlimitedAllocator.PerformOperation(p.output.ColVecs(), func() { diff --git a/pkg/sql/colexec/colexecutils/spilling_queue.go b/pkg/sql/colexec/colexecutils/spilling_queue.go index 00175260bc01..78eea34d6d74 100644 --- a/pkg/sql/colexec/colexecutils/spilling_queue.go +++ b/pkg/sql/colexec/colexecutils/spilling_queue.go @@ -12,7 +12,6 @@ package colexecutils import ( "context" - "math" "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/sql/colcontainer" @@ -190,10 +189,8 @@ func (q *SpillingQueue) Enqueue(ctx context.Context, batch coldata.Batch) { // // We want to fit all deselected tuples into a single batch, so we // don't enforce footprint based memory limit on a batch size. - const maxBatchMemSize = math.MaxInt64 - q.diskQueueDeselectionScratch, _ = q.unlimitedAllocator.ResetMaybeReallocate( - q.typs, q.diskQueueDeselectionScratch, n, maxBatchMemSize, - true, /* desiredCapacitySufficient */ + q.diskQueueDeselectionScratch, _ = q.unlimitedAllocator.ResetMaybeReallocateNoMemLimit( + q.typs, q.diskQueueDeselectionScratch, n, ) q.unlimitedAllocator.PerformOperation(q.diskQueueDeselectionScratch.ColVecs(), func() { for i := range q.typs { @@ -285,18 +282,14 @@ func (q *SpillingQueue) Enqueue(ctx context.Context, batch coldata.Batch) { } } + // No limit on the batch mem size here, however, we will be paying attention + // to the memory registered with the unlimited allocator, and we will stop + // adding tuples into this batch and spill when needed. // Note: we could have used NewMemBatchWithFixedCapacity here, but we choose // not to in order to indicate that the capacity of the new batches has // dynamic behavior. - newBatch, _ := q.unlimitedAllocator.ResetMaybeReallocate( - q.typs, - nil, /* oldBatch */ - newBatchCapacity, - // No limit on the batch mem size here, however, we will be paying - // attention to the memory registered with the unlimited allocator, and - // we will stop adding tuples into this batch and spill when needed. - math.MaxInt64, /* maxBatchMemSize */ - true, /* desiredCapacitySufficient */ + newBatch, _ := q.unlimitedAllocator.ResetMaybeReallocateNoMemLimit( + q.typs, nil /* oldBatch */, newBatchCapacity, ) q.unlimitedAllocator.PerformOperation(newBatch.ColVecs(), func() { for i := range q.typs { diff --git a/pkg/sql/colexec/colexecwindow/buffered_window.go b/pkg/sql/colexec/colexecwindow/buffered_window.go index 22dfc38c2f92..4e0e589e5aff 100644 --- a/pkg/sql/colexec/colexecwindow/buffered_window.go +++ b/pkg/sql/colexec/colexecwindow/buffered_window.go @@ -12,7 +12,6 @@ package colexecwindow import ( "context" - "math" "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/sql/colcontainer" @@ -248,10 +247,8 @@ func (b *bufferedWindowOp) Next() coldata.Batch { sel := batch.Selection() // We don't limit the batches based on the memory footprint because // we assume that the input is producing reasonably sized batches. - const maxBatchMemSize = math.MaxInt64 - b.currentBatch, _ = b.allocator.ResetMaybeReallocate( - b.outputTypes, b.currentBatch, batch.Length(), maxBatchMemSize, - true, /* desiredCapacitySufficient */ + b.currentBatch, _ = b.allocator.ResetMaybeReallocateNoMemLimit( + b.outputTypes, b.currentBatch, batch.Length(), ) b.allocator.PerformOperation(b.currentBatch.ColVecs(), func() { for colIdx, vec := range batch.ColVecs() { diff --git a/pkg/sql/colexec/ordered_aggregator.go b/pkg/sql/colexec/ordered_aggregator.go index 4042971fd85d..9e749357a1b9 100644 --- a/pkg/sql/colexec/ordered_aggregator.go +++ b/pkg/sql/colexec/ordered_aggregator.go @@ -12,7 +12,6 @@ package colexec import ( "context" - "math" "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/sql/colconv" @@ -268,15 +267,15 @@ func (a *orderedAggregator) Next() coldata.Batch { case orderedAggregatorReallocating: // The ordered aggregator *cannot* limit the capacities of its // internal batches because it works under the assumption that any - // input batch can be handled in a single pass, so we don't use a - // memory limit here. It is up to the input to limit the size of - // batches based on the memory footprint. - const maxBatchMemSize = math.MaxInt64 + // input batch can be handled in a single pass, so we use + // ResetMaybeReallocateNoMemLimit. It is up to the input to limit + // the size of batches based on the memory footprint. + // // Twice the batchSize is allocated to avoid having to check for // overflow when outputting. newMinCapacity := 2 * a.lastReadBatch.Length() if newMinCapacity > coldata.BatchSize() { - // ResetMaybeReallocate truncates the capacity to + // ResetMaybeReallocateNoMemLimit truncates the capacity to // coldata.BatchSize(), but we actually want a batch with larger // capacity, so we choose to instantiate the batch with fixed // maximal capacity that can be needed by the aggregator. @@ -284,17 +283,15 @@ func (a *orderedAggregator) Next() coldata.Batch { newMinCapacity = 2 * coldata.BatchSize() a.scratch.Batch = a.allocator.NewMemBatchWithFixedCapacity(a.outputTypes, newMinCapacity) } else { - a.scratch.Batch, _ = a.allocator.ResetMaybeReallocate( + a.scratch.Batch, _ = a.allocator.ResetMaybeReallocateNoMemLimit( a.outputTypes, a.scratch.Batch, newMinCapacity, - maxBatchMemSize, true, /* desiredCapacitySufficient */ ) } // We will never copy more than coldata.BatchSize() into the // temporary buffer, so a half of the scratch's capacity will always // be sufficient. - a.scratch.tempBuffer, _ = a.allocator.ResetMaybeReallocate( + a.scratch.tempBuffer, _ = a.allocator.ResetMaybeReallocateNoMemLimit( a.outputTypes, a.scratch.tempBuffer, newMinCapacity/2, - maxBatchMemSize, true, /* desiredCapacitySufficient */ ) for fnIdx, fn := range a.bucket.fns { fn.SetOutput(a.scratch.ColVec(fnIdx)) diff --git a/pkg/sql/colflow/colrpc/inbox.go b/pkg/sql/colflow/colrpc/inbox.go index c63656059152..effa3d743664 100644 --- a/pkg/sql/colflow/colrpc/inbox.go +++ b/pkg/sql/colflow/colrpc/inbox.go @@ -13,7 +13,6 @@ package colrpc import ( "context" "io" - "math" "sync/atomic" "time" @@ -430,10 +429,8 @@ func (i *Inbox) Next() coldata.Batch { colexecerror.InternalError(err) } // We rely on the outboxes to produce reasonably sized batches. - const maxBatchMemSize = math.MaxInt64 - i.scratch.b, _ = i.allocator.ResetMaybeReallocate( - i.typs, i.scratch.b, batchLength, maxBatchMemSize, - true, /* desiredCapacitySufficient */ + i.scratch.b, _ = i.allocator.ResetMaybeReallocateNoMemLimit( + i.typs, i.scratch.b, batchLength, ) i.allocator.PerformOperation(i.scratch.b.ColVecs(), func() { if err := i.converter.ArrowToBatch(i.scratch.data, batchLength, i.scratch.b); err != nil { diff --git a/pkg/sql/colmem/allocator.go b/pkg/sql/colmem/allocator.go index 67f12d633346..47738ef0c9c1 100644 --- a/pkg/sql/colmem/allocator.go +++ b/pkg/sql/colmem/allocator.go @@ -12,6 +12,7 @@ package colmem import ( "context" + "math" "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/typeconv" @@ -223,6 +224,18 @@ func (a *Allocator) ResetMaybeReallocate( return newBatch, reallocated } +// ResetMaybeReallocateNoMemLimit is the same as ResetMaybeReallocate when +// MaxInt64 is used as the maxBatchMemSize argument and the desired capacity is +// sufficient. This should be used by the callers that know exactly the capacity +// they need and have no control over that number. It is guaranteed that the +// returned batch has the capacity of at least requiredCapacity (clamped to +// [1, coldata.BatchSize()] range). +func (a *Allocator) ResetMaybeReallocateNoMemLimit( + typs []*types.T, oldBatch coldata.Batch, requiredCapacity int, +) (newBatch coldata.Batch, reallocated bool) { + return a.ResetMaybeReallocate(typs, oldBatch, requiredCapacity, math.MaxInt64, true /* desiredCapacitySufficient */) +} + // NewMemColumn returns a new coldata.Vec of the desired capacity. // NOTE: consider whether you should be using MaybeAppendColumn, // NewMemBatchWith*, or ResetMaybeReallocate methods.