Skip to content

Commit

Permalink
colexec: fix external sorter memory accounting
Browse files Browse the repository at this point in the history
Previously, there was a problem with inputPartitioningOperator (which
divides up the input into "partitions" once the partition exceeds the
provided memory limit) because usage of Allocator.Clear is not
compatible with Allocator.PerformOperation. Now this problem is fixed by
explicitly tracking the memory used by the input batches with
RetainBatch against a "standalone" allocator. We're using standalone
budget for that to avoid double counting for the memory of the batch
(they are accounted already with unlimitedAllocator of the sorter).

Release note: None
  • Loading branch information
yuzefovich committed Feb 20, 2020
1 parent 3429c96 commit b344b39
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 29 deletions.
19 changes: 4 additions & 15 deletions pkg/sql/colexec/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,22 +154,11 @@ func (a *Allocator) Used() int64 {
}

// Clear clears up the memory account of the allocator.
// WARNING: usage of this method is *not* compatible with using
// PerformOperation. Use this only in combination with RetainBatch /
// ReleaseBatch.
func (a *Allocator) Clear() {
// TODO(yuzefovich): usage of a.acc.Clear method is not compatible with our
// PerformOperation. Consider the following scenario:
// 1. we allocate []int64 Vec with 1024 capacity, this memory is registered
// with the allocator (8192 bytes).
// 2. then we Clear() the allocator, now the memory account says that we use
// 0 bytes, but the actual memory is not released.
// 3. we reuse the Vec and append into it 1024 int64 values. Because there
// was enough capacity to accommodate it, no new memory is allocated.
// 4. when we called PerformOperation to account for that memory, we computed
// "before" and "after" memory estimates, and in both cases they equal 8192
// bytes, so "delta" is 0, and the allocator continues to think that 0 bytes
// have been registered with it, but it is not the case.
// I think "adding disk queue to hash router" PR has more fine-grained
// methods for working with memory. Consider reusing them here.
a.acc.Clear(a.ctx)
a.acc.Shrink(a.ctx, a.acc.Used())
}

const (
Expand Down
30 changes: 30 additions & 0 deletions pkg/sql/colexec/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,12 +767,17 @@ func NewColOperator(
ctx, result.createBufferingUnlimitedMemAccount(
ctx, flowCtx, monitorNamePrefix,
))
standaloneAllocator := NewAllocator(
ctx, result.createStandaloneMemAccount(
ctx, flowCtx, monitorNamePrefix,
))
diskQueuesUnlimitedAllocator := NewAllocator(
ctx, result.createBufferingUnlimitedMemAccount(
ctx, flowCtx, monitorNamePrefix+"-disk-queues",
))
return newExternalSorter(
unlimitedAllocator,
standaloneAllocator,
input, inputTypes, core.Sorter.OutputOrdering,
execinfra.GetWorkMemLimit(flowCtx.Cfg),
args.TestingKnobs.MaxNumberPartitions,
Expand Down Expand Up @@ -1003,6 +1008,31 @@ func (r *NewColOperatorResult) createBufferingUnlimitedMemAccount(
return &bufferingMemAccount
}

// createStandaloneMemAccount instantiates an unlimited memory monitor and a
// memory account that have a standalone budget. This means that the memory
// registered with these objects is *not* reported to the root monitor (i.e.
// it will not count towards max-sql-memory). Use it only when the memory in
// use is accounted for with a different memory monitor. The receiver is
// updated to have references to both objects.
func (r *NewColOperatorResult) createStandaloneMemAccount(
ctx context.Context, flowCtx *execinfra.FlowCtx, name string,
) *mon.BoundAccount {
standaloneMemMonitor := mon.MakeMonitor(
name+"-standalone",
mon.MemoryResource,
nil, /* curCount */
nil, /* maxHist */
-1, /* increment: use default increment */
math.MaxInt64, /* noteworthy */
flowCtx.Cfg.Settings,
)
r.BufferingOpMemMonitors = append(r.BufferingOpMemMonitors, &standaloneMemMonitor)
standaloneMemMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64))
standaloneMemAccount := standaloneMemMonitor.MakeBoundAccount()
r.BufferingOpMemAccounts = append(r.BufferingOpMemAccounts, &standaloneMemAccount)
return &standaloneMemAccount
}

