colmem: improve memory-limiting behavior of the accounting helpers

This commit fixes an oversight in how we allocate batches of "dynamic"
capacity. We have two related ways of reallocating batches, and both
work by growing the capacity of the batch until the memory limit is
exceeded, after which the batch is reused until the end of the query
execution (a rough model of this old behavior is sketched below). This
is a reasonable heuristic under the assumption that all tuples in the
data stream are roughly equal in size, but that might not be the case.
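
A rough model of that pre-existing behavior (an illustrative sketch
with made-up names, not the actual `Allocator.ResetMaybeReallocate`
code):

```go
// nextCapacityOld models the old heuristic: capacity doubles on each
// reallocation until either maxBatchSize (i.e. coldata.BatchSize()) is
// reached or the batch's memory footprint exceeds the limit; after that
// the same batch is reused unchanged until the query finishes.
func nextCapacityOld(curCapacity, maxBatchSize int, curMemSize, memLimit int64) int {
	if curMemSize >= memLimit || curCapacity >= maxBatchSize {
		// Stop growing; the current batch is reused from now on, no
		// matter how large the incoming tuples turn out to be.
		return curCapacity
	}
	next := 2 * curCapacity
	if next > maxBatchSize {
		next = maxBatchSize
	}
	return next
}
```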

In particular, consider an example where 10k small rows of 1KiB each are
followed by 10k large rows of 1MiB each. According to our heuristic, we
happily grow the batch to a capacity of 1024 and then never shrink it,
so once the large rows start appearing, we put 1GiB worth of data into a
single batch (1024 tuples x 1MiB), significantly exceeding our memory
limit (usually 64MiB with the default `workmem` setting).

This commit introduces a new heuristic as follows (a Go sketch follows
the list):
- the first time a batch exceeds the memory limit, its capacity is memorized,
  and from now on that capacity will determine the upper bound on the
  capacities of the batches allocated through the helper;
- if at any point in time a batch exceeds the memory limit by at least a
  factor of two, then that batch is discarded, and the capacity will never
  exceed half of the capacity of the discarded batch;
- if the memory limit is not reached, then the dynamic growth of the
  capacity provided by `Allocator.ResetMaybeReallocate` still applies
  (i.e. capacities grow exponentially up to coldata.BatchSize()).
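
A minimal sketch of this heuristic (hypothetical names; the real
implementation is `colmem.SetAccountingHelper`, which this only
approximates):

```go
// capacityLimiter approximates the new heuristic. A maxCapacity of 0
// means that no batch has exceeded the memory limit yet.
type capacityLimiter struct {
	memoryLimit     int64
	maxCapacity     int
	underHalfStreak int // used only by the grow-back TODO sketch below
}

// observe is called once a batch has been filled. It returns the
// capacity to use for the next batch and whether the current batch must
// be discarded rather than reused.
func (l *capacityLimiter) observe(batchMemSize int64, batchCapacity, maxBatchSize int) (nextCapacity int, discard bool) {
	if batchMemSize >= 2*l.memoryLimit {
		// The limit is exceeded by at least 2x: discard the batch and
		// cap all future capacities at half of its capacity.
		l.maxCapacity = batchCapacity / 2
		return l.maxCapacity, true
	}
	if batchMemSize >= l.memoryLimit {
		// Over the limit for the first time: memorize the capacity as
		// the upper bound from now on.
		if l.maxCapacity == 0 || batchCapacity < l.maxCapacity {
			l.maxCapacity = batchCapacity
		}
		return l.maxCapacity, false
	}
	// Under the limit: the usual exponential growth still applies,
	// clamped to the memorized maximum (if any) and to maxBatchSize
	// (i.e. coldata.BatchSize()).
	next := 2 * batchCapacity
	if next > maxBatchSize {
		next = maxBatchSize
	}
	if l.maxCapacity > 0 && next > l.maxCapacity {
		next = l.maxCapacity
	}
	return next, false
}
```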

Note that this heuristic cannot grow the maximum capacity once it has
been set, although it might make sense to do so (say, if after shrinking
the capacity, the next five times we see that the batch uses less than
half of the memory limit). This is a conscious omission since I want
this change to be backported, and never growing seems like the safer
choice. Thus, this improvement is left as a TODO (sketched below).
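
Under the same hypothetical names as the sketch above, that TODO might
look something like this (deliberately not part of this commit):

```go
// maybeGrowBack sketches the TODO: if, after shrinking, the batch keeps
// using less than half of the memory limit, cautiously raise the cap
// again (clamping to coldata.BatchSize() is omitted for brevity).
func (l *capacityLimiter) maybeGrowBack(batchMemSize int64) {
	const streakNeeded = 5
	if l.maxCapacity == 0 || batchMemSize >= l.memoryLimit/2 {
		l.underHalfStreak = 0
		return
	}
	l.underHalfStreak++
	if l.underHalfStreak >= streakNeeded {
		l.maxCapacity *= 2
		l.underHalfStreak = 0
	}
}
```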

Also, we might still create batches that are too large in memory
footprint in places that don't use the SetAccountingHelper (e.g. in the
columnarizer), since there we perform the memory limit check only at
batch granularity. However, this commit improves things there too: an
over-limit batch is not reused, and the next iteration uses half of its
capacity (see the sketch below).
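
For those batch-granularity call sites, the improvement amounts to
roughly the following (an illustrative sketch, not the actual
columnarizer code):

```go
// decideReuse is called after a batch has been filled at a call site
// that checks the limit only at batch granularity: an over-limit batch
// is not reused, and the next allocation asks for half of its capacity.
func decideReuse(batchMemSize, memLimit int64, batchCapacity int) (reuse bool, nextCapacity int) {
	if batchMemSize >= memLimit {
		return false, batchCapacity / 2
	}
	return true, batchCapacity
}
```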

Release note (bug fix): CockroachDB now more precisely respects the
`distsql_workmem` setting which improves the stability of each node and
makes OOMs less likely.
yuzefovich committed Aug 5, 2022
1 parent d94e279 commit 8abc478
Showing 27 changed files with 543 additions and 187 deletions.
2 changes: 1 addition & 1 deletion pkg/sql/colexec/case.go
@@ -145,7 +145,7 @@ func NewCaseOp(
) colexecop.Operator {
// We internally use three selection vectors, scratch.order, origSel, and
// prevSel.
- allocator.AdjustMemoryUsage(3 * colmem.SizeOfBatchSizeSelVector)
+ allocator.AdjustMemoryUsage(3 * colmem.SelVectorSize(coldata.BatchSize()))
return &caseOp{
allocator: allocator,
buffer: buffer.(*bufferOp),
2 changes: 1 addition & 1 deletion pkg/sql/colexec/colexecdisk/external_sort.go
@@ -840,5 +840,5 @@ func (o *inputPartitioningOperator) close() {
// allocate a new windowed batch if necessary (which might be the case for
// the fallback strategy of the users of the hash-based partitioner).
o.windowedBatch = nil
- o.allocator.ReleaseMemory(colmem.SizeOfBatchSizeSelVector)
+ o.allocator.ReleaseMemory(colmem.SelVectorSize(coldata.BatchSize()))
}
22 changes: 9 additions & 13 deletions pkg/sql/colexec/colexecjoin/crossjoiner.go
@@ -40,7 +40,7 @@ func NewCrossJoiner(
rightTypes []*types.T,
diskAcc *mon.BoundAccount,
) colexecop.ClosableOperator {
- return &crossJoiner{
+ c := &crossJoiner{
crossJoinerBase: newCrossJoinerBase(
unlimitedAllocator,
joinType,
@@ -51,21 +51,20 @@
fdSemaphore,
diskAcc,
),
- joinHelper: newJoinHelper(left, right),
- unlimitedAllocator: unlimitedAllocator,
- outputTypes: joinType.MakeOutputTypes(leftTypes, rightTypes),
- maxOutputBatchMemSize: memoryLimit,
+ joinHelper: newJoinHelper(left, right),
+ outputTypes: joinType.MakeOutputTypes(leftTypes, rightTypes),
}
+ c.helper.Init(unlimitedAllocator, memoryLimit)
+ return c
}

type crossJoiner struct {
*crossJoinerBase
*joinHelper

- unlimitedAllocator *colmem.Allocator
- rightInputConsumed bool
- outputTypes []*types.T
- maxOutputBatchMemSize int64
+ helper colmem.AccountingHelper
+ rightInputConsumed bool
+ outputTypes []*types.T
// isLeftAllNulls and isRightAllNulls indicate whether the output vectors
// corresponding to the left and right inputs, respectively, should consist
// only of NULL values. This is the case when we have right or left,
@@ -105,10 +104,7 @@ func (c *crossJoiner) Next() coldata.Batch {
c.done = true
return coldata.ZeroBatch
}
- c.output, _ = c.unlimitedAllocator.ResetMaybeReallocate(
- c.outputTypes, c.output, willEmit, c.maxOutputBatchMemSize,
- true, /* desiredCapacitySufficient */
- )
+ c.output, _ = c.helper.ResetMaybeReallocate(c.outputTypes, c.output, willEmit)
if willEmit > c.output.Capacity() {
willEmit = c.output.Capacity()
}
2 changes: 2 additions & 0 deletions pkg/sql/colexec/colexecjoin/mergejoiner.go
@@ -532,6 +532,7 @@ func newMergeJoinBase(
},
diskAcc: diskAcc,
}
+ base.helper.Init(unlimitedAllocator, memoryLimit)
base.left.distincterInput = &colexecop.FeedOperator{}
base.left.distincter, base.left.distinctOutput = colexecbase.OrderedDistinctColsToOperators(
base.left.distincterInput, lEqCols, leftTypes, false, /* nullsAreDistinct */
@@ -549,6 +550,7 @@ type mergeJoinBase struct {
colexecop.CloserHelper

unlimitedAllocator *colmem.Allocator
+ helper colmem.AccountingHelper
memoryLimit int64
diskQueueCfg colcontainer.DiskQueueCfg
fdSemaphore semaphore.Semaphore
The diffs for the following generated files (each 5 changes: 1 addition & 4 deletions) are not rendered by default:

pkg/sql/colexec/colexecjoin/mergejoiner_exceptall.eg.go
pkg/sql/colexec/colexecjoin/mergejoiner_fullouter.eg.go
pkg/sql/colexec/colexecjoin/mergejoiner_inner.eg.go
pkg/sql/colexec/colexecjoin/mergejoiner_intersectall.eg.go
pkg/sql/colexec/colexecjoin/mergejoiner_leftanti.eg.go
pkg/sql/colexec/colexecjoin/mergejoiner_leftouter.eg.go
pkg/sql/colexec/colexecjoin/mergejoiner_leftsemi.eg.go
pkg/sql/colexec/colexecjoin/mergejoiner_rightanti.eg.go
pkg/sql/colexec/colexecjoin/mergejoiner_rightouter.eg.go
pkg/sql/colexec/colexecjoin/mergejoiner_rightsemi.eg.go

5 changes: 1 addition & 4 deletions pkg/sql/colexec/colexecjoin/mergejoiner_tmpl.go
@@ -1353,10 +1353,7 @@ func _SOURCE_FINISHED_SWITCH(_JOIN_TYPE joinTypeInfo) { // */}}
// */}}

func (o *mergeJoin_JOIN_TYPE_STRINGOp) Next() coldata.Batch {
- o.output, _ = o.unlimitedAllocator.ResetMaybeReallocate(
- o.outputTypes, o.output, 1 /* minDesiredCapacity */, o.memoryLimit,
- false, /* desiredCapacitySufficient */
- )
+ o.output, _ = o.helper.ResetMaybeReallocate(o.outputTypes, o.output, 0 /* tuplesToBeSet */)
o.outputCapacity = o.output.Capacity()
o.bufferedGroup.helper.output = o.output
o.builderState.outCount = 0
41 changes: 28 additions & 13 deletions pkg/sql/colexec/columnarizer.go
@@ -56,14 +56,16 @@ type Columnarizer struct {
execinfra.ProcessorBaseNoHelper
colexecop.NonExplainable

- mode columnarizerMode
+ mode columnarizerMode
+ // helper is used in the columnarizerBufferingMode mode.
+ helper colmem.AccountingHelper
+ // allocator is used directly only in the columnarizerStreamingMode mode.
allocator *colmem.Allocator
input execinfra.RowSource
da tree.DatumAlloc

buffered rowenc.EncDatumRows
batch coldata.Batch
- maxBatchMemSize int64
accumulatedMeta []execinfrapb.ProducerMetadata
typs []*types.T

@@ -114,10 +116,12 @@ func newColumnarizer(
colexecerror.InternalError(errors.AssertionFailedf("unexpected columnarizerMode %d", mode))
}
c := &Columnarizer{
- allocator: allocator,
- input: input,
- maxBatchMemSize: execinfra.GetWorkMemLimit(flowCtx),
- mode: mode,
+ allocator: allocator,
+ input: input,
+ mode: mode,
}
+ if mode == columnarizerBufferingMode {
+ c.helper.Init(allocator, execinfra.GetWorkMemLimit(flowCtx))
+ }
c.ProcessorBaseNoHelper.Init(
nil, /* self */
@@ -200,9 +204,8 @@ func (c *Columnarizer) Next() coldata.Batch {
var reallocated bool
switch c.mode {
case columnarizerBufferingMode:
- c.batch, reallocated = c.allocator.ResetMaybeReallocate(
- c.typs, c.batch, 1, /* minDesiredCapacity */
- c.maxBatchMemSize, false, /* desiredCapacitySufficient */
+ c.batch, reallocated = c.helper.ResetMaybeReallocate(
+ c.typs, c.batch, 0, /* tuplesToBeSet */
)
case columnarizerStreamingMode:
// Note that we're not using ResetMaybeReallocate because we will
Expand All @@ -218,10 +221,22 @@ func (c *Columnarizer) Next() coldata.Batch {
oldRows := c.buffered
newRows := make(rowenc.EncDatumRows, c.batch.Capacity())
copy(newRows, oldRows)
- _ = newRows[len(oldRows)]
- for i := len(oldRows); i < len(newRows); i++ {
- //gcassert:bce
- newRows[i] = make(rowenc.EncDatumRow, len(c.typs))
+ if len(oldRows) < len(newRows) {
+ _ = newRows[len(oldRows)]
+ for i := len(oldRows); i < len(newRows); i++ {
+ //gcassert:bce
+ newRows[i] = make(rowenc.EncDatumRow, len(c.typs))
+ }
+ } else if len(newRows) < len(oldRows) {
+ _ = oldRows[len(newRows)]
+ // Lose the reference to the old rows that aren't copied into the
+ // new slice - we need to do this since the capacity of the batch
+ // might have shrunk, and the rows at the end of the slice might
+ // never get overwritten.
+ for i := len(newRows); i < len(oldRows); i++ {
+ //gcassert:bce
+ oldRows[i] = nil
+ }
+ }
}
c.buffered = newRows
}
4 changes: 2 additions & 2 deletions pkg/sql/colexec/hash_aggregator.eg.go

(Generated file; diff not rendered by default.)

2 changes: 1 addition & 1 deletion pkg/sql/colexec/hash_aggregator_tmpl.go
@@ -345,7 +345,7 @@ func getNext(op *hashAggregator, partialOrder bool) coldata.Batch {
// len(op.buckets)-op.curOutputBucketIdx (the number of remaining
// output tuples) capacity.
op.output, _ = op.accountingHelper.ResetMaybeReallocate(
- op.outputTypes, op.output, len(op.buckets)-op.curOutputBucketIdx, true, /* desiredCapacitySufficient */
+ op.outputTypes, op.output, len(op.buckets)-op.curOutputBucketIdx,
)
curOutputIdx := 0
for batchDone := false; op.curOutputBucketIdx < len(op.buckets) && !batchDone; {
2 changes: 1 addition & 1 deletion pkg/sql/colexec/ordered_synchronizer.eg.go

(Generated file; diff not rendered by default.)

2 changes: 1 addition & 1 deletion pkg/sql/colexec/ordered_synchronizer_tmpl.go
@@ -210,7 +210,7 @@ func (o *OrderedSynchronizer) Next() coldata.Batch {
func (o *OrderedSynchronizer) resetOutput() {
var reallocated bool
o.output, reallocated = o.accountingHelper.ResetMaybeReallocate(
- o.typs, o.output, 1 /* minDesiredCapacity */, false, /* desiredCapacitySufficient */
+ o.typs, o.output, 0, /* tuplesToBeSet */
)
if reallocated {
o.outVecs.SetBatch(o.output)
28 changes: 13 additions & 15 deletions pkg/sql/colexec/sort.go
@@ -52,16 +52,17 @@ func newSorter(
for i, ord := range orderingCols[:len(orderingCols)-1] {
partitioners[i] = newPartitioner(inputTypes[ord.ColIdx], false /* nullsAreDistinct */)
}
- return &sortOp{
- allocator: allocator,
- input: input,
- inputTypes: inputTypes,
- sorters: make([]colSorter, len(orderingCols)),
- partitioners: partitioners,
- orderingCols: orderingCols,
- state: sortSpooling,
- maxOutputBatchMemSize: maxOutputBatchMemSize,
+ s := &sortOp{
+ allocator: allocator,
+ input: input,
+ inputTypes: inputTypes,
+ sorters: make([]colSorter, len(orderingCols)),
+ partitioners: partitioners,
+ orderingCols: orderingCols,
+ state: sortSpooling,
}
+ s.helper.Init(allocator, maxOutputBatchMemSize)
+ return s
}

// spooler is a column vector operator that spools the data from its input.
@@ -180,6 +181,7 @@ type sortOp struct {
colexecop.InitHelper

allocator *colmem.Allocator
+ helper colmem.AccountingHelper
input spooler

// inputTypes contains the types of all of the columns from input.
@@ -222,8 +224,7 @@ type sortOp struct {
partitionsCol []bool
}

- output coldata.Batch
- maxOutputBatchMemSize int64
+ output coldata.Batch

exported int
}
@@ -287,10 +288,7 @@ func (p *sortOp) Next() coldata.Batch {
p.state = sortDone
continue
}
- p.output, _ = p.allocator.ResetMaybeReallocate(
- p.inputTypes, p.output, toEmit, p.maxOutputBatchMemSize,
- true, /* desiredCapacitySufficient */
- )
+ p.output, _ = p.helper.ResetMaybeReallocate(p.inputTypes, p.output, toEmit)
if toEmit > p.output.Capacity() {
toEmit = p.output.Capacity()
}