colmem: improve memory-limiting behavior of the accounting helpers #85440

Merged 4 commits on Aug 11, 2022
2 changes: 1 addition & 1 deletion pkg/sql/colexec/case.go
@@ -145,7 +145,7 @@ func NewCaseOp(
) colexecop.Operator {
// We internally use three selection vectors, scratch.order, origSel, and
// prevSel.
allocator.AdjustMemoryUsage(3 * colmem.SizeOfBatchSizeSelVector)
allocator.AdjustMemoryUsage(3 * colmem.SelVectorSize(coldata.BatchSize()))
return &caseOp{
allocator: allocator,
buffer: buffer.(*bufferOp),
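
Note: this hunk (and the external_sort.go one below) replaces the fixed constant colmem.SizeOfBatchSizeSelVector with colmem.SelVectorSize(coldata.BatchSize()), so selection-vector accounting now takes an explicit capacity. A minimal sketch of what such a helper presumably computes; the package and body here are hypothetical, not code from this PR:

package colmemsketch

import "unsafe"

// SelVectorSize sketches the assumed behavior of colmem.SelVectorSize: the
// memory footprint, in bytes, of a selection vector able to hold capacity
// tuples (one int per slot).
func SelVectorSize(capacity int) int64 {
	return int64(capacity) * int64(unsafe.Sizeof(int(0)))
}
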
2 changes: 1 addition & 1 deletion pkg/sql/colexec/colexecdisk/external_sort.go
@@ -840,5 +840,5 @@ func (o *inputPartitioningOperator) close() {
// allocate a new windowed batch if necessary (which might be the case for
// the fallback strategy of the users of the hash-based partitioner).
o.windowedBatch = nil
o.allocator.ReleaseMemory(colmem.SizeOfBatchSizeSelVector)
o.allocator.ReleaseMemory(colmem.SelVectorSize(coldata.BatchSize()))
}
22 changes: 9 additions & 13 deletions pkg/sql/colexec/colexecjoin/crossjoiner.go
@@ -40,7 +40,7 @@ func NewCrossJoiner(
rightTypes []*types.T,
diskAcc *mon.BoundAccount,
) colexecop.ClosableOperator {
return &crossJoiner{
c := &crossJoiner{
crossJoinerBase: newCrossJoinerBase(
unlimitedAllocator,
joinType,
@@ -51,21 +51,20 @@ func NewCrossJoiner(
fdSemaphore,
diskAcc,
),
joinHelper: newJoinHelper(left, right),
unlimitedAllocator: unlimitedAllocator,
outputTypes: joinType.MakeOutputTypes(leftTypes, rightTypes),
maxOutputBatchMemSize: memoryLimit,
joinHelper: newJoinHelper(left, right),
outputTypes: joinType.MakeOutputTypes(leftTypes, rightTypes),
}
c.helper.Init(unlimitedAllocator, memoryLimit)
return c
}

type crossJoiner struct {
*crossJoinerBase
*joinHelper

unlimitedAllocator *colmem.Allocator
rightInputConsumed bool
outputTypes []*types.T
maxOutputBatchMemSize int64
helper colmem.AccountingHelper
rightInputConsumed bool
outputTypes []*types.T
// isLeftAllNulls and isRightAllNulls indicate whether the output vectors
// corresponding to the left and right inputs, respectively, should consist
// only of NULL values. This is the case when we have right or left,
@@ -105,10 +104,7 @@ func (c *crossJoiner) Next() coldata.Batch {
c.done = true
return coldata.ZeroBatch
}
c.output, _ = c.unlimitedAllocator.ResetMaybeReallocate(
c.outputTypes, c.output, willEmit, c.maxOutputBatchMemSize,
true, /* desiredCapacitySufficient */
)
c.output, _ = c.helper.ResetMaybeReallocate(c.outputTypes, c.output, willEmit)
if willEmit > c.output.Capacity() {
willEmit = c.output.Capacity()
}
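
Note: the pattern adopted here is that the operator embeds a colmem.AccountingHelper, seeds it once with the unlimited allocator and the memory limit, and then asks it for an output batch on each call. A usage sketch with hypothetical operator fields; the helper's signatures are taken from the calls in this diff:

package colmemsketch

import (
	"github.com/cockroachdb/cockroach/pkg/col/coldata"
	"github.com/cockroachdb/cockroach/pkg/sql/colmem"
	"github.com/cockroachdb/cockroach/pkg/sql/types"
)

// sketchOp is a hypothetical operator that follows the crossJoiner's new
// output-batch pattern.
type sketchOp struct {
	helper      colmem.AccountingHelper
	outputTypes []*types.T
	output      coldata.Batch
}

func newSketchOp(alloc *colmem.Allocator, memoryLimit int64, typs []*types.T) *sketchOp {
	op := &sketchOp{outputTypes: typs}
	// The allocator and the memory limit are captured once at construction...
	op.helper.Init(alloc, memoryLimit)
	return op
}

func (o *sketchOp) resetOutput(willEmit int) {
	// ...so each call only passes the types, the batch to reuse, and how many
	// tuples will be set; the helper decides whether to grow the batch while
	// keeping its footprint under the configured limit.
	o.output, _ = o.helper.ResetMaybeReallocate(o.outputTypes, o.output, willEmit)
}
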
7 changes: 2 additions & 5 deletions pkg/sql/colexec/colexecjoin/hashjoiner.go
@@ -12,7 +12,6 @@ package colexecjoin

import (
"context"
"math"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
@@ -753,10 +752,8 @@ func (hj *hashJoiner) resetOutput(nResults int) {
// 4. when the hashJoiner is used by the external hash joiner as the main
// strategy, the hash-based partitioner is responsible for making sure that
// partitions fit within memory limit.
const maxOutputBatchMemSize = math.MaxInt64
hj.output, _ = hj.outputUnlimitedAllocator.ResetMaybeReallocate(
hj.outputTypes, hj.output, nResults, maxOutputBatchMemSize,
true, /* desiredCapacitySufficient */
hj.output, _ = hj.outputUnlimitedAllocator.ResetMaybeReallocateNoMemLimit(
hj.outputTypes, hj.output, nResults,
)
}

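
Note: several call sites in this PR (here, the deselector, the spilling queue, and the buffered window) drop the explicit math.MaxInt64 / desiredCapacitySufficient arguments in favor of ResetMaybeReallocateNoMemLimit. A sketch of what that convenience form is presumably shorthand for, written against a hypothetical interface rather than the allocator's actual method set:

package colmemsketch

import (
	"math"

	"github.com/cockroachdb/cockroach/pkg/col/coldata"
	"github.com/cockroachdb/cockroach/pkg/sql/types"
)

// reallocator mirrors the five-argument ResetMaybeReallocate call shape seen
// in the removed lines above; it is an assumption for the sketch, not the
// library's declared interface.
type reallocator interface {
	ResetMaybeReallocate(
		typs []*types.T, oldBatch coldata.Batch, requiredCapacity int,
		maxBatchMemSize int64, desiredCapacitySufficient bool,
	) (coldata.Batch, bool)
}

// resetNoMemLimit shows what the new no-limit variant presumably expands to:
// the general call with an effectively unlimited batch memory size and the
// requested capacity treated as sufficient.
func resetNoMemLimit(
	a reallocator, typs []*types.T, oldBatch coldata.Batch, requiredCapacity int,
) (coldata.Batch, bool) {
	return a.ResetMaybeReallocate(
		typs, oldBatch, requiredCapacity,
		math.MaxInt64, /* maxBatchMemSize */
		true,          /* desiredCapacitySufficient */
	)
}
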
2 changes: 2 additions & 0 deletions pkg/sql/colexec/colexecjoin/mergejoiner.go
@@ -532,6 +532,7 @@ func newMergeJoinBase(
},
diskAcc: diskAcc,
}
base.helper.Init(unlimitedAllocator, memoryLimit)
base.left.distincterInput = &colexecop.FeedOperator{}
base.left.distincter, base.left.distinctOutput = colexecbase.OrderedDistinctColsToOperators(
base.left.distincterInput, lEqCols, leftTypes, false, /* nullsAreDistinct */
@@ -549,6 +550,7 @@ type mergeJoinBase struct {
colexecop.CloserHelper

unlimitedAllocator *colmem.Allocator
helper colmem.AccountingHelper
memoryLimit int64
diskQueueCfg colcontainer.DiskQueueCfg
fdSemaphore semaphore.Semaphore
5 changes: 1 addition & 4 deletions pkg/sql/colexec/colexecjoin/mergejoiner_exceptall.eg.go
5 changes: 1 addition & 4 deletions pkg/sql/colexec/colexecjoin/mergejoiner_fullouter.eg.go
5 changes: 1 addition & 4 deletions pkg/sql/colexec/colexecjoin/mergejoiner_inner.eg.go
5 changes: 1 addition & 4 deletions pkg/sql/colexec/colexecjoin/mergejoiner_leftanti.eg.go
5 changes: 1 addition & 4 deletions pkg/sql/colexec/colexecjoin/mergejoiner_leftouter.eg.go
5 changes: 1 addition & 4 deletions pkg/sql/colexec/colexecjoin/mergejoiner_leftsemi.eg.go
5 changes: 1 addition & 4 deletions pkg/sql/colexec/colexecjoin/mergejoiner_rightanti.eg.go
5 changes: 1 addition & 4 deletions pkg/sql/colexec/colexecjoin/mergejoiner_rightouter.eg.go
5 changes: 1 addition & 4 deletions pkg/sql/colexec/colexecjoin/mergejoiner_rightsemi.eg.go

These generated files are not rendered in the diff view by default.
5 changes: 1 addition & 4 deletions pkg/sql/colexec/colexecjoin/mergejoiner_tmpl.go
@@ -1353,10 +1353,7 @@ func _SOURCE_FINISHED_SWITCH(_JOIN_TYPE joinTypeInfo) { // */}}
// */}}

func (o *mergeJoin_JOIN_TYPE_STRINGOp) Next() coldata.Batch {
o.output, _ = o.unlimitedAllocator.ResetMaybeReallocate(
o.outputTypes, o.output, 1 /* minDesiredCapacity */, o.memoryLimit,
false, /* desiredCapacitySufficient */
)
o.output, _ = o.helper.ResetMaybeReallocate(o.outputTypes, o.output, 0 /* tuplesToBeSet */)
o.outputCapacity = o.output.Capacity()
o.bufferedGroup.helper.output = o.output
o.builderState.outCount = 0
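
Note: tuplesToBeSet is passed as 0 here because the merge joiner does not know its output size up front, whereas the cross joiner above passes willEmit. A hypothetical sketch of how a helper might map tuplesToBeSet onto the old (minDesiredCapacity, desiredCapacitySufficient) pair; the real AccountingHelper logic may differ:

package colmemsketch

// capacityRequest is a hypothetical translation of tuplesToBeSet into the two
// arguments the old API took; inferred from the call sites in this PR, not
// copied from colmem.
func capacityRequest(tuplesToBeSet int) (minDesiredCapacity int, desiredCapacitySufficient bool) {
	if tuplesToBeSet == 0 {
		// Output size unknown: ask for at least one row and let the helper
		// grow the batch, bounded by the memory limit given to Init.
		return 1, false
	}
	// Output size known: the requested capacity is enough, nothing larger is
	// needed.
	return tuplesToBeSet, true
}
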
8 changes: 2 additions & 6 deletions pkg/sql/colexec/colexecutils/deselector.go
@@ -11,8 +11,6 @@
package colexecutils

import (
"math"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
@@ -59,10 +57,8 @@ func (p *deselectorOp) Next() coldata.Batch {
// deselectorOp should *not* limit the capacities of the returned batches,
// so we don't use a memory limit here. It is up to the wrapped operator to
// limit the size of batches based on the memory footprint.
const maxBatchMemSize = math.MaxInt64
p.output, _ = p.unlimitedAllocator.ResetMaybeReallocate(
p.inputTypes, p.output, batch.Length(), maxBatchMemSize,
true, /* desiredCapacitySufficient */
p.output, _ = p.unlimitedAllocator.ResetMaybeReallocateNoMemLimit(
p.inputTypes, p.output, batch.Length(),
)
sel := batch.Selection()
p.unlimitedAllocator.PerformOperation(p.output.ColVecs(), func() {
21 changes: 7 additions & 14 deletions pkg/sql/colexec/colexecutils/spilling_queue.go
@@ -12,7 +12,6 @@ package colexecutils

import (
"context"
"math"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/colcontainer"
@@ -190,10 +189,8 @@ func (q *SpillingQueue) Enqueue(ctx context.Context, batch coldata.Batch) {
//
// We want to fit all deselected tuples into a single batch, so we
// don't enforce footprint based memory limit on a batch size.
const maxBatchMemSize = math.MaxInt64
q.diskQueueDeselectionScratch, _ = q.unlimitedAllocator.ResetMaybeReallocate(
q.typs, q.diskQueueDeselectionScratch, n, maxBatchMemSize,
true, /* desiredCapacitySufficient */
q.diskQueueDeselectionScratch, _ = q.unlimitedAllocator.ResetMaybeReallocateNoMemLimit(
q.typs, q.diskQueueDeselectionScratch, n,
)
q.unlimitedAllocator.PerformOperation(q.diskQueueDeselectionScratch.ColVecs(), func() {
for i := range q.typs {
Expand Down Expand Up @@ -285,18 +282,14 @@ func (q *SpillingQueue) Enqueue(ctx context.Context, batch coldata.Batch) {
}
}

// No limit on the batch mem size here, however, we will be paying attention
// to the memory registered with the unlimited allocator, and we will stop
// adding tuples into this batch and spill when needed.
// Note: we could have used NewMemBatchWithFixedCapacity here, but we choose
// not to in order to indicate that the capacity of the new batches has
// dynamic behavior.
newBatch, _ := q.unlimitedAllocator.ResetMaybeReallocate(
q.typs,
nil, /* oldBatch */
newBatchCapacity,
// No limit on the batch mem size here, however, we will be paying
// attention to the memory registered with the unlimited allocator, and
// we will stop adding tuples into this batch and spill when needed.
math.MaxInt64, /* maxBatchMemSize */
true, /* desiredCapacitySufficient */
newBatch, _ := q.unlimitedAllocator.ResetMaybeReallocateNoMemLimit(
q.typs, nil /* oldBatch */, newBatchCapacity,
)
q.unlimitedAllocator.PerformOperation(newBatch.ColVecs(), func() {
for i := range q.typs {
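
Note: the comments in this hunk describe the queue's strategy: individual batches are not capped, and spilling is driven by how much memory the unlimited allocator has registered. A sketch of that decision, assuming the allocator exposes its registered byte count via a method like Used (an assumption, as are the other names in the sketch):

package colmemsketch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/sql/colmem"
)

// maybeSpill sketches the budget check described above: once the bytes the
// unlimited allocator has registered exceed the queue's own limit, buffered
// batches are handed to the disk queue via the spill callback.
func maybeSpill(
	ctx context.Context, alloc *colmem.Allocator, memoryLimit int64,
	spill func(context.Context) error,
) error {
	if alloc.Used() <= memoryLimit {
		// Still within budget: keep buffering in memory.
		return nil
	}
	return spill(ctx)
}
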
7 changes: 2 additions & 5 deletions pkg/sql/colexec/colexecwindow/buffered_window.go
@@ -12,7 +12,6 @@ package colexecwindow

import (
"context"
"math"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/colcontainer"
@@ -248,10 +247,8 @@ func (b *bufferedWindowOp) Next() coldata.Batch {
sel := batch.Selection()
// We don't limit the batches based on the memory footprint because
// we assume that the input is producing reasonably sized batches.
const maxBatchMemSize = math.MaxInt64
b.currentBatch, _ = b.allocator.ResetMaybeReallocate(
b.outputTypes, b.currentBatch, batch.Length(), maxBatchMemSize,
true, /* desiredCapacitySufficient */
b.currentBatch, _ = b.allocator.ResetMaybeReallocateNoMemLimit(
b.outputTypes, b.currentBatch, batch.Length(),
)
b.allocator.PerformOperation(b.currentBatch.ColVecs(), func() {
for colIdx, vec := range batch.ColVecs() {
41 changes: 28 additions & 13 deletions pkg/sql/colexec/columnarizer.go
@@ -56,14 +56,16 @@ type Columnarizer struct {
execinfra.ProcessorBaseNoHelper
colexecop.NonExplainable

mode columnarizerMode
mode columnarizerMode
// helper is used in the columnarizerBufferingMode mode.
helper colmem.AccountingHelper
// allocator is used directly only in the columnarizerStreamingMode mode.
allocator *colmem.Allocator
input execinfra.RowSource
da tree.DatumAlloc

buffered rowenc.EncDatumRows
batch coldata.Batch
maxBatchMemSize int64
accumulatedMeta []execinfrapb.ProducerMetadata
typs []*types.T

@@ -114,10 +116,12 @@ func newColumnarizer(
colexecerror.InternalError(errors.AssertionFailedf("unexpected columnarizerMode %d", mode))
}
c := &Columnarizer{
allocator: allocator,
input: input,
maxBatchMemSize: execinfra.GetWorkMemLimit(flowCtx),
mode: mode,
allocator: allocator,
input: input,
mode: mode,
}
if mode == columnarizerBufferingMode {
c.helper.Init(allocator, execinfra.GetWorkMemLimit(flowCtx))
}
c.ProcessorBaseNoHelper.Init(
nil, /* self */
@@ -200,9 +204,8 @@ func (c *Columnarizer) Next() coldata.Batch {
var reallocated bool
switch c.mode {
case columnarizerBufferingMode:
c.batch, reallocated = c.allocator.ResetMaybeReallocate(
c.typs, c.batch, 1, /* minDesiredCapacity */
c.maxBatchMemSize, false, /* desiredCapacitySufficient */
c.batch, reallocated = c.helper.ResetMaybeReallocate(
c.typs, c.batch, 0, /* tuplesToBeSet */
)
case columnarizerStreamingMode:
// Note that we're not using ResetMaybeReallocate because we will
@@ -218,10 +221,22 @@
oldRows := c.buffered
newRows := make(rowenc.EncDatumRows, c.batch.Capacity())
copy(newRows, oldRows)
_ = newRows[len(oldRows)]
for i := len(oldRows); i < len(newRows); i++ {
//gcassert:bce
newRows[i] = make(rowenc.EncDatumRow, len(c.typs))
if len(oldRows) < len(newRows) {
_ = newRows[len(oldRows)]
for i := len(oldRows); i < len(newRows); i++ {
//gcassert:bce
newRows[i] = make(rowenc.EncDatumRow, len(c.typs))
}
} else if len(newRows) < len(oldRows) {
_ = oldRows[len(newRows)]
// Lose the reference to the old rows that aren't copied into the
// new slice - we need to do this since the capacity of the batch
// might have shrunk, and the rows at the end of the slice might
// never get overwritten.
for i := len(newRows); i < len(oldRows); i++ {
//gcassert:bce
oldRows[i] = nil
}
}
c.buffered = newRows
}
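
Note: the last hunk handles both directions of a capacity change in the buffered-rows slice: new slots get fresh EncDatumRows, and when the batch capacity shrinks, the tail of the old slice is nil-ed out so the dropped rows can be garbage-collected. A self-contained sketch of the same pattern with [][]int standing in for rowenc.EncDatumRows:

package colmemsketch

// resizeRows mirrors the resizing pattern above: copy what fits, allocate
// rows for any newly added slots, and clear references held by the old
// slice's tail when the buffer shrinks.
func resizeRows(oldRows [][]int, newLen, width int) [][]int {
	newRows := make([][]int, newLen)
	copy(newRows, oldRows)
	if len(oldRows) < len(newRows) {
		for i := len(oldRows); i < len(newRows); i++ {
			newRows[i] = make([]int, width)
		}
	} else {
		// The buffer shrank (or stayed the same size): drop references kept
		// alive only by the old backing array.
		for i := len(newRows); i < len(oldRows); i++ {
			oldRows[i] = nil
		}
	}
	return newRows
}
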