func (r *NewColOperatorResult) planFilterExpr(
ctx context.Context,
evalCtx *tree.EvalContext,
Expand Down
35 changes: 21 additions & 14 deletions pkg/sql/colexec/external_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ var _ Operator = &externalSorter{}
// from an unlimited memory monitor. It will be used by several internal
// components of the external sort which is responsible for making sure that
// the components stay within the memory limit.
// - standaloneAllocator must have been created with a memory account derived
// from an unlimited memory monitor with a standalone budget. It will be used
// by inputPartitioningOperator to "partition" the input according to memory
// limit. The budget *must* be standalone because we don't want to double
// count the memory (the memory under the batches will be accounted for with
// the unlimitedAllocator).
// - diskQueuesUnlimitedAllocator is a (temporary) unlimited allocator that
// will be used to create dummy queues and will be removed once we have actual
// disk-backed queues.
Expand All @@ -149,6 +155,7 @@ var _ Operator = &externalSorter{}
// non-zero only in tests.
func newExternalSorter(
unlimitedAllocator *Allocator,
standaloneAllocator *Allocator,
input Operator,
inputTypes []coltypes.T,
ordering execinfrapb.Ordering,
Expand All @@ -158,7 +165,7 @@ func newExternalSorter(
diskQueuesUnlimitedAllocator *Allocator,
cfg colcontainer.DiskQueueCfg,
) Operator {
inputPartitioner := newInputPartitioningOperator(unlimitedAllocator, input, memoryLimit)
inputPartitioner := newInputPartitioningOperator(standaloneAllocator, input, memoryLimit)
inMemSorter, err := newSorter(
unlimitedAllocator, newAllSpooler(unlimitedAllocator, inputPartitioner, inputTypes),
inputTypes, ordering.Columns,
Expand Down Expand Up @@ -273,7 +280,6 @@ func (s *externalSorter) Next(ctx context.Context) coldata.Batch {
}
s.firstPartitionIdx += numPartitionsToMerge
s.numPartitions -= numPartitionsToMerge - 1
s.unlimitedAllocator.Clear()
continue
case externalSorterFinalMerging:
if s.numPartitions == 0 {
Expand Down Expand Up @@ -325,25 +331,24 @@ func (s *externalSorter) createMergerForPartitions(firstIdx, numPartitions int)
}

func newInputPartitioningOperator(
unlimitedAllocator *Allocator, input Operator, memoryLimit int64,
standaloneAllocator *Allocator, input Operator, memoryLimit int64,
) resettableOperator {
return &inputPartitioningOperator{
OneInputNode: NewOneInputNode(input),
unlimitedAllocator: unlimitedAllocator,
memoryLimit: memoryLimit,
OneInputNode: NewOneInputNode(input),
standaloneAllocator: standaloneAllocator,
memoryLimit: memoryLimit,
}
}

// inputPartitioningOperator is an operator that returns the batches from its
// input until the unlimited allocator (which the output operator must be
// using) reaches the memory limit. From that point, the operator returns a
// zero-length batch (until it is reset).
// input until the standalone allocator reaches the memory limit. From that
// point, the operator returns a zero-length batch (until it is reset).
type inputPartitioningOperator struct {
OneInputNode
NonExplainable

unlimitedAllocator *Allocator
memoryLimit int64
standaloneAllocator *Allocator
memoryLimit int64
}

var _ resettableOperator = &inputPartitioningOperator{}
Expand All @@ -353,14 +358,16 @@ func (o *inputPartitioningOperator) Init() {
}

func (o *inputPartitioningOperator) Next(ctx context.Context) coldata.Batch {
if o.unlimitedAllocator.Used() >= o.memoryLimit {
if o.standaloneAllocator.Used() >= o.memoryLimit {
return coldata.ZeroBatch
}
return o.input.Next(ctx)
b := o.input.Next(ctx)
o.standaloneAllocator.RetainBatch(b)
return b
}

func (o *inputPartitioningOperator) reset() {
o.unlimitedAllocator.Clear()
o.standaloneAllocator.Clear()
}

func newPartitionerToOperator(
Expand Down

0 comments on commit b344b39

Please sign in to comment.